提问人:Azmisov 提问时间:11/8/2023 最后编辑:Azmisov 更新时间:11/9/2023 访问量:90
如何将 Python 生成器转换为异步生成器?
How to convert a Python generator to async generator?
问:
我在 Python 中有一个受 IO 限制的生成器函数。我想将其转换为异步生成器,其中生成器循环在单独的进程或线程中运行。例如,从套接字加载数据块,我们希望在处理前一个数据块的同时加载下一个数据块。我想象将使用一个队列,其中 IO 线程正在缓冲块(产量输出)供异步生成器拾取。最好是使用 concurrent.futures 模块来灵活地在单独的进程和线程之间进行选择。解决这个问题的最佳方法是什么?
import time
def blocking():
""" plain generator with blocking i/o """
i = 0
while True:
time.sleep(1)
yield i # fake result
i += 1
def consumer():
for chunk in blocking():
print(chunk)
答:
3赞
jsbueno
11/8/2023
#1
是的,不幸的是,这并不简单。
下面的代码段中的可选内容是“任务组”,但它有助于良好实践:
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor # (or Process*)
executor = ThreadPoolExecutor(10) # Put this wherever you like, with as many workers as you want.
def blocking_task(i):
time.sleep(1)
return i
async def async_gen():
""" plain generator with blocking i/o """
async with asyncio.TaskGroup() as tg:
loop = asyncio.get_running_loop()
async def taskwrapper(awaitable):
return await awaitable
tasks = {tg.create_task(taskwrapper(loop.run_in_executor(executor, blocking_task, i,))) for i in range(10)}
while tasks:
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
yield task.result()
tasks = pending
async def consumer():
async for i in async_gen():
print(i)
asyncio.run(consumer())
因此,这时您可以在一个任务中分离每个项目提取,该任务可以在线程池中作为单独的任务运行。
如果你只想保留一个同步流,然后包装它,以便它可以在异步循环中使用,如果你可以在它自己的线程中运行该同步生成器,这可以通过添加一些辅助函数和控制流的东西来完成 - 下面的代码片段可能会有所帮助。该函数将负责在另一个线程中启动生成器并异步获取块:async_gen
import asyncio
from threading import Thread
from queue import Queue as ThreadingQueue, Empty
import time
def blocking():
""" plain generator with blocking i/o """
# this will run in other thread, and produced results
# are comunicated to the asyncio loop in a queue by the
# "middleware" helper functions
i = 0
while True:
time.sleep(1)
yield i
i += 1
def _threading_queue_item_ready(semaphore):
# increase the count of available items in the queue so
# that it can be read in an async-friendly way:
semaphore.release()
def _producer_wrapper(gen, loop, queue, semaphore):
def wrapper(*args, **kwargs):
for item in gen():
queue.put(item)
loop.call_soon_threadsafe(_threading_queue_item_ready, semaphore)
wrapper()
async def async_gen(sync_gen):
"""wrapper for the "generator asynchronization" mechanism"""
semaphore = asyncio.Semaphore(0)
queue = ThreadingQueue()
producer_thread = Thread(target=_producer_wrapper, args=(sync_gen, asyncio.get_running_loop(), queue, semaphore))
producer_thread.start()
while True:
await semaphore.acquire()
# we only get here when there is at least one item in the queue
item = queue.get()
yield item
# intentionaly do not "release" the semaphore: available item count
# is increased by the producer code
async def consumer():
async for chunk in async_gen(blocking):
print(chunk)
asyncio.run(consumer())
评论
0赞
user2357112
11/8/2023
我不认为这是他们的意思 - 我认为他们仍然希望按顺序运行 I/O,只是在执行器中。
0赞
Azmisov
11/8/2023
是的,我的问题实际上是关于顺序 IO;例如,我们正在从套接字加载数据块,因此不能作为单独的任务运行。(我稍微修改了这个问题,使这一点更清楚)。不过,这个答案仍然是一个有用的参考。
0赞
jsbueno
11/8/2023
因此,在这种情况下,您的示例并不好:需要“转换为异步”的正是我们的 I/O 调用,并且如何执行此操作将特定于该调用。对于这种情况,补救措施是将其更改为 - 然后,只需更改 .在这种情况下,我想工作线程可用于 I/O 并向 asyncio 循环线程发出信号,它得到了一个新块(池化同步、线程、队列就可以了,但我想使用 asyncio 同步原语的机制会更好)time.sleep(1)
await asyncio.sleep
def blocking():
async def blocking()
上一个:如何将值传递给迭代器
评论