如何在 Python 中使用“async for”?

How to use `async for` in Python?

提问人:PaleNeutron 提问时间:5/16/2019 更新时间:11/3/2023 访问量:78731

问:

我的意思是我从使用中得到什么.这是我写的代码,可以用 .async forasync forAIter(10)get_range()

但是代码像同步而不是异步一样运行。

import asyncio

async def get_range():
    for i in range(10):
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        yield i

class AIter:
    def __init__(self, N):
        self.i = 0
        self.N = N

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        if i >= self.N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def main():
    async for p in AIter(10):
        print(f"finally {p}")

if __name__ == "__main__":
    asyncio.run(main())

我排除的结果应该是:

start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...

然而,真正的结果是:

start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2

我知道我可以通过使用 or 来获得例外结果。asyncio.gatherasyncio.wait

但是我很难理解我在这里使用而不是简单的.async forfor

如果我想遍历多个对象并在一个对象完成后立即使用它们,正确的使用方法是什么。例如:async forFeature

async for f in feature_objects:
    data = await f
    with open("file", "w") as fi:
        fi.write()
异步 python-asyncio

评论

0赞 PaleNeutron 6/3/2019
@user4815162342,是的,非常感谢。但我仍在寻找一些例子.你能添加一个语法用法的例子吗?async sourceasync for
0赞 user4815162342 6/3/2019
任何异步生成器都可以用作异步源。有关更具体的示例,请参阅例如,此答案将一系列回调调用公开为异步迭代器,该迭代器可使用 进行迭代。async for
0赞 Tsonglew 12/12/2019
顺便说一句,您可以尝试 AIOFILES 以 asyncio 方式处理文件
0赞 Charlie Parker 6/23/2022
关于阻塞 for 循环的问题。我可以有一个常规的for循环并在其中等待,例如,这会将控制权返回给调用者并允许并发。右?请注意,我的睡眠当然很傻,因为只是为了模拟昂贵的运算(也称为 io-bound 运算)。for in range(10):await asyncio.sleep(i)
0赞 Charlie Parker 6/23/2022
是使用 NOT 的一个很好的例子,因为它会用隐式或其他东西获取下一个项目?async forasync forawait it.anext_step()

答:

155赞 user4815162342 5/16/2019 #1

但是我很难理解我在这里使用而不是简单的.async forfor

潜在的误解是期望异步自动并行化迭代。它不会这样做,它只是允许对异步源进行顺序迭代。例如,可用于循环访问来自 TCP 流的行、来自 websocket 的消息或来自异步数据库驱动程序的数据库记录。async for

以上都不适用于普通的,至少在不阻塞事件循环的情况下是行不通的。这是因为将 __next__ 作为阻塞函数调用,并且不等待其结果。您不能手动获取 的元素 because 期望通过引发 来发出迭代结束的信号。如果是协程,则在等待异常之前不会显示异常。这就是为什么不仅在 Python 中引入,而且在其他语言中引入 async/await 和 generalized 的原因。forforawaitforfor__next__StopIteration__next__StopIterationasync forfor

如果要并行运行循环迭代,则需要将它们作为并行协程启动,并使用 asyncio.as_completed 或等效项来检索它们的结果:

async def x(i):
    print(f"start {i}")
    await asyncio.sleep(1)
    print(f"end {i}")
    return i

# run x(0)..x(10) concurrently and process results as they arrive
for f in asyncio.as_completed([x(i) for i in range(10)]):
    result = await f
    # ... do something with the result ...

如果您不在乎在结果到达时立即做出反应,但您需要所有这些结果,则可以使用以下方法使其更简单:asyncio.gather

# run x(0)..x(10) concurrently and process results when all are done
results = await asyncio.gather(*[x(i) for i in range(10)])

评论

