提问人:Jellby 提问时间:11/14/2023 更新时间:11/15/2023 访问量:50
我可以退出“as_completed”池执行器循环(取消所有剩余任务)吗?
Can I exit an "as_completed" pool executor loop (cancelling all remaining tasks)?
问:
这个想法是运行一组任务,在它们完成时处理它们,一旦一个任务返回所需的结果,就取消剩余的任务。我不介意等待已经开始完成的任务,但应该禁止开始新任务。
我试过打电话,但在这里似乎不起作用:pool.shutdown(cancel_futures=True)
import time
import multiprocessing
import concurrent.futures
import random
filelist = ['001', '002', '003', '004', '005', '006', '007', '008', '009', '010']
parallel = 2
def worker(name, lock, index):
with lock:
index.value += 1
pc = index.value*10
print(f'starting: {name} ({pc})')
d = random.randrange(1, 5)
time.sleep(d)
return pc == 60
print('INIT')
with multiprocessing.Manager() as manager:
lock = manager.Lock()
index = manager.Value('b', 0)
if parallel > 0:
with concurrent.futures.ProcessPoolExecutor(max_workers=parallel) as pool:
tasks = [pool.submit(worker, filename, lock, index) for filename in filelist]
for task in concurrent.futures.as_completed(tasks):
rc = task.result()
if rc:
print('Found!')
pool.shutdown(cancel_futures=True)
break
else:
for filename in filelist:
rc = worker(filename, lock, index)
if rc:
print('Found!')
break
print('END')
在串行模式()下运行时,我得到所需的结果:parallel = 0
INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
END
但是在并行激活后,它会一直持续到最后:
INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
starting: 007 (70)
starting: 008 (80)
starting: 009 (90)
END
那么,有什么方法可以退出循环吗?as_completed
答:
0赞
padu
11/15/2023
#1
是的。例如,您可以使用模块中的类来做到这一点
例:Pool
multiprocessing
from functools import partial
import multiprocessing as mp
import time
def child_process(x, stop_event):
# stopping condition
if x == 4:
print('stop event is set')
stop_event.set()
# do something else
time.sleep(1)
if __name__ == "__main__":
print("main process start")
# Create exit event
exit_event = mp.Manager().Event()
# Create Pool
pool = mp.Pool(2)
print('pool started')
start = time.perf_counter()
pool.map_async(partial(child_process, stop_event=exit_event), range(20))
while 1:
if exit_event.is_set():
# close pool
pool.terminate()
pool.join()
break
print(f'pool interrupted. Time took: {time.perf_counter() - start:.2f} s.')
# do something
外:
main process start
pool started
stop event is set
pool interrupted. Time took: 1.17 s.
评论