python 多处理池中的 Excepion 处理

Excepion handling in python multiprocessing pool

提问人:Daniel 提问时间:3/4/2022 最后编辑:Daniel 更新时间:3/9/2022 访问量:417

问:

我正在尝试在我的程序中处理 KeyboardInterrupt 异常,但我无法找到如何使用多处理池来做到这一点。即使我将池操作放在 try-exception 块中并处理异常,我还是收到了 4 个 KeyboardInterrupt 异常。

import time
import multiprocessing as mp

def calc(i):
    return i*i 

def main():
    try:
        with mp.Pool(4) as p:
            while True:
                print(p.map(calc, range(10)))
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as e:
        print(e)


if __name__ == '__main__':
    main()

我知道这些进程在隔离的环境中运行,但我也想以某种方式处理异常。

编辑:运行代码时得到的输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
^CProcess ForkPoolWorker-3:
Process ForkPoolWorker-5:
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
KeyboardInterrupt
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 335, in get
    res = self._reader.recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Process ForkPoolWorker-6:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

Shutting down.
python ubuntu 异常 多处理

评论

0赞 Kris 3/4/2022
“我收到 4 个 KeyboardInterrupt 异常。”你的意思是说它记录了关闭 4 次?
0赞 Atalajaka 3/4/2022
您能否向我们展示一个包含这四个 KeyboardInterrupt 异常的执行示例?这将澄清这个问题。
0赞 Daniel 3/4/2022
我的意思是每个进程在关闭之前都会抛出自己的 KeyboardInterrupt。
0赞 Aaron 3/5/2022
大多数操作系统会将信号传播到所有子进程。这是预期行为。您还需要在子进程中进行信号处理。 具有比 IIRC 更多的内置异常处理(否则它们执行相当相似的任务)。就我个人而言,当我需要更受控的信号/错误处理时,我经常创建自己的对象组,而不是使用预先构建的池。concurrent.futures.Executormultiprocessing.Poolmp.Process

答:

1赞 Booboo 3/5/2022 #1

由于您似乎在 Linux 类型的平台下运行(您确实应该使用该平台标记您的多处理问题),因此您需要忽略池进程中的 CTRL-C。最简单的方法是在创建池时使用 initializer 参数:

import time
import multiprocessing as mp


def init_pool_processes():
    """
    Each pool process will execute this as part of its
    initialization.
    """
    import signal
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def calc(i):
    return i*i

def main():
    try:
        with mp.Pool(4, initializer=init_pool_processes) as p:
            while True:
                print(p.map(calc, range(10)))
                time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down.")
    except Exception as e:
        print(e)


if __name__ == '__main__':
    main()

更新

要处理工作函数引发的异常,您应该使用 method ,它返回一个迭代器,该迭代器在迭代时返回下一个返回值,或者在相应的任务引发异常时引发异常。这样,您可以捕获已提交的单个任务的异常。例如:imap

import multiprocessing as mp


def calc(i):
    if i == 3:
        raise ValueError(f'bad i value {i}')
    return i*i

def main():
    return_values = []
    with mp.Pool(4) as p:
        results = p.imap(calc, range(10))
        while True:
            try:
                return_value = next(results)
                return_values.append(return_value)
            except StopIteration:
                # No more results:
                break
            except Exception as e:
                # worker function raised an exception
                print('Got exception:', e)
                # Let's also append the exception as the return value:
                return_values.append(e)
        print(return_values)


if __name__ == '__main__':
    main()

指纹:

Got exception: bad i value 3
[0, 1, 4, ValueError('bad i value 3'), 16, 25, 36, 49, 64, 81]

评论

0赞 Daniel 3/6/2022
谢谢,我添加了标签 ubuntu 作为平台。
0赞 Daniel 3/9/2022
如何将此问题扩展到处理任意异常?
0赞 Booboo 3/9/2022
如果你的意思是工作函数中的异常,那是一个完全不同的问题。然后,您应该发布一个新问题,然后在此处发布新评论,并附上该问题的链接。或者你是别的意思?calc
0赞 Daniel 3/9/2022
是的,我的意思是工作器函数中的异常。
1赞 Booboo 3/9/2022
我已经更新了答案。但这确实应该是一个新问题,因为它完全不相关,其他人可能会从答案中受益。