1赞 hBy2Py 3/11/2021
检查我的理解 - 您的两个代码片段(并且必须在异步函数/方法中执行,在由 启动的调用链中执行,对吧?for f in asyncio.as_completed...results = await ...asyncio.run(...)
1赞 user4815162342 3/11/2021
@hBy2Py正确。为了简洁起见,这个问题(因此也是答案)只是省略了这一部分。
1赞 Roelant 3/22/2021
我喜欢解释器,但缺少循环示例async for
2赞 user4815162342 3/22/2021
@Roelant 你说得对,举个例子会很有用。这个答案试图解决问题中提出的具体问题,这在当时是有道理的,但降低了它作为一般资源的价值。在这一点上添加一个现实生活中的例子会使答案比现在长得多。希望还有其他 SO 问题可以澄清这个问题,如果没有,也许是时候提出一个新问题了。
0赞 Charlie Parker 6/23/2022
关于阻塞 for 循环的问题。我可以有一个常规的for循环并在其中等待,例如,这会将控制权返回给调用者并允许并发。右?请注意,我的睡眠当然很傻,因为只是为了模拟昂贵的运算(也称为 io-bound 运算)。for in range(10):await asyncio.sleep(i)
12赞 matan129 6/26/2022 #2

(添加公认的答案 - 查理的赏金)。

假设您想同时使用每个产生的值,一个简单的方法是:

import asyncio

async def process_all():
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))
        tasks.append(task)
    
    await asyncio.gather(*tasks)


async def process_obj(obj):
    ...

解释:

请考虑以下代码,不带:create_task

async def process_all():
    async for obj in my_async_generator:
        await process_obj(obj))

这大致相当于:

async def process_all():
    obj1 = await my_async_generator.__anext__():
    await process_obj(obj1))

    obj2 = await my_async_generator.__anext__():
    await process_obj(obj2))
    
    ...

基本上,循环无法继续,因为它的主体被阻塞了。要走的方法是将每次迭代的处理委托给一个新的异步任务,该任务将在不阻塞循环的情况下启动。然后,等待所有任务 - 这意味着,每次迭代都要处理。gather

评论

1赞 Charlie Parker 6/27/2022
真的很喜欢你的榜样!希望我们真的能运行它。一个简短的评论,我认为值得一提的是,该函数实际上调度了一个协程以并发执行并且不会阻塞。.create_task(coroutine(args))
1赞 PaleNeutron 6/28/2022
那么,和之间的主要区别是和之间的区别吗?你能把答案再扩展一点吗?(用一个真实可以更好)。async forfor__next__()__anext__()my_async_generator
-1赞 Charlie Parker #3

基于@matan129奇妙答案的代码,只是缺少异步生成器以使其可运行,一旦我有了它(或者如果有人想贡献它)就会解决这个问题:


import time

import asyncio


async def process_all():
    """
    Example where the async for loop allows to loop through concurrently many things without blocking on each individual
    iteration but blocks (waits) for all tasks to run.
    ref:
    - https://stackoverflow.com/questions/56161595/how-to-use-async-for-in-python/72758067#72758067
    """
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))  # concurrently dispatches a coroutine to be executed.
        tasks.append(task)

    await asyncio.gather(*tasks)


async def process_obj(obj):
    await asyncio.sleep(5)  # expensive IO


if __name__ == '__main__':
    # - test asyncio
    s = time.perf_counter()
    asyncio.run(process_all())
    # - print stats
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    print('Success, done!\a')
0赞 l001d 6/22/2023 #4

正如其他人所指出的,不会创建要并发运行的任务。它用于允许对异步源进行顺序迭代。async for

例如,在 中,您可以执行 。 在每次迭代中都会调用 in 中的方法。此方法定义为 ,允许在其内部进行调用。aiokafkaasync for msg in consumer__anext__consumerasync def __anext__await self.get_one()

相比之下,当您使用普通的 for 循环时,它会在内部调用特殊方法。但是,常规方法不支持等待异步源,例如使用 .__next____next__await get_one()