Asked by: AlwaysLearning  Asked: 11/14/2023  Last edited by: AlwaysLearning  Updated: 11/14/2023  Views: 118
Saturating network bandwidth with asynchronous reads from S3
Q:
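I have an EC2 instance (c6i.large) running in the same region as an S3 bucket that holds many objects of ~40 KB each. Every object is stored under two prefixes, so it can be read from either one. Reading sequentially, I can fetch about 70 objects in a little over 1 second (measured after the connection to S3 has been established). Given that the instance's network bandwidth is up to 12,500 Mbps (12.5 Gbps), the theoretical maximum is roughly 30,000 objects per second. I want to saturate the bandwidth and get as close to that rate as possible. So far, using asynchronous I/O, I can reach at most about 500 objects per second, which is far below the available bandwidth. Here is my code: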
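The ~30,000 objects/s figure comes from converting the link rate into bytes per second and dividing by the object size. A minimal sketch of that arithmetic, assuming the advertised 12.5 Gbps and taking "~40K" as 40,000 bytes (both are assumptions about units, not measurements). The raw quotient is about 39,000 objects/s; HTTP, TLS and TCP overhead lower the practical ceiling, which fits the rounder 30,000 estimate. "Up to" bandwidth on smaller instance sizes is a burst figure, so the sustained ceiling may be lower still.

```python
# Rough ceiling on the object rate a network link can carry.
# Assumptions: 12.5 Gbps is the advertised "up to" (burst) rate of c6i.large,
# and "~40K" means 40,000 bytes; protocol overhead is ignored.
LINK_GBPS = 12.5
OBJECT_BYTES = 40_000


def max_objects_per_second(link_gbps: float, object_bytes: int) -> float:
    bytes_per_second = link_gbps * 1e9 / 8  # bits -> bytes
    return bytes_per_second / object_bytes


rate = max_objects_per_second(LINK_GBPS, OBJECT_BYTES)
print(f"theoretical ceiling: {rate:,.0f} objects/s")
```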
import asyncio
import random
import time

import aioboto3
from aiobotocore.config import AioConfig

TASK_LENGTH = 70  # number of keys each task fetches sequentially
N_PREFIXES = 2    # every object is stored under this many prefixes

def ms_now():
    return time.time_ns() // 1_000_000

class Timer():
    def __init__(self, timestamp_function=ms_now):
        self.timestamp_function = timestamp_function
        self.start = self.timestamp_function()

    def stop(self):
        return self.timestamp_function() - self.start

class S3:
    def __init__(self, client, bucket_name):
        self.client = client
        self.bucket_name = bucket_name

    async def read(self, key):
        # Pick one of the prefixes at random to spread requests across them.
        prefix = f"prefix-{random.randint(0, N_PREFIXES - 1)}/"
        obj = (await self.client.get_object(Bucket=self.bucket_name, Key=prefix + key))['Body']
        return await obj.read()

async def task(s3, keys):
    # Each task fetches its chunk of keys one after another.
    for key in keys:
        await s3.read(key)

async def main():
    keys = [f"{content_id}" for content_id in range(500)]
    session = aioboto3.Session()
    config = AioConfig(max_pool_connections=1000)
    async with session.client('s3', region_name="eu-west-2", config=config) as client:
        s3 = S3(client, "mybucket")
        timer = Timer()
        # Split the keys into chunks of TASK_LENGTH and start one task per chunk.
        tasks = []
        for mykeys in [keys[i:i + TASK_LENGTH] for i in range(0, len(keys), TASK_LENGTH)]:
            tasks.append(asyncio.create_task(task(s3, mykeys)))
        for t in tasks:
            await t
        print(f"{timer.stop()}ms")

asyncio.run(main())
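This code fetches the 500 objects in a little over one second. It makes no difference whether I use one prefix or two, which suggests that S3 is not the bottleneck. What is the bottleneck, and how can I get past it?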
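One thing worth checking is how much concurrency the code actually produces. With 500 keys and TASK_LENGTH = 70 it creates ceil(500 / 70) = 8 tasks, and each task awaits its GETs one after another, so at most 8 requests are in flight at once. If one sequential stream manages ~70 objects/s, 8 streams give roughly 500 to 560 objects/s, close to the observed figure. The sketch below reproduces only that structure: S3 is replaced by a hypothetical fixed-latency `fake_read` (no network involved) so the peak number of in-flight requests can be counted.

```python
import asyncio
import math

TASK_LENGTH = 70
N_KEYS = 500
FAKE_LATENCY_S = 1 / 70  # hypothetical: ~70 sequential GETs per second

in_flight = 0
peak_in_flight = 0


async def fake_read(key: str) -> None:
    """Stand-in for S3.read(): just waits for the assumed latency."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(FAKE_LATENCY_S)
    in_flight -= 1


async def task(keys: list[str]) -> None:
    for key in keys:  # sequential within a chunk, as in the question
        await fake_read(key)


async def main() -> int:
    keys = [str(i) for i in range(N_KEYS)]
    chunks = [keys[i:i + TASK_LENGTH] for i in range(0, len(keys), TASK_LENGTH)]
    await asyncio.gather(*(task(chunk) for chunk in chunks))
    return len(chunks)


n_tasks = asyncio.run(main())
assert n_tasks == math.ceil(N_KEYS / TASK_LENGTH)
print(f"{n_tasks} tasks, peak concurrency {peak_in_flight}")
```

In this model, shrinking TASK_LENGTH (more, shorter tasks) raises the peak proportionally; on real hardware the gain is capped by whatever limit comes next.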
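Update 1. At 500 objects per second, one would expect fetching 10,000 objects to take at least 20 seconds. Interestingly, it takes only 13 seconds... Again, using one prefix or two makes no difference.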
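Applying the same chunking rule to both runs makes the Update 1 observation concrete. A sketch, assuming "a little over a second" is 1.0 s for the 500-object run: going from 500 to 10,000 keys raises the number of concurrent tasks from 8 to 143 (about 18 times more), while throughput rises only from ~500 to ~770 objects/s. That hints that some other limit takes over once concurrency is high.

```python
import math

TASK_LENGTH = 70

# (number of keys, measured seconds); 1.0 s approximates "a little over a second".
runs = [(500, 1.0), (10_000, 13.0)]

summary = []
for n_keys, seconds in runs:
    n_tasks = math.ceil(n_keys / TASK_LENGTH)  # = peak number of concurrent GETs
    rate = n_keys / seconds
    summary.append((n_keys, n_tasks, rate))
    print(f"{n_keys:>6} keys: {n_tasks:>3} concurrent tasks, {rate:,.0f} objects/s")
```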
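Update 2. I tried a larger instance, c6i.xlarge (twice as many vCPUs and twice as much memory as the original), and got similar performance.
Update 3. Based on this article, I ran the script on 10,000 objects under the profiler (on the larger instance):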
timeout -s INT -k 100s 105s python -m cProfile -s tottime script.py > out
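Instead of choosing the sort order on the command line, cProfile can save the raw data so it can be re-sorted and filtered afterwards: `python -m cProfile -o out.prof script.py` writes the file and `pstats.Stats("out.prof")` reads it back. A self-contained sketch of the same idea in-process, where `work()` is a hypothetical stand-in for the script's `main()`:

```python
import asyncio
import cProfile
import io
import pstats


async def work() -> None:
    # Hypothetical stand-in for the script's main(): a few concurrent sleeps.
    await asyncio.gather(*(asyncio.sleep(0.01) for _ in range(10)))


profiler = cProfile.Profile()
profiler.enable()
asyncio.run(work())
profiler.disable()

# Sort by cumulative time and print the 15 most expensive entries.
out = io.StringIO()
stats = pstats.Stats(profiler, stream=out)
stats.sort_stats(pstats.SortKey.CUMULATIVE).print_stats(15)
report = out.getvalue()
print(report)
```

A string argument to `print_stats()` is treated as a regular expression on the file and function name, so `print_stats("botocore", 20)` would restrict the listing to botocore frames.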
Here is part of the output:
ncalls tottime percall cumtime percall filename:lineno(function)
547/1 0.002 0.000 24.656 24.656 {built-in method builtins.exec}
1 0.000 0.000 24.656 24.656 script.py:1(<module>)
1 0.000 0.000 24.384 24.384 runners.py:8(run)
3 0.000 0.000 24.384 8.128 base_events.py:613(run_until_complete)
3 0.001 0.000 24.384 8.128 base_events.py:589(run_forever)
605 0.072 0.000 24.383 0.040 base_events.py:1832(_run_once)
42253 0.040 0.000 24.228 0.001 events.py:78(_run)
42253 0.060 0.000 24.188 0.001 {method 'run' of '_contextvars.Context' objects}
20467 0.069 0.000 21.951 0.001 script.py:31(task)
30324 0.108 0.000 21.878 0.001 script.py:26(read)
30295 0.187 0.000 21.437 0.001 client.py:308(_make_api_call)
30295 0.059 0.000 17.352 0.001 client.py:387(_make_request)
30295 0.079 0.000 17.278 0.001 endpoint.py:92(_send_request)
30295 0.093 0.000 10.785 0.000 endpoint.py:133(_get_response)
30295 0.142 0.000 10.266 0.000 endpoint.py:161(_do_get_response)
110002/90002 0.440 0.000 5.928 0.000 hooks.py:47(_emit)
30295 0.032 0.000 5.641 0.000 endpoint.py:284(_send)
30310 0.268 0.000 5.612 0.000 httpsession.py:173(send)
10000 0.055 0.000 5.392 0.001 endpoint.py:71(create_request)
320004/140004 0.157 0.000 5.105 0.000 _helpers.py:13(resolve_awaitable)
30310 0.269 0.000 4.221 0.000 client.py:390(_request)
10000 0.014 0.000 4.161 0.000 signers.py:19(handler)
10000 0.078 0.000 4.147 0.000 signers.py:26(sign)
10000 0.048 0.000 3.759 0.000 parsers.py:223(parse)
10000 0.013 0.000 3.508 0.000 parsers.py:878(_do_parse)
10000 0.016 0.000 3.438 0.000 parsers.py:886(_add_modeled_parse)
10000 0.387 0.000 3.387 0.000 parsers.py:938(_parse_non_payload_attrs)
10000 0.072 0.000 3.289 0.000 auth.py:416(add_auth)
60000 0.060 0.000 2.605 0.000 parsers.py:328(_parse_shape)
60000 0.046 0.000 2.519 0.000 parsers.py:168(_get_text_content)
10000 0.011 0.000 2.371 0.000 parsers.py:535(_handle_timestamp)
10000 0.014 0.000 2.360 0.000 utils.py:942(parse_timestamp)
10000 0.057 0.000 2.339 0.000 utils.py:923(_parse_timestamp_with_tzinfo)
10001 0.021 0.000 2.273 0.000 _parser.py:1270(parse)
10001 0.049 0.000 2.252 0.000 _parser.py:572(parse)
10001 0.278 0.000 1.930 0.000 _parser.py:666(_parse)
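The profile can be read as a per-object budget by dividing cumulative times by the 10,000 objects fetched. A sketch using numbers copied from the output above; profiler overhead inflates all of them, so treat the results as upper bounds. Request signing and response parsing (the latter includes timestamp parsing through dateutil's `_parser`) come to about 0.8 ms per object, which on its own would cap one core at very roughly 1,000 objects/s before any network wait.

```python
# Cumulative times in seconds, copied from the cProfile output for 10,000 GETs.
N_OBJECTS = 10_000
cumtime_s = {
    "whole run (builtins.exec)": 24.656,
    "request signing (signers.sign)": 4.147,
    "response parsing (parsers.parse)": 3.759,
    "timestamp parsing (_parse_timestamp_with_tzinfo)": 2.339,
}

per_object_ms = {name: t / N_OBJECTS * 1000 for name, t in cumtime_s.items()}
for name, ms in per_object_ms.items():
    print(f"{name:<50} {ms:.3f} ms/object")

# Throughput of the profiled run (slower than unprofiled because of profiler overhead).
implied_rate = N_OBJECTS / cumtime_s["whole run (builtins.exec)"]
print(f"profiled rate: {implied_rate:.0f} objects/s")
```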
Update 4. I tried the suggested improvement:
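import io
import random

# Drop-in replacement for the S3 class above; N_PREFIXES is defined in the main script.
class S3:
    def __init__(self, client, bucket_name):
        self.client = client
        self.bucket_name = bucket_name

    async def read(self, key):
        prefix = f"vectors-{random.randint(0, N_PREFIXES - 1)}/"
        buffer = io.BytesIO()
        # download_fileobj writes the object's bytes into the in-memory buffer.
        await self.client.download_fileobj(self.bucket_name, prefix + key, buffer)
        return buffer.getvalue()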
Here is part of the profiler output:
ncalls tottime percall cumtime percall filename:lineno(function)
547/1 0.002 0.000 28.279 28.279 {built-in method builtins.exec}
1 0.000 0.000 28.279 28.279 script.py:1(<module>)
1 0.000 0.000 28.009 28.009 runners.py:8(run)
3 0.000 0.000 28.009 9.336 base_events.py:613(run_until_complete)
3 0.017 0.006 28.008 9.336 base_events.py:589(run_forever)
27251 0.244 0.000 27.991 0.001 base_events.py:1832(_run_once)
145713 0.104 0.000 27.045 0.000 events.py:78(_run)
145713 0.201 0.000 26.940 0.000 {method 'run' of '_contextvars.Context' objects}
120885 0.062 0.000 23.729 0.000 script.py:34(task)
130742 0.169 0.000 23.667 0.000 script.py:27(read)
130742 0.362 0.000 23.443 0.000 inject.py:78(download_fileobj)
30350 0.192 0.000 21.691 0.001 client.py:308(_make_api_call)
30350 0.060 0.000 17.506 0.001 client.py:387(_make_request)
30350 0.078 0.000 17.431 0.001 endpoint.py:92(_send_request)
30350 0.090 0.000 10.960 0.000 endpoint.py:133(_get_response)
30350 0.141 0.000 10.450 0.000 endpoint.py:161(_do_get_response)
110002/90002 0.455 0.000 5.968 0.000 hooks.py:47(_emit)
30350 0.033 0.000 5.887 0.000 endpoint.py:284(_send)
30365 0.265 0.000 5.857 0.000 httpsession.py:173(send)
10000 0.052 0.000 5.405 0.001 endpoint.py:71(create_request)
320004/140004 0.158 0.000 5.109 0.000 _helpers.py:13(resolve_awaitable)
30365 0.282 0.000 4.431 0.000 client.py:390(_request)
10000 0.014 0.000 4.182 0.000 signers.py:19(handler)
10000 0.074 0.000 4.168 0.000 signers.py:26(sign)
10000 0.045 0.000 3.732 0.000 parsers.py:223(parse)
10000 0.013 0.000 3.476 0.000 parsers.py:878(_do_parse)
10000 0.015 0.000 3.407 0.000 parsers.py:886(_add_modeled_parse)
10000 0.371 0.000 3.361 0.000 parsers.py:938(_parse_non_payload_attrs)
10000 0.070 0.000 3.296 0.000 auth.py:416(add_auth)
60000 0.059 0.000 2.600 0.000 parsers.py:328(_parse_shape)
60000 0.043 0.000 2.517 0.000 parsers.py:168(_get_text_content)
10000 0.010 0.000 2.374 0.000 parsers.py:535(_handle_timestamp)
10000 0.014 0.000 2.364 0.000 utils.py:942(parse_timestamp)
10000 0.058 0.000 2.343 0.000 utils.py:923(_parse_timestamp_with_tzinfo)
13205 0.015 0.000 2.319 0.000 selector_events.py:818(_read_ready)
13205 0.033 0.000 2.304 0.000 selector_events.py:858(_read_ready__data_received)
10001 0.024 0.000 2.279 0.000 _parser.py:1270(parse)
10001 0.047 0.000 2.255 0.000 _parser.py:572(parse)
13202 0.047 0.000 2.101 0.000 sslproto.py:524(data_received)
10001 0.279 0.000 1.939 0.000 _parser.py:666(_parse)
10000 0.044 0.000 1.675 0.000 client.py:405(_convert_to_request_dict)
10000 0.068 0.000 1.386 0.000 auth.py:359(canonical_request)
10000 0.035 0.000 1.341 0.000 client.py:458(_resolve_endpoint_ruleset)
55028 0.108 0.000 1.231 0.000 client_proto.py:190(data_received)
10000 0.103 0.000 1.159 0.000 regions.py:11(construct_endpoint)
20000 0.094 0.000 1.011 0.000 auth.py:231(headers_to_sign)
10000 0.116 0.000 0.980 0.000 regions.py:61(_get_provider_params)
110392 0.227 0.000 0.925 0.000 response.py:49(read)
10003 0.081 0.000 0.917 0.000 cookiejar.py:235(filter_cookies)
20003 0.011 0.000 0.898 0.000 awsrequest.py:479(prepare)
10003 0.126 0.000 0.889 0.000 client_reqrep.py:605(send)
20003 0.039 0.000 0.887 0.000 awsrequest.py:366(prepare)
10000 0.013 0.000 0.844 0.000 endpoint.py:148(prepare_request)
13476 0.119 0.000 0.838 0.000 sslproto.py:156(feed_ssldata)
10000 0.035 0.000 0.833 0.000 endpoint.py:252(_needs_retry)
20003 0.047 0.000 0.818 0.000 awsrequest.py:388(_prepare_headers)
10001 0.058 0.000 0.799 0.000 _parser.py:199(split)
20000 0.099 0.000 0.770 0.000 endpoint.py:23(convert_to_response_dict)
3190421/3170421 0.474 0.000 0.761 0.000 {built-in method builtins.isinstance}
10001 0.069 0.000 0.760 0.000 useragent.py:258(to_string)
110020 0.406 0.000 0.719 0.000 parse.py:444(urlsplit)
170013 0.057 0.000 0.710 0.000 _parser.py:189(__next__)
10000 0.024 0.000 0.708 0.000 validate.py:374(serialize_to_request)
30006 0.034 0.000 0.703 0.000 awsrequest.py:612(__init__)
110395 0.110 0.000 0.674 0.000 streams.py:347(read)
30006 0.131 0.000 0.669 0.000 _collections_abc.py:991(update)
420050 0.279 0.000 0.666 0.000 inspect.py:348(isawaitable)
170013 0.392 0.000 0.653 0.000 _parser.py:77(get_token)
10003 0.103 0.000 0.649 0.000 client_reqrep.py:244(__init__)
10000 0.035 0.000 0.637 0.000 auth.py:434(_inject_signature_to_request)
27251 0.069 0.000 0.619 0.000 selectors.py:452(select)
65031 0.071 0.000 0.617 0.000 client_proto.py:172(_reschedule_timeout)
10000 0.025 0.000 0.610 0.000 auth.py:479(_modify_request_before_signing)
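One line in this profile is a useful health check for an asyncio program: `selectors.py:452(select)` is where the event loop blocks waiting for sockets to become ready. A sketch of the ratio using the numbers above: about 0.6 s of a 28 s run was spent in `select`, so the loop was almost never idle waiting on the network. That pattern usually indicates a process that is CPU-bound in Python code (here botocore's request and response handling) rather than network-bound. It would also fit Update 2, since a single event loop runs its Python code on one core at a time regardless of how many vCPUs the instance has.

```python
# Times in seconds, copied from the Update 4 profile.
total_s = 28.279   # builtins.exec: the whole script
select_s = 0.619   # selectors.select: event loop waiting for I/O readiness

idle_fraction = select_s / total_s
print(f"event loop idle (waiting on sockets): {idle_fraction:.1%}")
print(f"everything else (mostly running Python code): {1 - idle_fraction:.1%}")
```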
A: No answers yet.