与异步循环和线程混淆

Confused with async loops and threads

提问人:AlwaysLearning 提问时间:11/15/2023 更新时间:11/15/2023 访问量:34

问:

我有一个使用异步 I/O 的代码,并尝试通过使用多线程使其使用可用内核。特别是,对于两个内核,一半的异步任务将在每个线程中运行。这是我的尝试:

async def task(s3, key):
    await s3.read(key)

def thread_worker(s3, keys, loop):
    tasks = []
    for key in keys:
        tasks.append(asyncio.create_task(task(s3, key))) # ERROR
    loop.run_until_complete(asyncio.gather(*tasks))

async def main(n_threads):
    loop = asyncio.get_event_loop()
    keys = [f"{content_id}" for content_id in range(1000)]
    session = aioboto3.session.Session()
    config = aiobotocore.config.AioConfig(max_pool_connections=30)
    async with session.client('s3', region_name="eu-west-2", config=config) as client:
        global s3
        s3 = S3(client, "mybucket")
        timer = Timer()
        keys_per_thread = math.ceil(len(keys)/n_threads)
        with concurrent.futures.ThreadPoolExecutor(max_workers = n_threads) as executor:
            futures = []
            for i in range(n_threads):
                my_keys = keys[i * keys_per_thread : (i + 1) * keys_per_thread]
                futures.append(
                    executor.submit(thread_worker, s3=s3, keys=my_keys, loop=loop)) # SUBMIT
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
        print(f"{timer.stop()}ms")

asyncio.run(main(n_threads=2))

最重要的行以注释结尾。 我得到了从未等待过的错误。另一方面,我不能声明为 ,因为这样我就无法将其提交给 .task()thread_workerasyncexecutor

我该如何解决这个问题?

蟒蛇 python-asyncio python- 多线程

评论

1赞 Viktor Veselý 11/15/2023
注意,Python 中的异步编程将无法“利用”所有内核,为此您需要“mutliprocessing”包。但就你而言,asyncio 应该足够好。
1赞 Paul Cornelius 11/16/2023
asyncio 事件循环仅在单个线程中运行。您创建的额外线程没有事件循环,因此会出现错误。可以使用 ThreadPoolExecutor 函数的关键字启动事件循环。这将解决您的错误。但它不会实现你所说的利用多核的目标,因为单进程 Python 程序(几乎总是)只在单个内核上运行。正如 Viktor Vesely 正确指出的那样,您需要编写一个多处理程序来使用多个内核。initializer=

答: 暂无答案