在 Jupyter Notebook 中使用多处理参数函数

Parallesing functions using multiprocessing in Jupyter Notebook

提问人:Mikko Ohtamaa 提问时间:11/9/2023 更新时间:11/14/2023 访问量:65

问:

我在 Jupyter Notebook 中运行 CPU 密集型任务。并行化任务是微不足道的,我已经能够通过线程在笔记本中做到这一点。然而,由于 Python 的 GIL,这是低效的,因为 GIL 阻止了有效地利用多个 CPU 内核执行并行任务。

显而易见的解决方案是 Python 模块,我将其与 Python 应用程序代码(而不是笔记本)一起使用。但是,由于 Jupyter Notebook 的运行方式,由于缺少入口点而失败。multiprocessingmultiprocessing__main__

如果我使用进程池执行程序,则会出现错误:

Started Trading Strategy in Jupyter notebook environment, configuration is stored in /Users/moo/.tradingstrategy

Prepared trading pair data for 5 pairs
Processing grid search 576 background jobs
0%
0/576 [00:00<?, ?it/s]
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/[email protected]/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/[email protected]/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/homebrew/Cellar/[email protected]/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/opt/homebrew/Cellar/[email protected]/3.10.12_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_background_job' on <module '__main__' (built-in)>

我目前使用线程池的并行化:

results = []

def process_background_job(a, b):
   # Do something for the batch of data and return results
   pass

# If you switch to futureproof.executors.ProcessPoolExecutor
# here it will crash with the above error
executor = futureproof.executors.ThreadPoolExecutor(max_workers=8)
with futureproof.TaskManager(executor, error_policy="log") as task_manager:
    
    # Send individual jobs to the multiprocess worker pool
    total_tasks = 0
    for look_back in look_backs:
        for look_forward in look_forwards:
            task_manager.submit(process_background_job, look_back, look_forward)
            total_tasks += 1

    print(f"Processing grid search {total_tasks} background jobs")

    # Run the background jobs and read back the results from the background worker
    # with a progress bar
    with tqdm(total=total_tasks) as progress_bar:
        for task in task_manager.as_completed():
            if isinstance(task.result, Exception):
                executor.join()
                raise RuntimeError(f"Could not complete task for args {task.args}") from task.result
            
            look_back, look_forward, long_regression, short_regression = task.result
            results.append([
                look_back,
                look_forward,
                long_regression.rsquared,
                short_regression.rsquared
            ])
            progress_bar.update()

如何在笔记本中使用基于进程的并行化?

jupyter-notebook python-multiprocessing

评论

1赞 Klaus D. 11/9/2023
看起来像一个导入问题。当重新导入主模块时,Python 的行为可能会令人困惑。尝试将代码放入单独模块中的函数中,导入并执行它。
0赞 Mikko Ohtamaa 11/9/2023
谢谢克劳斯。我开发 Python 已经 20 年了,所以我理解这一点。但是,我问如何使用 Jupyter Notebook 执行此操作,而不是将代码放入单独的模块中。
2赞 Corralien 11/9/2023
我尝试过Jupyter-Lab。复制/粘贴您的代码(添加一些小的修改以便能够工作)并使用 .它工作得很好。futureproof.executors.ProcessPoolExecutor
0赞 Mikko Ohtamaa 11/9/2023
谢谢@Corralien - 我在 Visual Studio Code 中运行,但我也尝试过其他 Jupyter 环境,例如直接从命令行。让我编辑问题并将其转换为可重复的示例。ipython
0赞 alec_djinn 11/14/2023
这不是一个更好的选择吗?multiprocessing.Pool().map()

答:

0赞 padu 11/14/2023 #1

您可以使用并行杆库轻松实现所需的行为。 例:

#pip install parallelbar
from parallelbar import progress_map

# your process function here
def foo(n):
#we simulate the occurrence of errors
    if n==5 or n==17:
        1/0
    elif n==10:
#we simulate timeout exceeding
        time.sleep(1)
    else:
        time.sleep(.1)
    return n

res = progress_map(foo, range(20), process_timeout=0.5, error_behavior='coerce', return_failed_tasks=True)
print(f'Result: {res[0]}')
print(f'Failed tasks: {res[1]}')

外:

Result: [0, 1, 2, 3, 4, ZeroDivisionError('division by zero'), 6, 7, 8, 9, 'function foo took longer than 0.5 s.', 
    11, 12, 13, 14, 15, 16, ZeroDivisionError('division by zero'), 18, 19]
Failed tasks: [5, 10, 17]

这适用于任何操作系统和 jupyter 笔记本 没错,操作系统 Windows 有一个例外。为此,您需要从模块中导入函数,即首先将其保存在某个模块中jupyter notebookfoo

评论

0赞 Mikko Ohtamaa 11/26/2023
我的问题是如何在不编写单独的 Python 模块的情况下做到这一点。这在 Jupyter 笔记本中不起作用,已将问题报告给库作者
0赞 Dunes 11/26/2023 #2

您似乎正在使用 macOS,而您遇到的问题是因为缺乏对 macOS 中分叉进程的完全支持。因此,在 macOS 上使用 spawn 方法启动子进程。以下段落描述了出现此问题的原因。简单的解决方案是在可由笔记本和工作进程导入的模块中定义函数。或者,跳到底部,了解使用 cloudpickle 的解决方法。multiprocessing

当您的进程(在 Linux 上启动多处理工作进程的默认方法)时,您将获得父进程内存的副本。这意味着工作进程可以访问和调用模块在分叉时在模块中定义的任何函数。但是,使用该方法创建的工作进程从一张白纸开始。因此,他们必须能够通过引用找到函数。这意味着导入函数的源模块,并在模块中按名称查找它。如果在模块中定义了一个函数,那么它必须是可导入的,并且多处理是预期的。fork__main__spawn__main____main__

当您启动 Jupyter 笔记本时,它会启动一个新内核。该内核是基于 REPL 的,而不是基于源/文件的。因此,该模块将是内核的 REPL 的模块,而不是您插入到笔记本单元格中的代码。__main__

就目前而言,没有办法强制多处理能够拾取 macOS(或 Windows)上 REPL 中定义的源。但是,有一种可能性。如果我们改变 python pickles 函数的方式,那么我们可以将函数发送到工作进程。CloudPickle 是一个第三方库,它对函数进行整体腌制,而不是通过引用。因此。你可以 monkey-patch 使用reducer_override,这样多处理将使用 cloudpickle 而不是 pickle 来 pickle 函数。cloudpicklemultiprocessing.reduction.ForkingPickler

import sys
from multiprocessing import Pool
from multiprocessing.reduction import ForkingPickler
from types import FunctionType
import cloudpickle

assert sys.version_info >= (3, 8), 'python3.8 or greater required to use reducer_override'

def reducer_override(obj):
    if type(obj) is FunctionType:
        return (cloudpickle.loads, (cloudpickle.dumps(obj),))
    else:
        return NotImplemented

# Monkeypatch our function reducer into the pickler for multiprocessing.
# Without this line, the main block will not work on windows or macOS.
# Alterntively, moving the defintionn of foo outside of the if statement
# would make the main block work on windows or macOS (when run from
# the command line).
ForkingPickler.reducer_override = staticmethod(reducer_override)

if __name__ == '__main__':
    def foo(x, y):
        return x * y
    
    with Pool() as pool:
        res = pool.apply(foo, (10, 3))

    print(res)
    assert res == 30

评论

0赞 Dunes 11/26/2023
旁注,我只能在 Windows 上测试它,因为我无法访问 macOS。但是 Windows 也存在与 macOS 相同的问题——使用 spawn 启动工作进程。