Asked by: Mikko Ohtamaa  Asked: 11/9/2023  Updated: 11/14/2023  Views: 65
Parallelising functions using multiprocessing in Jupyter Notebook
Q:
I am running a CPU-intensive task in a Jupyter Notebook. Parallelising the task is trivial, and I have been able to do it in the notebook with threads. However, because of Python's GIL this is inefficient, as the GIL prevents multiple CPU cores from being used effectively for parallel work.
The obvious solution is Python's multiprocessing module, which I use with Python application code (not notebooks). However, because of the way Jupyter Notebooks operate, multiprocessing fails due to the lack of a __main__ entry point.
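A minimal repro (hypothetical, not from the original question) shows the same failure mode as the full traceback below: define a function in a notebook cell and hand it to a process pool on a spawn-based platform.

from concurrent.futures import ProcessPoolExecutor

def double(x):
    # Defined only in the notebook's __main__, so spawned workers
    # cannot look it up by reference.
    return 2 * x

# On macOS/Windows this fails in the worker with:
# AttributeError: Can't get attribute 'double' on <module '__main__' (built-in)>
with ProcessPoolExecutor() as pool:
    print(list(pool.map(double, range(4))))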
If I use a process pool executor, I get the error:
Started Trading Strategy in Jupyter notebook environment, configuration is stored in /Users/moo/.tradingstrategy
Prepared trading pair data for 5 pairs
Processing grid search 576 background jobs
0% 0/576 [00:00<?, ?it/s]
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/opt/homebrew/Cellar/python@3.10/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_background_job' on <module '__main__' (built-in)>
My current parallelisation using a thread pool:
import futureproof
from tqdm.auto import tqdm

results = []

def process_background_job(a, b):
    # Do something for the batch of data and return results
    pass

# If you switch to futureproof.executors.ProcessPoolExecutor
# here it will crash with the above error
executor = futureproof.executors.ThreadPoolExecutor(max_workers=8)

with futureproof.TaskManager(executor, error_policy="log") as task_manager:
    # Send individual jobs to the worker pool;
    # look_backs and look_forwards are defined earlier in the notebook
    total_tasks = 0
    for look_back in look_backs:
        for look_forward in look_forwards:
            task_manager.submit(process_background_job, look_back, look_forward)
            total_tasks += 1

    print(f"Processing grid search {total_tasks} background jobs")

    # Run the background jobs and read back the results from the background worker
    # with a progress bar
    with tqdm(total=total_tasks) as progress_bar:
        for task in task_manager.as_completed():
            if isinstance(task.result, Exception):
                executor.join()
                raise RuntimeError(f"Could not complete task for args {task.args}") from task.result

            look_back, look_forward, long_regression, short_regression = task.result
            results.append([
                look_back,
                look_forward,
                long_regression.rsquared,
                short_regression.rsquared,
            ])
            progress_bar.update()
How can I use process-based parallelisation in notebooks?
A:
You can easily achieve the desired behaviour using the parallelbar library. Example:
# pip install parallelbar
import time

from parallelbar import progress_map

# your process function here
def foo(n):
    # we simulate the occurrence of errors
    if n == 5 or n == 17:
        1 / 0
    elif n == 10:
        # we simulate exceeding the timeout
        time.sleep(1)
    else:
        time.sleep(.1)
    return n

res = progress_map(foo, range(20), process_timeout=0.5,
                   error_behavior='coerce', return_failed_tasks=True)

print(f'Result: {res[0]}')
print(f'Failed tasks: {res[1]}')
Out:
Result: [0, 1, 2, 3, 4, ZeroDivisionError('division by zero'), 6, 7, 8, 9, 'function foo took longer than 0.5 s.',
11, 12, 13, 14, 15, 16, ZeroDivisionError('division by zero'), 18, 19]
Failed tasks: [5, 10, 17]
This works on any OS and in Jupyter notebooks. True, there is one exception for Windows: there you need to import the foo function from a module rather than define it in the notebook, i.e. first save it in some module (see the sketch below).
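As a rough sketch of that Windows workaround (the module name worker_funcs is a hypothetical choice, not from the original answer): write foo to a file from the notebook, then import it by reference so spawned workers can find it.

%%writefile worker_funcs.py
# worker_funcs.py -- hypothetical module holding the worker function
import time

def foo(n):
    if n == 5 or n == 17:
        1 / 0          # simulated error
    elif n == 10:
        time.sleep(1)  # simulated timeout
    else:
        time.sleep(.1)
    return n

Then, in the next cell:

from parallelbar import progress_map
from worker_funcs import foo

res = progress_map(foo, range(20), process_timeout=0.5,
                   error_behavior='coerce', return_failed_tasks=True)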
A:
It looks like you are using macOS, and the problem you are experiencing is caused by the lack of full support for forking processes on macOS. Because of this, multiprocessing uses the spawn method to start child processes on macOS. The following paragraphs describe why this issue occurs. The simple solution is to define the function in a module that both the notebook and the worker processes can import. Alternatively, skip to the bottom for a workaround using cloudpickle.

When you fork a process (the default method of starting multiprocessing workers on Linux), you get a copy of the parent process's memory. This means the worker processes can access and call any function that was defined in the __main__ module at the time of forking. Worker processes created with the spawn method, however, start with a blank slate. They must therefore be able to find the function by reference, which means importing the function's source module and looking the function up there by name. If a function is defined in the __main__ module, then __main__ must be importable, and that is what multiprocessing expects.
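You can check which start method your interpreter uses; this is a quick check, not part of the original answer:

import multiprocessing

# 'fork' on Linux, 'spawn' on macOS (default since Python 3.8) and Windows
print(multiprocessing.get_start_method())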
When you start a Jupyter notebook, it launches a new kernel. That kernel is REPL-based rather than source/file-based, so the __main__ module will be the kernel's REPL module and not the code you put into the notebook cells.
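You can see this from a notebook cell (a quick illustration, not from the original answer): in an IPython kernel, __main__ typically has no source file that a spawned worker could re-import.

import __main__

print(__main__)                             # the kernel's REPL module
print(getattr(__main__, '__file__', None))  # typically None in a notebook kernel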
As it stands, there is no way to force multiprocessing to pick up source code defined in the REPL on macOS (or Windows). There is, however, a possibility: if we change the way Python pickles functions, we can send the functions themselves to the worker processes. cloudpickle is a third-party library that pickles functions in their entirety rather than by reference. You can therefore monkey-patch multiprocessing.reduction.ForkingPickler with a reducer_override so that multiprocessing uses cloudpickle rather than pickle to pickle functions.
import sys
from multiprocessing import Pool
from multiprocessing.reduction import ForkingPickler
from types import FunctionType

import cloudpickle

assert sys.version_info >= (3, 8), 'python3.8 or greater required to use reducer_override'

def reducer_override(obj):
    if type(obj) is FunctionType:
        return (cloudpickle.loads, (cloudpickle.dumps(obj),))
    else:
        return NotImplemented

# Monkeypatch our function reducer into the pickler for multiprocessing.
# Without this line, the main block will not work on Windows or macOS.
# Alternatively, moving the definition of foo outside of the if statement
# would make the main block work on Windows or macOS (when run from
# the command line).
ForkingPickler.reducer_override = staticmethod(reducer_override)

if __name__ == '__main__':
    def foo(x, y):
        return x * y

    with Pool() as pool:
        res = pool.apply(foo, (10, 3))
        print(res)
        assert res == 30
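In a notebook, the same patch should also let a standard concurrent.futures pool pickle cell-defined functions, since its call queue goes through ForkingPickler as well (see the traceback in the question). A sketch under that assumption; the patch cell above must be run first, and cloudpickle must be importable by the workers:

# Assumes the reducer_override patch above has already been executed
# in this kernel.
from concurrent.futures import ProcessPoolExecutor

def slow_square(n):
    # Defined in a notebook cell; normally un-picklable under spawn.
    return n * n

with ProcessPoolExecutor(max_workers=4) as pool:
    print(list(pool.map(slow_square, range(8))))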