如何使用 Python 的多处理和共享内存与 Array('c', fixed_length) 在进程之间传递字符串数据

How to pass string data between processes using Python's multiprocessing and shared memory with Array('c', fixed_length)

提问人:taichi 提问时间:11/17/2023 最后编辑:taichi 更新时间:11/17/2023 访问量:81

问:

我查看了过去关于使用多处理和共享内存共享 Python 字符串数据的问题,但似乎尚未解决。

我知道我可以使用这样的代码在进程之间共享数字和数组。

from multiprocessing import Value, Array, Process
import time

def process1(count,array):
    for i in range(5):
        time.sleep(0.3)
        count.value = count.value + 1
        array[count.value - 1] = count.value
        print("process1:"+str(count.value))
        
def process2(count,array):
    for i in range(5):
        time.sleep(0.7)
        count.value = count.value + 2
        array[count.value - 1] = count.value
        print("process2:"+str(count.value))

if __name__ == '__main__':
    count = Value('i', 0)
    array = Array('i', 15)
    
    process_test1 = Process(target=process1, args=[count,array], daemon=True)
    process_test2 = Process(target=process2, args=[count,array], daemon=True)
    
    process_test1.start()
    process_test2.start()
    
    process_test1.join()
    process_test2.join()
    
    print(array[:])
    print("process ended")

但是,我不确定如何处理字符串数据。

对过去问题的回答中,有人写道,使用会很好。但是,这个问题没有得到正确的答案。Array('c', fixed_length)

我尝试了以下代码,但它不起作用。

from multiprocessing import Value, Array, Process
import time

def process1(count,array):
    for i in range(5):
        time.sleep(0.3)
        count.value = count.value + 1
        array[count.value - 1] = "string:"+str(count.value)
        print("process1:"+str(count.value))
        
def process2(count,array):
    for i in range(5):
        time.sleep(0.7)
        count.value = count.value + 2
        array[count.value - 1] = "string:"+str(count.value)
        print("process2:"+str(count.value))

if __name__ == '__main__':
    count = Value('i', 0)
    array = Array('c', 15)
    
    process_test1 = Process(target=process1, args=[count,array], daemon=True)
    process_test2 = Process(target=process2, args=[count,array], daemon=True)
    
    process_test1.start()
    process_test2.start()
    
    process_test1.join()
    process_test2.join()
    
    print(array[:])
    print("process ended")
Python 多处理

评论


答:

1赞 Ahmed AEK 11/17/2023 #1

在 Python 中是对象而不是字符串,Python 字符串是编码的,要将它们放在您需要的 S 中,并将它们读回您需要的 Python。C stringsbytesutf-8C arrayencode("utf-8")decode("utf-8")

你的代码有一个主要的并发问题,你没有保护你的并发访问,两个进程读取它的值可能会得到相同的值,并且进程只会在数组中相互覆盖,所以你在这里需要一个锁。count

最后,如果你要写超过1个字符,你需要单独跟踪一个原子,你需要在每个字符串的末尾有一个,以便能够分辨一个字符串何时结束,另一个字符串何时开始(为了简单起见,我选择)。indexterminator,

from multiprocessing import Value, Array, Process, Lock
import time

def process1(count, array,  index, lock):
    for i in range(5):
        time.sleep(0.3)
        with lock:
            buffer_data = str("z" + str(count.value)).encode("utf-8")  # any str
            array[index.value:index.value + len(buffer_data)] = buffer_data[:]
            array[index.value + len(buffer_data)] = b','
            count.value += 1
            index.value += len(buffer_data) + 1
            print("process1:" + str(count.value))
        
def process2(count ,array, index, lock):
    for i in range(5):
        time.sleep(0.7)
        with lock:
            buffer_data = str("z" + str(count.value)).encode("utf-8")  # any str
            array[index.value:index.value + len(buffer_data)] = buffer_data[:]
            array[index.value + len(buffer_data)] = b','
            count.value += 1
            index.value += len(buffer_data) + 1
            print("process2:"+str(count.value))

if __name__ == '__main__':
    count = Value('i', 0)
    index = Value('i', 0)
    array = Array('c', 35)
    lock = Lock()

    process_test1 = Process(target=process1, args=[count,array, index, lock], daemon=True)
    process_test2 = Process(target=process2, args=[count,array, index, lock], daemon=True)
    
    process_test1.start()
    process_test2.start()
    
    process_test1.join()
    process_test2.join()
    
    print(array[:index.value].decode("utf-8"))
    print(array[:])
    print("process ended")
z0,z1,z2,z3,z4,z5,z6,z7,z8,z9,
b'z0,z1,z2,z3,z4,z5,z6,z7,z8,z9,\x00\x00\x00\x00\x00'

这里的锁同时保护 和 ,虽然操作是原子的,但您多次读取和修改每个操作,并且您不能保证所有读取和写入都会发生,而不会在没有该锁的情况下更改变量,具有类似的锁以防止并发写入(我更愿意为此使用 over)countindex+=multiprocessing.Queuemultiprocessing.Queuemultiprocessing.Array