在执行过程中将新任务/可调用对象提交到线程池的正确方法是什么?

What is the correct way to submit new tasks/callables to a thread pool in the middle of execution?

提问人:pwasoutside 提问时间:11/3/2023 最后编辑:pwasoutside 更新时间:11/3/2023 访问量:28

问:

我正在尝试找出在 python 中向池中添加新任务的最佳方法。我在这里包含的代码是我实际尝试实现的最小、简单的示例。在我的实际应用程序中,我有一个需要跨 10 个工作线程完成的任务列表。其中一个任务可能会失败,在这种情况下,我需要从该单个任务创建 2 个新任务,以便为同一池进行处理。ThreadPoolExecutor

以下代码通过在另一个线程的回调中向池添加新的可调用对象来模拟此用例。我不确定是否:

  1. 以这种方式添加新元素是安全的futures
  2. 将挂起,直到池中的这个新任务完成。while futures: futures.pop().result()

虽然这段代码有效,但我的直觉是它不可靠。我只是不确定在 python 中执行此操作的最佳方法是什么。

from concurrent.futures import ThreadPoolExecutor, wait
import time
import random

from queue import Queue

with ThreadPoolExecutor(max_workers=10) as pool:
    futures = []

    f = lambda *args, **kwargs: time.sleep(random.randint(1,5))

    def worker(i):
        def inner(*args, **kwargs):
            if i == 2:
                future = pool.submit(lambda *args, **kwargs: time.sleep(5))
                future.add_done_callback(lambda *args, **kwargs: print("new callback done"))
                futures.append(future)
                print(str(i) + " done")
        return inner
    for i in range(5):
        future = pool.submit(f)
        future.add_done_callback(worker(i))
        futures.append(future)

    while futures:
        futures.pop().result()

由于在线程完成之前附加了一个新元素,因此我可以保证在创建子任务之前,“生成”子任务的任务不会完成。但是,我不确定这是否足以确保在池关闭之前完成所有任务。futures

将不胜感激任何见解/指导

python 线程安全

评论


答: 暂无答案