提问人:AlwaysLearning 提问时间:11/15/2023 更新时间:11/15/2023 访问量:34
与异步循环和线程混淆
Confused with async loops and threads
问:
我有一个使用异步 I/O 的代码,并尝试通过使用多线程使其使用可用内核。特别是,对于两个内核,一半的异步任务将在每个线程中运行。这是我的尝试:
async def task(s3, key):
await s3.read(key)
def thread_worker(s3, keys, loop):
tasks = []
for key in keys:
tasks.append(asyncio.create_task(task(s3, key))) # ERROR
loop.run_until_complete(asyncio.gather(*tasks))
async def main(n_threads):
loop = asyncio.get_event_loop()
keys = [f"{content_id}" for content_id in range(1000)]
session = aioboto3.session.Session()
config = aiobotocore.config.AioConfig(max_pool_connections=30)
async with session.client('s3', region_name="eu-west-2", config=config) as client:
global s3
s3 = S3(client, "mybucket")
timer = Timer()
keys_per_thread = math.ceil(len(keys)/n_threads)
with concurrent.futures.ThreadPoolExecutor(max_workers = n_threads) as executor:
futures = []
for i in range(n_threads):
my_keys = keys[i * keys_per_thread : (i + 1) * keys_per_thread]
futures.append(
executor.submit(thread_worker, s3=s3, keys=my_keys, loop=loop)) # SUBMIT
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"{timer.stop()}ms")
asyncio.run(main(n_threads=2))
最重要的行以注释结尾。
我得到了从未等待过的错误。另一方面,我不能声明为 ,因为这样我就无法将其提交给 .task()
thread_worker
async
executor
我该如何解决这个问题?
答: 暂无答案
评论
initializer=