Saturating network bandwidth with asynchronous reads from S3

Asked by: AlwaysLearning · Asked: 11/14/2023 · Last edited by: AlwaysLearning · Updated: 11/14/2023 · Views: 118

Q:

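I have an EC2 instance running in the same region as an S3 bucket that contains many objects of ~40K each. Each object is stored under two prefixes, so it can be read from either of them. I can fetch about 70 objects sequentially in a little over 1 second (measured after the connection to S3 has been established). Given that the network bandwidth of the c6i.large EC2 instance is up to 12,500 Mbit/s (12.5 Gbit/s), the theoretical maximum rate is about 30,000 objects per second. I want to saturate the bandwidth and get as close to that rate as possible. So far, using asynchronous I/O, I have only reached about 500 objects per second, which is nowhere near the available bandwidth. Here is my code: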

import aioboto3
import aiobotocore
import asyncio
import random
import time

TASK_LENGTH = 70
N_PREFIXES = 2

def ms_now():
    return int(time.time_ns() / 1000000)

class Timer():
    def __init__(self, timestamp_function=ms_now):
        self.timestamp_function = timestamp_function
        self.start = self.timestamp_function()

    def stop(self):
        return self.timestamp_function() - self.start

class S3:
    def __init__(self, client, bucket_name):
        self.client = client
        self.bucket_name = bucket_name

    async def read(self, key):
        prefix = f"prefix-{random.randint(0, N_PREFIXES - 1)}/"
        obj = (await self.client.get_object(Bucket=self.bucket_name, Key=prefix+key))['Body']
        return await obj.read()

async def task(s3, keys):
    for key in keys:
        await s3.read(key)

async def main():
    keys = [f"{content_id}" for content_id in range(500)]
    session = aioboto3.session.Session()
    config = aiobotocore.config.AioConfig(max_pool_connections=1000)
    async with session.client('s3', region_name="eu-west-2", config=config) as client:
        s3 = S3(client, "mybucket")
        timer = Timer()
        tasks = []
        for mykeys in [keys[i:i + TASK_LENGTH] for i in range(0, len(keys), TASK_LENGTH)]:
            tasks.append(asyncio.create_task(task(s3, mykeys)))
        for t in tasks:
            await t
        print(f"{timer.stop()}ms")

asyncio.run(main())

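This code fetches 500 objects in a little over one second at best. It makes no difference whether I use a single prefix or two, which suggests that S3 is not the bottleneck. What is the bottleneck, and how can I get past it?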
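
One thing worth checking before looking at the network is how many requests the script keeps in flight at once. The following is a minimal sketch that uses only the TASK_LENGTH constant from the code above and the roughly 70 objects per second reported for a single sequential reader; `n_tasks` and `latency_bound_rate` are hypothetical helper names, not part of the script.

```python
import math

TASK_LENGTH = 70       # keys handled sequentially by one task (from the script)
SEQUENTIAL_RATE = 70   # objects/s for one sequential reader (approximate, as measured)

def n_tasks(n_keys, task_length=TASK_LENGTH):
    """Number of concurrent tasks main() creates for n_keys keys."""
    return math.ceil(n_keys / task_length)

def latency_bound_rate(n_keys, per_task_rate=SEQUENTIAL_RATE):
    """Upper bound on objects/s if every task ran at the sequential rate."""
    return n_tasks(n_keys) * per_task_rate

for n in (500, 10_000):
    print(n, n_tasks(n), latency_bound_rate(n))
```

For 500 keys this gives 8 tasks and a ceiling of about 560 objects per second, which is close to the observed 500, so for this run the limit may simply be request latency multiplied by a small number of in-flight requests. The task count grows with the key count (143 tasks for 10,000 keys), so this estimate stops being the limiting factor for larger runs.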

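Update 1. At 500 objects per second, one would expect fetching 10,000 objects to take at least 20 seconds. Interestingly, it takes only 13 seconds... Again, it makes no difference whether I use a single prefix or two.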
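
To put this rate next to the link capacity, the arithmetic can be written out. This is a rough sketch that assumes the "up to 12.5 Gbit/s" figure for the instance and treats ~40K as 40,000 bytes; both values are assumptions, and protocol overhead is ignored.

```python
LINK_BITS_PER_S = 12.5e9   # assumed: "up to 12.5 Gbit/s" burst bandwidth
OBJECT_BYTES = 40_000      # assumed: "~40K" per object

def max_objects_per_s(link_bits_per_s=LINK_BITS_PER_S, object_bytes=OBJECT_BYTES):
    """Objects per second if the link were the only limit."""
    return link_bits_per_s / 8 / object_bytes

def link_utilization(objects_per_s):
    """Fraction of the assumed link capacity used at a given object rate."""
    return objects_per_s / max_objects_per_s()

observed = 10_000 / 13     # Update 1: 10,000 objects in 13 seconds
print(f"theoretical max: {max_objects_per_s():,.1f} objects/s")
print(f"observed:        {observed:,.1f} objects/s")
print(f"utilization:     {link_utilization(observed):.1%}")
```

Under these assumptions the link could carry roughly 39,000 objects per second, the same order of magnitude as the ~30,000 estimate in the question, while the 13-second run uses about 2% of it.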

Update 2. I tried a larger instance, c6i.xlarge (with twice as many vCPUs and twice as much memory as the original), and got similar performance.

Update 3. Based on this post, I ran the script (for 10,000 objects) under the profiler (on the larger instance):

timeout -s INT -k 100s 105s python -m cProfile -s tottime script.py > out

Here is part of the output:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    547/1    0.002    0.000   24.656   24.656 {built-in method builtins.exec}
        1    0.000    0.000   24.656   24.656 script.py:1(<module>)
        1    0.000    0.000   24.384   24.384 runners.py:8(run)
        3    0.000    0.000   24.384    8.128 base_events.py:613(run_until_complete)
        3    0.001    0.000   24.384    8.128 base_events.py:589(run_forever)
      605    0.072    0.000   24.383    0.040 base_events.py:1832(_run_once)
    42253    0.040    0.000   24.228    0.001 events.py:78(_run)
    42253    0.060    0.000   24.188    0.001 {method 'run' of '_contextvars.Context' objects}
    20467    0.069    0.000   21.951    0.001 script.py:31(task)
    30324    0.108    0.000   21.878    0.001 script.py:26(read)
    30295    0.187    0.000   21.437    0.001 client.py:308(_make_api_call)
    30295    0.059    0.000   17.352    0.001 client.py:387(_make_request)
    30295    0.079    0.000   17.278    0.001 endpoint.py:92(_send_request)
    30295    0.093    0.000   10.785    0.000 endpoint.py:133(_get_response)
    30295    0.142    0.000   10.266    0.000 endpoint.py:161(_do_get_response)
110002/90002    0.440    0.000    5.928    0.000 hooks.py:47(_emit)
    30295    0.032    0.000    5.641    0.000 endpoint.py:284(_send)
    30310    0.268    0.000    5.612    0.000 httpsession.py:173(send)
    10000    0.055    0.000    5.392    0.001 endpoint.py:71(create_request)
320004/140004    0.157    0.000    5.105    0.000 _helpers.py:13(resolve_awaitable)
    30310    0.269    0.000    4.221    0.000 client.py:390(_request)
    10000    0.014    0.000    4.161    0.000 signers.py:19(handler)
    10000    0.078    0.000    4.147    0.000 signers.py:26(sign)
    10000    0.048    0.000    3.759    0.000 parsers.py:223(parse)
    10000    0.013    0.000    3.508    0.000 parsers.py:878(_do_parse)
    10000    0.016    0.000    3.438    0.000 parsers.py:886(_add_modeled_parse)
    10000    0.387    0.000    3.387    0.000 parsers.py:938(_parse_non_payload_attrs)
    10000    0.072    0.000    3.289    0.000 auth.py:416(add_auth)
    60000    0.060    0.000    2.605    0.000 parsers.py:328(_parse_shape)
    60000    0.046    0.000    2.519    0.000 parsers.py:168(_get_text_content)
    10000    0.011    0.000    2.371    0.000 parsers.py:535(_handle_timestamp)
    10000    0.014    0.000    2.360    0.000 utils.py:942(parse_timestamp)
    10000    0.057    0.000    2.339    0.000 utils.py:923(_parse_timestamp_with_tzinfo)
    10001    0.021    0.000    2.273    0.000 _parser.py:1270(parse)
    10001    0.049    0.000    2.252    0.000 _parser.py:572(parse)
    10001    0.278    0.000    1.930    0.000 _parser.py:666(_parse)
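
When reading a table like this, it helps to be explicit about the sort key. Below is a minimal sketch, using only the standard-library `cProfile` and `pstats` modules, of profiling an asyncio entry point in-process and printing the top entries by cumulative time. `fake_read`, `main` and `profile_by_cumtime` are hypothetical stand-ins for illustration, not the S3 code above.

```python
import asyncio
import cProfile
import io
import pstats

async def fake_read(key):
    # Stand-in for an S3 read: yield to the event loop once, then do a little CPU work.
    await asyncio.sleep(0)
    return sum(range(1000)) + key

async def main(n=200):
    return await asyncio.gather(*(fake_read(k) for k in range(n)))

def profile_by_cumtime(top=10):
    """Run main() under cProfile and return (results, report sorted by cumulative time)."""
    profiler = cProfile.Profile()
    profiler.enable()
    results = asyncio.run(main())
    profiler.disable()
    out = io.StringIO()
    stats = pstats.Stats(profiler, stream=out)
    stats.sort_stats("cumulative").print_stats(top)
    return results, out.getvalue()

results, report = profile_by_cumtime()
print(report)
```

From the command line, `python -m cProfile -s cumtime script.py` requests the same ordering.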

Update 4. I tried the suggested improvement:

import io

class S3:
    def __init__(self, client, bucket_name):
        self.client = client
        self.bucket_name = bucket_name

    async def read(self, key):
        prefix = f"vectors-{random.randint(0,N_PREFIXES - 1)}/"
        myFile = io.BytesIO()
        await self.client.download_fileobj(self.bucket_name, prefix+key, myFile)

Here is part of the profiler output:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    547/1    0.002    0.000   28.279   28.279 {built-in method builtins.exec}
        1    0.000    0.000   28.279   28.279 script.py:1(<module>)
        1    0.000    0.000   28.009   28.009 runners.py:8(run)
        3    0.000    0.000   28.009    9.336 base_events.py:613(run_until_complete)
        3    0.017    0.006   28.008    9.336 base_events.py:589(run_forever)
    27251    0.244    0.000   27.991    0.001 base_events.py:1832(_run_once)
   145713    0.104    0.000   27.045    0.000 events.py:78(_run)
   145713    0.201    0.000   26.940    0.000 {method 'run' of '_contextvars.Context' objects}
   120885    0.062    0.000   23.729    0.000 script.py:34(task)
   130742    0.169    0.000   23.667    0.000 script.py:27(read)
   130742    0.362    0.000   23.443    0.000 inject.py:78(download_fileobj)
    30350    0.192    0.000   21.691    0.001 client.py:308(_make_api_call)
    30350    0.060    0.000   17.506    0.001 client.py:387(_make_request)
    30350    0.078    0.000   17.431    0.001 endpoint.py:92(_send_request)
    30350    0.090    0.000   10.960    0.000 endpoint.py:133(_get_response)
    30350    0.141    0.000   10.450    0.000 endpoint.py:161(_do_get_response)
110002/90002    0.455    0.000    5.968    0.000 hooks.py:47(_emit)
    30350    0.033    0.000    5.887    0.000 endpoint.py:284(_send)
    30365    0.265    0.000    5.857    0.000 httpsession.py:173(send)
    10000    0.052    0.000    5.405    0.001 endpoint.py:71(create_request)
320004/140004    0.158    0.000    5.109    0.000 _helpers.py:13(resolve_awaitable)
    30365    0.282    0.000    4.431    0.000 client.py:390(_request)
    10000    0.014    0.000    4.182    0.000 signers.py:19(handler)
    10000    0.074    0.000    4.168    0.000 signers.py:26(sign)
    10000    0.045    0.000    3.732    0.000 parsers.py:223(parse)
    10000    0.013    0.000    3.476    0.000 parsers.py:878(_do_parse)
    10000    0.015    0.000    3.407    0.000 parsers.py:886(_add_modeled_parse)
    10000    0.371    0.000    3.361    0.000 parsers.py:938(_parse_non_payload_attrs)
    10000    0.070    0.000    3.296    0.000 auth.py:416(add_auth)
    60000    0.059    0.000    2.600    0.000 parsers.py:328(_parse_shape)
    60000    0.043    0.000    2.517    0.000 parsers.py:168(_get_text_content)
    10000    0.010    0.000    2.374    0.000 parsers.py:535(_handle_timestamp)
    10000    0.014    0.000    2.364    0.000 utils.py:942(parse_timestamp)
    10000    0.058    0.000    2.343    0.000 utils.py:923(_parse_timestamp_with_tzinfo)
    13205    0.015    0.000    2.319    0.000 selector_events.py:818(_read_ready)
    13205    0.033    0.000    2.304    0.000 selector_events.py:858(_read_ready__data_received)
    10001    0.024    0.000    2.279    0.000 _parser.py:1270(parse)
    10001    0.047    0.000    2.255    0.000 _parser.py:572(parse)
    13202    0.047    0.000    2.101    0.000 sslproto.py:524(data_received)
    10001    0.279    0.000    1.939    0.000 _parser.py:666(_parse)
    10000    0.044    0.000    1.675    0.000 client.py:405(_convert_to_request_dict)
    10000    0.068    0.000    1.386    0.000 auth.py:359(canonical_request)
    10000    0.035    0.000    1.341    0.000 client.py:458(_resolve_endpoint_ruleset)
    55028    0.108    0.000    1.231    0.000 client_proto.py:190(data_received)
    10000    0.103    0.000    1.159    0.000 regions.py:11(construct_endpoint)
    20000    0.094    0.000    1.011    0.000 auth.py:231(headers_to_sign)
    10000    0.116    0.000    0.980    0.000 regions.py:61(_get_provider_params)
   110392    0.227    0.000    0.925    0.000 response.py:49(read)
    10003    0.081    0.000    0.917    0.000 cookiejar.py:235(filter_cookies)
    20003    0.011    0.000    0.898    0.000 awsrequest.py:479(prepare)
    10003    0.126    0.000    0.889    0.000 client_reqrep.py:605(send)
    20003    0.039    0.000    0.887    0.000 awsrequest.py:366(prepare)
    10000    0.013    0.000    0.844    0.000 endpoint.py:148(prepare_request)
    13476    0.119    0.000    0.838    0.000 sslproto.py:156(feed_ssldata)
    10000    0.035    0.000    0.833    0.000 endpoint.py:252(_needs_retry)
    20003    0.047    0.000    0.818    0.000 awsrequest.py:388(_prepare_headers)
    10001    0.058    0.000    0.799    0.000 _parser.py:199(split)
    20000    0.099    0.000    0.770    0.000 endpoint.py:23(convert_to_response_dict)
3190421/3170421    0.474    0.000    0.761    0.000 {built-in method builtins.isinstance}
    10001    0.069    0.000    0.760    0.000 useragent.py:258(to_string)
   110020    0.406    0.000    0.719    0.000 parse.py:444(urlsplit)
   170013    0.057    0.000    0.710    0.000 _parser.py:189(__next__)
    10000    0.024    0.000    0.708    0.000 validate.py:374(serialize_to_request)
    30006    0.034    0.000    0.703    0.000 awsrequest.py:612(__init__)
   110395    0.110    0.000    0.674    0.000 streams.py:347(read)
    30006    0.131    0.000    0.669    0.000 _collections_abc.py:991(update)
   420050    0.279    0.000    0.666    0.000 inspect.py:348(isawaitable)
   170013    0.392    0.000    0.653    0.000 _parser.py:77(get_token)
    10003    0.103    0.000    0.649    0.000 client_reqrep.py:244(__init__)
    10000    0.035    0.000    0.637    0.000 auth.py:434(_inject_signature_to_request)
    27251    0.069    0.000    0.619    0.000 selectors.py:452(select)
    65031    0.071    0.000    0.617    0.000 client_proto.py:172(_reschedule_timeout)
    10000    0.025    0.000    0.610    0.000 auth.py:479(_modify_request_before_signing)
python amazon-s3 boto3 python-asyncio aiobotocore

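Comments

2 upvotes · AKX · 11/14/2023
"What is the bottleneck" is a question you answer first by profiling your code and seeing where it spends its time. Please post the profiler results.
0 upvotes · AlwaysLearning · 11/14/2023
@AKX Please see updates 2 and 3.
0 upvotes · AKX · 11/14/2023
The table would be more useful if it were sorted by cumtime, but oh well, some number crunching later: gist.github.com/akx/1abf394df36f967f49e4e7a6471889c7 See all the things that are called 10,000 times or thereabouts (or a multiple of that)? That is per-request overhead. For example, out of this 25-second run you seem to spend 4.2 seconds signing each individual request (remember how we talked about tiny requests?), 2.4 seconds parsing timestamps, and so on (cont.)
1 upvote · AKX · 11/14/2023
Of course, if applying these optimizations fails, then you would switch to a language that is faster than Python. (Writing the fastest S3 downloader in the West in, say, Rust would be a fun exercise in itself...)
1 upvote · AKX · 11/14/2023
Secondly, yes, you are right that core Python is implemented in C, but if you look at the profiling results, most of the time is spent not in core Python but in botocore, dateutil and so on, and unfortunately Boto is not the best Python code.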
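
The per-request overhead mentioned in these comments can be read off the first profile directly. This is a rough sketch that divides a few cumulative times from that table by the 10,000 requests. The figures are copied from the profiler output above. The requests-per-second bound is only an estimate: it assumes that the cumulative time of a coroutine counts only the time it is running (not suspended), and the profiler itself slows the run.

```python
N_REQUESTS = 10_000

# cumtime in seconds, copied from the first profiler table above
CUMTIME = {
    "client.py:_make_api_call": 21.437,
    "signers.py:sign": 4.147,
    "parsers.py:parse": 3.759,
    "utils.py:parse_timestamp": 2.360,
}

def per_request_ms(cumtime_s, n_requests=N_REQUESTS):
    """Average time per request, in milliseconds."""
    return cumtime_s / n_requests * 1000

for name, seconds in CUMTIME.items():
    print(f"{name:28s} {per_request_ms(seconds):.3f} ms/request")

api_call_ms = per_request_ms(CUMTIME["client.py:_make_api_call"])
ceiling = 1000 / api_call_ms
print(f"single event loop ceiling under the profiler: {ceiling:.0f} requests/s")
```

At roughly 2.1 ms of Python work per request under the profiler, a single event loop tops out at a few hundred requests per second regardless of the available bandwidth, which is consistent with the rates reported in the question.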

A: No answers yet