我可以退出“as_completed”池执行器循环(取消所有剩余任务)吗?

Can I exit an "as_completed" pool executor loop (cancelling all remaining tasks)?

提问人:Jellby 提问时间:11/14/2023 更新时间:11/15/2023 访问量:50

问:

这个想法是运行一组任务,在它们完成时处理它们,一旦一个任务返回所需的结果,就取消剩余的任务。我不介意等待已经开始完成的任务,但应该禁止开始新任务。

我试过打电话,但在这里似乎不起作用:pool.shutdown(cancel_futures=True)

import time
import multiprocessing
import concurrent.futures
import random

filelist = ['001', '002', '003', '004', '005', '006', '007', '008', '009', '010']
parallel = 2

def worker(name, lock, index):
    with lock:
        index.value += 1
        pc = index.value*10
        print(f'starting: {name} ({pc})')
    d = random.randrange(1, 5)
    time.sleep(d)
    return pc == 60

print('INIT')

with multiprocessing.Manager() as manager:
    lock = manager.Lock()
    index = manager.Value('b', 0)
    if parallel > 0:
        with concurrent.futures.ProcessPoolExecutor(max_workers=parallel) as pool:
            tasks = [pool.submit(worker, filename, lock, index) for filename in filelist]
            for task in concurrent.futures.as_completed(tasks):
                rc = task.result()
                if rc:
                    print('Found!')
                    pool.shutdown(cancel_futures=True)
                    break
      else:
          for filename in filelist:
              rc = worker(filename, lock, index)
              if rc:
                  print('Found!')
                  break

print('END')

在串行模式()下运行时,我得到所需的结果:parallel = 0

INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
END

但是在并行激活后,它会一直持续到最后:

INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
starting: 007 (70)
starting: 008 (80)
starting: 009 (90)
END

那么,有什么方法可以退出循环吗?as_completed

蟒蛇 python-multiprocessing concurrent.futures

评论


答:

0赞 padu 11/15/2023 #1

是的。例如,您可以使用模块中的类来做到这一点 例:Poolmultiprocessing

from functools import partial
import multiprocessing as mp
import time


def child_process(x, stop_event):
    # stopping condition
    if x == 4:
        print('stop event is set')
        stop_event.set()
    # do something else
    time.sleep(1)


if __name__ == "__main__":
    print("main process start")

    # Create exit event
    exit_event = mp.Manager().Event()
    # Create Pool
    pool = mp.Pool(2)
    print('pool started')
    start = time.perf_counter()
    pool.map_async(partial(child_process, stop_event=exit_event), range(20))
    while 1:
        if exit_event.is_set():
            # close pool
            pool.terminate()
            pool.join()
            break
    print(f'pool interrupted. Time took: {time.perf_counter() - start:.2f} s.')
    # do something

外:

main process start
pool started
stop event is set
pool interrupted. Time took: 1.17 s.