提问人:PaleNeutron 提问时间:5/16/2019 更新时间:11/3/2023 访问量:78731
如何在 Python 中使用“async for”?
How to use `async for` in Python?
问:
我的意思是我从使用中得到什么.这是我写的代码,可以用 .async for
async for
AIter(10)
get_range()
但是代码像同步而不是异步一样运行。
import asyncio
async def get_range():
for i in range(10):
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
yield i
class AIter:
def __init__(self, N):
self.i = 0
self.N = N
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
if i >= self.N:
raise StopAsyncIteration
self.i += 1
return i
async def main():
async for p in AIter(10):
print(f"finally {p}")
if __name__ == "__main__":
asyncio.run(main())
我排除的结果应该是:
start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...
然而,真正的结果是:
start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2
我知道我可以通过使用 or 来获得例外结果。asyncio.gather
asyncio.wait
但是我很难理解我在这里使用而不是简单的.async for
for
如果我想遍历多个对象并在一个对象完成后立即使用它们,正确的使用方法是什么。例如:async for
Feature
async for f in feature_objects:
data = await f
with open("file", "w") as fi:
fi.write()
答:
但是我很难理解我在这里使用而不是简单的.
async for
for
潜在的误解是期望异步自动
并行化迭代。它不会这样做,它只是允许对异步源进行顺序迭代。例如,可用于循环访问来自 TCP 流的行、来自 websocket 的消息或来自异步数据库驱动程序的数据库记录。async for
以上都不适用于普通的,至少在不阻塞事件循环的情况下是行不通的。这是因为将 __next__
作为阻塞函数调用,并且不等待其结果。您不能手动获取 的元素 because 期望通过引发 来发出迭代结束的信号。如果是协程,则在等待异常之前不会显示异常。这就是为什么不仅在 Python 中引入,而且在其他语言中引入 async/await 和 generalized 的原因。for
for
await
for
for
__next__
StopIteration
__next__
StopIteration
async for
for
如果要并行运行循环迭代,则需要将它们作为并行协程启动,并使用 asyncio.as_completed
或等效项来检索它们的结果:
async def x(i):
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
return i
# run x(0)..x(10) concurrently and process results as they arrive
for f in asyncio.as_completed([x(i) for i in range(10)]):
result = await f
# ... do something with the result ...
如果您不在乎在结果到达时立即做出反应,但您需要所有这些结果,则可以使用以下方法使其更简单:asyncio.gather
# run x(0)..x(10) concurrently and process results when all are done
results = await asyncio.gather(*[x(i) for i in range(10)])
评论
for f in asyncio.as_completed...
results = await ...
asyncio.run(...)
async for
for in range(10):
await asyncio.sleep(i)
(添加公认的答案 - 查理的赏金)。
假设您想同时使用每个产生的值,一个简单的方法是:
import asyncio
async def process_all():
tasks = []
async for obj in my_async_generator:
# Python 3.7+. Use ensure_future for older versions.
task = asyncio.create_task(process_obj(obj))
tasks.append(task)
await asyncio.gather(*tasks)
async def process_obj(obj):
...
解释:
请考虑以下代码,不带:create_task
async def process_all():
async for obj in my_async_generator:
await process_obj(obj))
这大致相当于:
async def process_all():
obj1 = await my_async_generator.__anext__():
await process_obj(obj1))
obj2 = await my_async_generator.__anext__():
await process_obj(obj2))
...
基本上,循环无法继续,因为它的主体被阻塞了。要走的方法是将每次迭代的处理委托给一个新的异步任务,该任务将在不阻塞循环的情况下启动。然后,等待所有任务 - 这意味着,每次迭代都要处理。gather
评论
.create_task(coroutine(args))
async for
for
__next__()
__anext__()
my_async_generator
基于@matan129奇妙答案的代码,只是缺少异步生成器以使其可运行,一旦我有了它(或者如果有人想贡献它)就会解决这个问题:
import time
import asyncio
async def process_all():
"""
Example where the async for loop allows to loop through concurrently many things without blocking on each individual
iteration but blocks (waits) for all tasks to run.
ref:
- https://stackoverflow.com/questions/56161595/how-to-use-async-for-in-python/72758067#72758067
"""
tasks = []
async for obj in my_async_generator:
# Python 3.7+. Use ensure_future for older versions.
task = asyncio.create_task(process_obj(obj)) # concurrently dispatches a coroutine to be executed.
tasks.append(task)
await asyncio.gather(*tasks)
async def process_obj(obj):
await asyncio.sleep(5) # expensive IO
if __name__ == '__main__':
# - test asyncio
s = time.perf_counter()
asyncio.run(process_all())
# - print stats
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
print('Success, done!\a')
正如其他人所指出的,不会创建要并发运行的任务。它用于允许对异步源进行顺序迭代。async for
例如,在 中,您可以执行 。
在每次迭代中都会调用 in 中的方法。此方法定义为 ,允许在其内部进行调用。aiokafka
async for msg in consumer
__anext__
consumer
async def __anext__
await self.get_one()
相比之下,当您使用普通的 for 循环时,它会在内部调用特殊方法。但是,常规方法不支持等待异步源,例如使用 .__next__
__next__
await get_one()
评论
async source
async for
async for
for in range(10):
await asyncio.sleep(i)
async for
async for
await it.anext_step()