为什么这个process.join()不让这个程序等待所有进程完成?

Why does this process.join() not make this program wait for all processes to finish?

提问人:Cosmo H 提问时间:11/16/2023 最后编辑:Christoph RackwitzCosmo H 更新时间:11/17/2023 访问量:46

问:

我正在尝试编写一个程序(在 Python 中),该程序将所有颜色与图像中的键相似的点隔离开来,当我尝试使用多处理对其进行优化时,我遇到了一个问题。无论是否使用 .join(),这些进程都不会全部给出结果。 相关的(简化)代码是这样的:

def func(a,b,c,d,queue):
    #*...some code that takes a long time (2+ seconds)...*
    out = [a,b,c]
    index = d
    queue.put((out, index))
procs = []
q = multiprocessing.Queue()
for p in range(nP):
    #*...some code that calculates the parameters...*
    proc = multiprocessing.Process(target=func,args = (a,b,c,d,q))
    proc.start()
    procs.append(proc)
values = []
kInds = []
while not q.empty():
    t = q.get()
    values.append(t[0])
    kInds.append(t[1])
    print(t)
#this bit V
for pro in procs:
    pro.join()
#*...rest of code...*

其中,被调用的函数是 s 的元组(与颜色键匹配的索引列表,与数字键匹配的索引列表)。queue.put()

据我所知,如果没有标记的代码来连接进程,主进程将继续进行,因此某些进程不会像预期的那样返回所有结果。 但是,当我使用 .join() 时,某些进程仍然没有返回其所有值,然后代码完全停止。如果我把它放在 q.get() 子句之前,那么程序就会永远停止。 这是让程序等到所有结果都放上并读出队列的正确方法吗?有没有更好的方法可以做到这一点?

多处理 python-multiprocessing

评论

0赞 Random Davis 11/16/2023
我认为问题更多的是.除非你对这些流程的控制为零,否则你可能应该关注为什么这些流程永远不会完成,而不是试图以任何方式解决这个问题。The processes don't all finish giving their resultsjoin
0赞 Barmar 11/16/2023
我认为问题可能在于主进程从队列中读取的速度比子进程推送到队列中的速度快。因此,当一些子项仍在添加队列时,它将清空队列。在主进程从队列中提取之前,无法联接慢速子进程。所以你有一个僵局。
0赞 Cosmo H 11/16/2023
谢谢,这似乎是对的!我更改了它以在它宣布队列为空之前添加一点延迟,这似乎解决了它。

答:

0赞 Gabriel Ramuglia 11/16/2023 #1

问题在于,主进程可能在所有进程完成将结果放入队列之前完成从队列中读取。您应该在所有进程完成后使用 .join(),然后读取队列。

试试这个:

import multiprocessing

def func(a, b, c, d, queue):
    out = [a, b, c]
    index = d
    queue.put((out, index))

procs = []
q = multiprocessing.Queue()
for p in range(nP):
    proc = multiprocessing.Process(target=func, args=(a, b, c, d, q))
    proc.start()
    procs.append(proc)

for pro in procs:
    pro.join()

values = []
kInds = []
while not q.empty():
    t = q.get()
    values.append(t[0])
    kInds.append(t[1])
    print(t)

来源:我的文章 https://ioflood.com/blog/python-multiprocessing/

1赞 tdelaney 11/16/2023 #2

q.empty()并不意味着它总是空的。更糟糕的是,文档警告说:

如果队列为空,则返回 True,否则返回 False。以 多线程/多处理语义,这是不可靠的。

您知道预期会有多少结果,因此请从队列中读取这些结果。确保进程管理其错误,以便所有进程都返回某些内容。您可能希望在获取时设置超时,以防万一其中一个子进程出现严重错误。

def func(a,b,c,d,queue):
    try:
        #*...some code that takes a long time (2+ seconds)...*
        out = [a,b,c]
        index = d
        queue.put((out, index))
    except:
        queue.put(None)
        raise

procs = []
q = multiprocessing.Queue()
for p in range(nP):
    #*...some code that calculates the parameters...*
    proc = multiprocessing.Process(target=func,args = (a,b,c,d,q))
    proc.start()
    procs.append(proc)
values = []
kInds = []

for _ in range(nP):
    t = q.get()
    if t is not None:
        values.append(t[0])
        kInds.append(t[1])
        print(t)
    else:
        print('bad')
#this bit V
for pro in procs:
    pro.join()
  

更好的是,让 Python 为你完成工作。存在处理池。

def func(a,b,c,d,queue):
    #*...some code that takes a long time (2+ seconds)...*
    out = [a,b,c]
    index = d
    return out, index

values = []
kInds = []

with multiprocessing.Pool(len(nP)) as pool:
    params_list = *...some code that calculates the parameters...*
    for t in pool.map(func, params_list):
        values.append(t[0])
        kInds.append(t[1])
0赞 Aaron 11/17/2023 #3

构建健壮的 python 多处理代码有许多陷阱,预构建是一个很好的工具,但有时你会发现自己想要更多/不同的东西。multiprocessing.Pool

竞争条件

使用您的代码时,您将遇到一个争用条件,即在所有子进程完成计算并尝试传回其结果之前,主进程会清空队列。此外,python 文档暗示,对 和 的调用通常对竞争条件不安全。通常有两种方法(更多可能)我通常绕过队列中的竞争条件:queue.empty()queue.qsize()

  1. 确切地知道预期有多少结果:

for _ in range(n_tasks): q.get()

  1. 每个工作进程都应发送一个“我已完成”的哨兵,并期望哨兵。n_workers
class Sentinel: pass
n_sentinels = 0
while True:
    item = q.get()
    if isinstance(item, Sentinel):
        n_sentinels += 1
        if n_sentinels == n_workers:
            break
    else:
        #process item

僵局

使用队列处理多处理任务时的另一个常见问题可能是死锁。特别是,并且可能不会立即返回,并且可能正在等待另一个进程或线程将某些内容放入队列或取出某些内容。Python 的多处理队列由操作系统管道支持,这些管道无法在其中存储无限量的数据,否则会阻塞。在您的例子中,数据很小,在开始读取数据之前,可以很好地写入所有数据并等待进程完成。但是,当您远离玩具示例时,这可能会停止工作。通常,在从队列中读取所有数据之前,不应尝试加入进程,否则,当子进程等待时,您可能正在等待主进程中的联接,并且在清空队列之前永远不会加入。健壮的多处理代码还必须考虑到有时会出错,因此我想说的是,不一定总是使用,但始终考虑是否应该为任何队列或锁定操作使用超时值。当发生超时时,您可能有机会检查,例如,子进程是否以非零退出码(异常)退出。q.getq.putq.putq.put

q = Queue()
workers = []
for _ in range(n_workers):
    p = Process(target=work_func, args=(a, b, q))
    workers.append(p)
    p.start()

while any(w.exitcode is None for w in workers): #while any worker is still running
    try:
        handle_results(q.get(True, 1)) #wait for at most 1 second before checking if any workers are still alive
    except queue.Empty:
        #perform other housekeeping while waiting

#for w in workers: w.join() #This is technically redundant to looping until all processes have an exitcode

while not q.empty(): #once we are single-threaded race conditions don't apply anymore, and this should be reliable.
    handle_results(q.get()) #handle any remaining queue items.