如何让“concurrent.futures”处理流式处理任务?

How to let `concurrent.futures` handle streaming tasks?

提问人:SupernoobBran 提问时间:6/21/2023 更新时间:6/21/2023 访问量:94

问:

concurrent.futures是一个专为异步执行任务而设计的 Python 包。 它的典型用法如下(从链接复制):

# SuperFastPython.com
# example of waiting for all tasks in a collection of tasks to be done
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)
 
# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait for all tasks to complete
    wait(futures)
    print('All tasks are done!')

的工作模式是concurrent.futures

  1. 将待办任务提交到ThreadPoolExecutor
  2. 包会自动执行所有待办任务,并且
  • 等待(由 实现者)任务完成/EXCEPT 或wait()
  • 返回一系列任务(由 实现),这些任务按其完成时间排序。as_completed()

这样的工作模式就像是,你把一些任务提交到线程池,然后等待所有任务完成/遍历所有任务。下次我们提交到线程池时,它已经耗尽(其中没有任务)。

但是现在我正在解决一个新问题,我有流式处理任务(任务可能一个接一个或一批一批,但不是所有任务都一起来),我想连续将它们提交到线程池中(逐批提交,而不是一次提交所有任务),然后有一个 Iterable(可能像做什么一样)来接收任何完成的任务(按完成时间顺序)。as_completed()

如何实现这一点?如果此软件包不支持该功能,还有其他解决方案吗?concurrent.futures

python 并行处理 concurrent.futures

评论

0赞 jasonb 7/3/2023
您可以通过为每个任务向 Furure 添加一个完成的回调函数并让该函数报告看到的第一个结果来解决此问题。请参阅此处的示例:superfastpython.com/threadpoolexecutor-first-result-from-stream

答: 暂无答案