如何更改多处理模块使用的序列化方法?

How to change the serialization method used by the multiprocessing module?

提问人:ws_e_c421 提问时间:7/15/2017 最后编辑:Alonmews_e_c421 更新时间:7/5/2021 访问量:5911

问:

如何更改 Python 库使用的序列化方法?具体而言,默认序列化方法使用具有该版本 Python 的默认 pickle 协议版本的库。默认的 pickle 协议是 Python 2.7 中的版本 2 和 Python 3.6 中的版本 3。如何在 Python 3.6 中将协议版本设置为 2,以便我可以使用库中的一些类(如 和 )在 Python 2.7 运行的服务器处理和 Python 3.6 运行的客户端进程之间进行通信?multiprocessingpickleClientListenermultiprocessing

(旁注:作为测试,我修改了multiprocessing/connection.py的206行,通过添加调用以强制协议版本为2,并且我的客户端/服务器进程在我的有限测试中工作,服务器由2.7运行,客户端由3.6运行)。protocol=2dump()

在 Python 3.6 中,合并了一个补丁以设置序列化器,但该补丁没有文档,我还没有弄清楚如何使用它。以下是我尝试使用它的方式(我也将其发布到我链接到的 Python 票证中):

pickle2reducer.py:

from multiprocessing.reduction import ForkingPickler, AbstractReducer

class ForkingPickler2(ForkingPickler):
    def __init__(self, *args):
        if len(args) > 1:
            args[1] = 2
        else:
            args.append(2)
        super().__init__(*args)

    @classmethod
    def dumps(cls, obj, protocol=2):
        return ForkingPickler.dumps(obj, protocol)


def dump(obj, file, protocol=2):
    ForkingPickler2(file, protocol).dump(obj)


class Pickle2Reducer(AbstractReducer):
    ForkingPickler = ForkingPickler2
    register = ForkingPickler2.register
    dump = dump

在我的客户中:

import pickle2reducer
multiprocessing.reducer = pickle2reducer.Pickle2Reducer()

在顶部,然后再对 .当我这样做时,我仍然在 Python 2.7 运行的服务器上看到。multiprocessingValueError: unsupported pickle protocol: 3

python 序列化 pickle python-multiprocessing

评论

1赞 Alonme 12/18/2019
该问题有一个待处理的 PR (bugs.python.org/issue28053) 以添加文档和改进。github.com/python/cpython/pull/15058/files

答:

6赞 Daniel Liu 7/26/2017 #1

我相信如果您使用多处理“上下文”对象,您所指的补丁可以工作。

使用您的 pickle2reducer.py,您的客户应该从以下方面开始:

import pickle2reducer
import multiprocessing as mp

ctx = mp.get_context()
ctx.reducer = pickle2reducer.Pickle2Reducer()

并且具有与 相同的 API。ctxmultiprocessing

希望对您有所帮助!

评论

0赞 ws_e_c421 7/29/2017
这似乎是使用补丁的正确方法。我不明白如何在我的特定情况下使用上下文。我使用 和 这四个类中没有一个可以从上下文对象访问。我能够做到这一点:.from multiprocessing.connection import Client, Listenerfrom multiprocessing.managers import BaseManager, NameSpaceProxymultiprocessing.context._default_context.reducer = Pickle2Reducer()
2赞 Jase Lindgren 7/3/2021 #2

非常感谢。它完全引导我找到了我需要的解决方案。我最终做了类似的事情,但通过修改 Connection 类。对我来说,这比制作自己的完整子类并替换它更干净。

from multiprocessing.connection import Connection, _ForkingPickler, Client, Listener

def send_py2(self, obj):
    self._check_closed()
    self._check_writable()
    self._send_bytes(_ForkingPickler.dumps(obj, protocol=2))

Connection.send = send_py2

这正是 multiprocessing.connection 中的代码,仅添加了参数。protocol=2

我想您甚至可以通过直接编辑 multiprocessing.reduction 中的原始类来做同样的事情。ForkingPickler

评论

0赞 Jonathan1609 7/4/2021
函数的第三行有语法错误,你忘记了一个右括号self._send_bytes
0赞 Jase Lindgren 7/5/2021
谢谢!当您不小心复制和粘贴时,就会发生这种情况。刚刚更新了它。