在 python3 中苦苦挣扎

Struggling with multiprocessing in python3

提问人:Mika 提问时间:7/31/2023 最后编辑:Mika 更新时间:7/31/2023 访问量:61

问:

我正在尝试启动接收器进程。这是一个类,它只从套接字接收数据。例如,它只会在 csv 文件中写入行。 我将通过管道发送退出信号。在此信号之后,单独进程中的 while 循环将结束,并且该单独的写入器进程将完成。

import multiprocessing
from datetime import datetime
import time
import datetime
import csv
from multiprocessing import Process, Pipe


class CSVWriter:

    def __init__(self, pipe: multiprocessing.Pipe):
        self.pipe = pipe

    def write(self):
        with open(f'RTD.csv', 'w') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=',', fieldnames=['date'])

            while self.pipe.recv() != 'exit':
                writer.writerow({'date': (datetime.datetime.now())})
                time.sleep(1)
            print('exit csv')


pipe_receiver, pipe_sender = Pipe()


def write_csv(pipe):
    csv_writer = CSVWriter(pipe=pipe)
    csv_writer.write()


process = Process(target=write_csv, args=(pipe_receiver,))
process.start()

pipe_sender.send(input())
process.join()
process.close()

但是,与我的期望不同,它不会创建文件。或者,如果我输入了错误的退出键,它会创建空文件。

我在管道上挣扎了大约 2 天,无法理解问题所在。我将不胜感激任何信息。

PS 我正在使用进程 bc,我需要非常快速地从套接字接收数据(例如,我只是将数据写入文件)。所以,我不能使用线程。

python-3.x 多处理 python-multiprocessing

评论

0赞 ShadowRanger 7/31/2023
只是为了检查一下,您使用的是非 Windows、非 Mac 系统,对吗?如果没有导入保护,在模式下操作时会做可怕的事情(Windows 和 Mac 上的默认设置,以及 Windows 上的唯一选项)。仔细阅读指南;如果你忽略它们,有很多东西可能会出错。multiprocessing'spawn'
1赞 ShadowRanger 7/31/2023
旁注:你说“我正在使用进程 bc,我需要非常快速地从套接字接收数据(例如,我只是将数据写入文件)。所以,我不能使用线程“,但除非你省略了一些沉重的 CPU 使用率,否则这看起来像是一个 I/O 绑定问题,而线程可以很好地处理这些问题。
0赞 Mika 7/31/2023
@ShadowRanger,我需要尽快收到。频率最高可达 500 Hz。因此,即使接收清晰(未使用任何不同的线程),我也只有 300 Hz。
1赞 quamrana 7/31/2023
如果我将所有全局变量放在 Windows 中并创建一个循环以需要许多 s 才能通过管道发送,直到 .if __name__ == '__main__':input()"exit"
0赞 Mika 7/31/2023
@i在 Ubuntu 上。我实际上明白我需要永久发送输入。那么,是否有可能避免这种 recv() 锁呢?我只需要 while 循环无休止地运行,直到我只发送一条退出消息

答:

0赞 HelpfulHelper 7/31/2023 #1

首先,客户端打开文件并输入 -语句,然后将文件刷新到磁盘并关闭。 在通过管道发送数据之前阻止执行,这仅在调用 时发生。将 的结果直接传递到管道中,其结果由 接收。当返回 'exit' 以外的内容时,将再次调用,但这一次,父进程将不再发送数据,因为 'pipe_sender.send(input())' 语句只执行了一次。因此,将无限期阻塞,从而阻止 -statement 退出。因此,文件的写入缓冲区永远不会刷新,并且它保持为空。此外,父进程随后会调用,这会导致死锁:withpipe.recv()pipe_sender.send()input()recv()input()recv()recv()withjoin()

父进程正在等待子进程退出,它无法这样做,因为它正忙于等待来自父进程的数据。recv()

子进程正在等待父进程使用 向其发送数据,它无法执行此操作,因为它正忙于等待子进程退出。recv()

当输入“exit”时,将完全跳过 while 循环,程序的其余部分按预期执行;在这种情况下,也会创建一个空文件。

最好的解决方案是使用线程(它们通常比进程创建得更快,并且更容易处理,因为您可以访问相同的变量和对象)或者(因为您调用 join 并等待进程),在同一个程序中完全完成所有操作

评论

0赞 Mika 7/31/2023
我不确定线程是否能够处理 500hz 的接收流。但如果这是可能的,我';;使用 Threads ofc。此外,是否可以以某种方式解锁 recv()?没有收到任何数据。只是工作过程,直到退出键到达。
0赞 HelpfulHelper 7/31/2023
@Mika 我认为在纯 python 中“解锁”recv() 是不可能的。您可能需要使用一些特定于操作系统的 API,例如,在 Windows 上,子进程可以创建一个线程,该线程调用 Sleep,然后调用 CancelSynchronousIo。为了做到这一点,你必须看看ctypes库和你正在使用的任何操作系统的文档(高级)。我认为使用线程是更好的选择。我不明白为什么线程无法处理 500hz 的接收流。旁注:Python 是一种相对较慢的编程语言,在这样的速度下,我建议使用像 c++ 这样的编译语言。
0赞 Mika 8/1/2023
我有一个关于C的迷你项目,由单独的程序员编写。但是,与他沟通有点“困难”。所以,我不能真正关心我的项目的可操作性,使用他的程序,比如 C 库