对子进程的非阻塞读取。Python 中的 PIPE

A non-blocking read on a subprocess.PIPE in Python

提问人:Mathieu Pagé 提问时间:12/18/2008 最后编辑:Peter MortensenMathieu Pagé 更新时间:10/23/2023 访问量:314051

问:

我正在使用子进程模块启动子进程并连接到其输出流(标准输出)。我希望能够在其标准输出上执行非阻塞读取。有没有办法使 .readline 不阻塞或在调用之前检查流上是否有数据?我希望它是可移植的,或者至少可以在 Windows 和 Linux 下工作。.readline

这是我现在的做法(如果没有可用的数据,它会阻止):.readline

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Python IO 子进程 无阻塞

评论

20赞 Nasser Al-Wohaibi 5/7/2014
(来自谷歌?当其中一个 PIPE 的缓冲区被填满且未读取时,所有 PIPE 都将死锁。例如,当 stderr 被填充时,stdout 死锁。永远不要通过你不打算阅读的 PIPE。
1赞 Charlie Parker 2/28/2019
@NasserAl-Wohaibi 这是否意味着最好始终创建文件?
1赞 Charlie Parker 3/2/2019
我一直很想知道为什么它首先会被阻止......我问是因为我看到评论:To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()
1赞 Mathieu Pagé 3/2/2019
它是“设计”,等待接收输入。
12赞 Stuart Axon 5/8/2021
令人难以置信的是,这 12 年不是 python 本身的一部分:(

答:

7赞 monkut 12/18/2008 #1

一种解决方案是创建另一个进程来执行对进程的读取,或者创建具有超时的进程线程。

下面是超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,您是否需要在 stdout 进来时阅读它? 另一种解决方案可能是将输出转储到文件中,并使用 p.wait() 等待该过程完成。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

评论

0赞 n611x007 9/9/2013
似乎 recpie 的线程在超时后不会退出,杀死它取决于是否能够杀死它读取的子进程(SG.在这方面不相关)(你应该能够做到,但以防万一你不能......
2赞 S.Lott 12/18/2008 #2

select 模块可帮助您确定下一个有用输入的位置。

但是,您几乎总是对单独的线程更满意。一个执行阻塞读取 stdin,另一个执行您不想被阻塞的任何地方。

评论

11赞 ThomasH 7/15/2009
我认为这个答案没有帮助,原因有两个:(a) select 模块无法在 Windows 下的管道上运行(正如提供的链接明确指出的那样),这违背了 OP 拥有可移植解决方案的意图。(b) 异步线程不允许在父进程和子进程之间进行同步对话。如果父进程想要根据从子进程读取的下一行调度下一个操作,该怎么办?!
4赞 Helmut Grohne 1/27/2011
select 也没有用,因为即使在 select 之后,Python 的读取也会阻塞,因为它没有标准的 C 语义,也不会返回部分数据。
0赞 Emiliano 2/18/2011
一个单独的用于从孩子的输出中读取的脱粒机解决了我的问题,这与此类似。如果你需要同步交互,我想你不能使用这个解决方案(除非你知道会发生什么输出)。我会接受这个答案
0赞 dlukes 6/7/2023
“即使在选择之后,Python 的读取也会被阻止,因为它没有标准的 C 语义,也不会返回部分数据” → 如果您使用 os.read,则不会,例如子进程模块(出于这个原因)。
19赞 Noah 1/13/2009 #3

尝试 asyncproc 模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块按照 S.Lott 的建议处理所有线程。

评论

1赞 Cerin 12/2/2010
绝对给力。比原始子流程模块容易得多。在 Ubuntu 上非常适合我。
14赞 Bryan Oakley 1/11/2011
AsyncProc 在 Windows 上不起作用,Windows 不支持操作系统。WNOHANG :-(
30赞 Bryan Oakley 2/17/2011
asyncproc 是 GPL,这进一步限制了它的使用:-(
0赞 benjaoming 11/11/2012
谢谢。一件小事:似乎用 8 个空格替换 asyncproc.py 中的选项卡是要:)
0赞 grayaii 10/27/2015
看起来您无法通过 asyncproc 模块获取启动的进程的返回代码;仅它生成的输出。
90赞 Jesse 11/28/2009 #4

我经常遇到类似的问题;我经常编写的 Python 程序需要能够执行一些主要功能,同时接受来自命令行 (stdin) 的用户输入。简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为会阻塞并且没有超时。如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能,因为它仍然阻塞在另一个线程中等待一行。我发现这个问题的一个解决方案是使用 fcntl 模块使 stdin 成为非阻塞文件:readline()readline()

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用 select 或 signal 模块来解决这个问题要干净一些,但话又说回来,它仅适用于 UNIX......

评论

1赞 Denilson Sá Maia 4/28/2010
根据文档,fcntl() 可以接收文件描述符或具有 .fileno() 方法的对象。
2赞 Catalin Iacob 10/27/2010
在 Python 2 中使用 readline 似乎不正确。查看 anonnn's answer stackoverflow.com/questions/375427/...
12赞 Ivo Danihelka 2/14/2011
请不要使用繁忙循环。使用带有超时的 poll() 来等待数据。
12赞 anonnn 10/27/2010
杰西的回答是不正确的。根据 Guido 的说法,readline 在非阻塞模式下无法正常工作,并且在 Python 3000 之前不会。bugs.python.org/issue1175#msg56041如果要使用 fcntl 将文件设置为非阻塞模式,则必须使用较低级别的 os.read() 并自己分离出行。将 fcntl 与执行线路缓冲的高级调用混合使用会带来麻烦。
0赞 cat 2/22/2016
@Stefano什么是定义?buffer_size
13赞 Andy Jackson 1/26/2011 #5

使用 select & read(1)。

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

对于类似 readline() 的:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

评论

7赞 n611x007 9/9/2013
白搭。 根据 Docs 的说法,不应该在带有文件描述符的 Windows 上工作select
2赞 wvxvw 7/3/2019
我的天啊。读取兆字节,或者一次读取一个字符的千兆字节......这是很长一段时间以来我见过的最糟糕的主意......毋庸置疑,此代码不起作用,因为无论参数有多小,都是阻塞调用。proc.stdout.read()
0赞 nmz787 10/31/2019
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed
492赞 jfs 2/4/2011 #6

在这种情况下,fcntlselectasyncproc 将无济于事。

无论使用何种操作系统,在不阻塞的情况下读取流的可靠方法是使用 Queue.get_nowait()

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

评论

11赞 Aki 2/22/2012
是的,这对我有用,但我删除了很多。它包括良好做法,但并不总是必要的。Python 3.x 2.X 兼容和close_fds可以省略,它仍然可以工作。但要注意一切的作用,不要盲目复制它,即使它只是有效!(实际上,最简单的解决方案是像 Seb 一样使用线程并执行读行,Qeues 只是获取数据的一种简单方法,还有其他方法,线程就是答案!
3赞 Justin 4/10/2012
在线程内部,调用阻止线程和主线程,我必须等到 readline 返回,然后其他一切才能继续。有什么简单的方法可以解决这个问题吗?(我正在从我的进程中读取多行,这也是另一个正在执行 DB 和其他操作的.py文件)out.readline
3赞 jfs 4/16/2012
@Justin:“out.readline”不会阻塞主线程,而是在另一个线程中执行。
4赞 n611x007 9/9/2013
如果我未能关闭子进程怎么办,例如。由于例外情况?stdout-reader 线程不会死,python 会挂起,即使主线程退出了,不是吗?如何解决这个问题?Python 2.x 不支持杀死线程,更糟糕的是,不支持中断它们。:((显然,应该处理异常以确保子进程被关闭,但以防万一它不会,你能做什么?
4赞 edA-qa mort-ora-y 10/31/2013
我在包中创建了一些友好的包装器 pypi.python.org/pypi/shelljobshelljob
6赞 Sebastien Claeys 4/22/2011 #7

我添加这个问题是为了读取一些子过程。Popen stdout. 这是我的非阻塞读取解决方案:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

评论

5赞 n611x007 9/9/2013
根据文档,FCNTL 在 Windows 上不起作用。
0赞 cat 2/22/2016
@anatolytechtonik改用msvcrt.kbhit()
17赞 Bryan Ward 4/22/2011 #8

Twisted 中,你可以很容易地做到这一点。根据你现有的代码库,这可能不那么容易使用,但如果你正在构建一个扭曲的应用程序,那么这样的事情就变得几乎微不足道了。创建一个类,并重写该方法。Twisted(取决于所使用的反应器)通常只是一个大循环,安装了回调来处理来自不同文件描述符(通常是网络套接字)的数据。因此,该方法只是安装一个回调来处理来自 的数据。演示此行为的简单示例如下所示:ProcessProtocoloutReceived()select()outReceived()STDOUT

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Twisted 文档提供了一些关于这方面的好信息。

如果你围绕 Twisted 构建整个应用程序,它会使与其他进程(本地或远程)进行异步通信,就像这样非常优雅。另一方面,如果你的程序不是建立在 Twisted 之上的,那么这不会有那么大的帮助。希望这对其他读者有所帮助,即使它不适用于您的特定应用程序。

评论

0赞 n611x007 9/9/2013
白搭。 根据 Docs 的说法,不应该在带有文件描述符的 Windows 上工作select
2赞 notbad.jpeg 9/30/2013
@naxa我不认为他所指的和你是同一个人。我假设这是因为在 Windows 上工作......select()Twisted
1赞 clacke 4/19/2016
“Twisted(取决于使用的反应器)通常只是一个大的 select() 循环”意味着有几个反应器可供选择。一个是 unix 和类 unix 上最便携的,但也有两个可用于 Windows 的反应器:twistedmatrix.com/documents/current/core/howto/......select()
7赞 Vukasin Toroman 7/6/2012 #9

免责声明:这仅适用于龙卷风

为此,您可以将 fd 设置为非阻塞,然后使用 ioloop 注册回调。我已将其打包在一个名为 tornado_subprocess 的 egg 中,您可以通过 PyPI 安装它:

easy_install tornado_subprocess

现在你可以做这样的事情:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

也可以将它与 RequestHandler 一起使用

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

评论

0赞 VisioN 11/27/2012
谢谢你的好功能!澄清一下,为什么我们不能简单地用于创建新的非阻塞进程?我在 Tornado websocket 实例中使用了它,它做得很好。threading.Threadon_message
1赞 Vukasin Toroman 12/2/2012
在 Tornado 中,大多不鼓励穿线。它们适用于小型、短时间运行的功能。你可以在这里阅读它:stackoverflow.com/questions/7846323/tornado-web-and-threads github.com/facebook/tornado/wiki/Threading-and-concurrency
0赞 James Gentes 5/24/2013
@VukasinToroman你真的用这个救了我。非常感谢您提供tornado_subprocess模块:)
0赞 n611x007 9/9/2013
这在 Windows 上有效吗?(请注意,使用文件描述符时,不会select)
0赞 Vukasin Toroman 9/10/2013
此库不使用调用。我没有在 Windows 下尝试过,但您可能会遇到麻烦,因为库正在使用该模块。简而言之:不,这可能在 Windows 下不起作用。selectfcntl
7赞 Vikram Pudi 3/15/2013 #10

现有的解决方案对我不起作用(详情如下)。最终有效的是使用 read(1) 实现 readline(基于这个答案)。后者不会阻止:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

为什么现有的解决方案不起作用:

  1. 需要 readline 的解决方案(包括基于队列的解决方案)总是阻塞。杀死执行readline的线程是困难的(不可能的?只有当创建它的进程完成时,它才会被终止,但当输出生成进程被终止时,它不会被终止。
  2. 正如 anonnn 所指出的那样,将低级 fcntl 与高级读行调用混合在一起可能无法正常工作。
  3. 使用 select.poll() 很简洁,但根据 python 文档在 Windows 上不起作用。
  4. 对于这项任务来说,使用第三方库似乎有点矫枉过正,并增加了额外的依赖项。

评论

1赞 jfs 4/3/2013
1. 我的回答中的q.get_nowait()绝对不能阻止,永远,这就是使用它的意义所在。2. 执行 readline(enqueue_output() 函数)的线程在 EOF 上退出,例如,包括输出生成进程被终止的情况。如果你认为事实并非如此;请提供一个完整的最小代码示例,以其他方式显示(可能作为一个新问题)。
1赞 Vikram Pudi 4/5/2013
@sebastian我花了一个小时或更长时间试图想出一个最小的例子。最后,我必须同意你的回答可以处理所有情况。我想它之前对我不起作用,因为当我试图杀死输出生成过程时,它已经被杀死了并给出了一个难以调试的错误。这个小时花得很值,因为在想出一个最小的例子的同时,我可以想出一个更简单的解决方案。
0赞 n611x007 9/9/2013
您能发布更简单的解决方案吗?:)(如果它与塞巴斯蒂安的不一样)
0赞 ViFI 11/19/2016
@danger89:我想。dcmpid = myprocess
0赞 sergzach 3/6/2019
在 read() 调用后的条件下(就在 while True 之后):out 永远不会是空字符串,因为您至少读取长度为 1 的字符串/字节。
0赞 Cenk Alti 4/28/2013 #11

我根据 J. F. Sebastian 的解决方案创建了一个库。你可以使用它。

https://github.com/cenkalti/what

0赞 romc 5/8/2013 #12

编辑:此实现仍然阻塞。请改用 J.F.Sebastian 的答案

我尝试了最佳答案,但线程代码的额外风险和维护令人担忧。

浏览 io 模块(仅限于 2.6),我找到了 BufferedReader。这是我的无线程、无阻塞解决方案。

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

评论

0赞 jfs 11/10/2013
你试过吗?它是无线程(单线程)的,当您的代码阻塞时会阻塞。for line in iter(p.stdout.readline, ""): # do stuff with the line
0赞 romc 11/28/2013
@j-f-塞巴斯蒂安:是的,我最终回到了你的答案。我的实现仍然偶尔被阻止。我会编辑我的答案,警告其他人不要走这条路。
8赞 datacompboy 6/12/2013 #13

这是我的代码,用于尽快捕获子进程的每个输出,包括部分行。它同时泵送,并且 stdout 和 stderr 几乎以正确的顺序进行。

在Python 2.7 linux和windows上测试并正确工作。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

评论

0赞 totaam 11/26/2014
为数不多的答案之一,可以让你阅读不一定以换行符结尾的东西。
0赞 Orsiris de Jong 8/30/2021
虽然您的解决方案是我最接近没有丢失输入的解决方案,但使用上述代码连续运行“cat /some/big/file”之类的东西数百次,并将每个输出与最后一个输出进行比较,将显示差异并最终出现一些(罕见)无法捕获整个输出的时间。
0赞 datacompboy 11/11/2021
嗯。.不是整个文件 -- 因为开头缺少某些内容(即它在 io.open 之前发送了数据),或者因为文件末尾的某些内容(在耗尽所有输入之前退出)?
0赞 edA-qa mort-ora-y 10/31/2013 #14

根据 J.F. Sebastian 的回答和其他几个来源,我整理了一个简单的子流程管理器。它提供请求非阻塞读取,以及并行运行多个进程。它不使用任何特定于操作系统的调用(据我所知),因此应该在任何地方工作。

它可以从 pypi 获得,所以只需 .有关示例和完整文档,请参阅项目页面pip install shelljob

50赞 jfs 12/20/2013 #15

Python 3.4 为异步 IO 引入了新的临时 API——asyncio 模块

这种方法类似于 @Bryan Ward 的基于扭曲的答案——定义一个协议,一旦数据准备好,就会立即调用其方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

请参阅文档中的“子进程”。

有一个高级接口返回 Process 对象,允许使用 StreamReader.readline() 协程(使用 async/await Python 3.5+ 语法)异步读取一行:asyncio.create_subprocess_exec()

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill()执行以下任务:

  • 启动子进程,将其 stdout 重定向到管道
  • 异步读取 Subprocess 的 stdout 中的一行
  • kill 子进程
  • 等待它退出

如有必要,每个步骤都可以限制超时秒数。

评论

1赞 flutefreak7 1/15/2016
当我使用 python 3.4 协程尝试这样的事情时,我只有在运行整个脚本后才会得到输出。我希望在子进程打印一行时立即看到一行输出被打印出来。这是我得到的:pastebin.com/qPssFGep
1赞 jfs 1/15/2016
@flutefreak7:缓冲问题与当前问题无关。点击链接了解可能的解决方案。
0赞 flutefreak7 1/15/2016
谢谢!通过简单地使用解决了我的脚本的问题,以便打印的文本可以立即提供给调用的观察者。当我使用基于 Fortran 的可执行文件测试它时,我实际上想要包装/监视它,它不会缓冲它的输出,所以它的行为符合预期。print(text, flush=True)readline
0赞 Carel 5/11/2017
是否可以允许子进程持久化并执行进一步的读/写操作。,在第二个脚本中,其工作方式非常相似,因为它在一次读/写操作后终止进程。我还看到您使用的是单个管道,子进程将其处理为非阻塞。尝试同时使用两者,我发现我最终阻止了readline_and_killsubprocess.comunicatestdoutstdoutstderr
0赞 jfs 5/11/2017
@Carel答案中的代码按预期工作,如答案中明确描述的那样。如果需要,可以实现其他行为。如果使用,两个管道同样是非阻塞的,下面是一个如何同时从两个管道读取的示例。
1赞 mfmain 1/19/2015 #16

为什么要打扰 thread&queue? 与 readline() 不同,BufferedReader.read1() 不会阻止等待 \r\n,如果有任何输出传入,它会尽快返回。

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

评论

1赞 Mathieu Pagé 1/26/2015
如果没有任何东西进来,它会尽快返回吗?如果没有,则表示阻塞。
0赞 Jack O'Connor 8/22/2015
@MathieuPagé是对的。 如果第一个基础读取阻塞,则会阻塞,当管道仍处于打开状态但没有可用的输入时会发生这种情况。read1
1赞 Dmytro 2/2/2015 #17

就我而言,我需要一个日志记录模块来捕获后台应用程序的输出并对其进行增强(添加时间戳、颜色等)。

我最终得到了一个执行实际 I/O 的后台线程。以下代码仅适用于 POSIX 平台。我剥离了非必要的部分。

如果有人打算长期使用这只野兽,请考虑管理开放描述符。就我而言,这不是一个大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
2赞 ideasman42 1/28/2016 #18

在此处添加此答案,因为它提供了在 Windows 和 Unix 上设置非阻塞管道的功能。

所有细节都归功于@techtonik的回答ctypes

有一个略微修改的版本,可以在 Unix 和 Windows 系统上使用。

  • 兼容 Python3(只需稍作改动)。
  • 包括 posix 版本,并定义要用于任一版本的异常。

这样,您可以对 Unix 和 Windows 代码使用相同的函数和异常。

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

为了避免读取不完整的数据,我最终编写了自己的读行生成器(它返回每行的字节字符串)。

它是一个生成器,因此您可以例如...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

评论

0赞 jfs 1/29/2016
(1) 此注释表明不适用于 Python 2 上的非阻塞管道(例如 set using )——您认为它不再正确吗?(我的答案包含提供相同信息的链接(),但现在似乎已删除)。(2)看如何使用readline()fcntlfcntlmultiprocessing.connection.PipeSetNamedPipeHandleState
0赞 ideasman42 1/29/2016
我只在 Python3 上测试过。但是也看到了这些信息,并希望它仍然有效。我还编写了自己的代码来代替 readline,我已经更新了我的答案以包含它。
4赞 Tom Lime 5/24/2016 #19

此版本的非阻塞读取不需要特殊模块,可以在大多数 Linux 发行版上开箱即用。

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
1赞 Paco 3/25/2017 #20

这是一个在子进程中运行交互式命令的示例,stdout 是使用伪终端进行交互的。您可以参考:https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
1赞 brookbot 6/28/2017 #21

我的问题有点不同,因为我想从正在运行的进程中同时收集 stdout 和 stderr,但最终是相同的,因为我想在小部件中呈现其生成的输出。

我不想求助于许多使用队列或其他线程的建议解决方法,因为它们不应该是执行运行另一个脚本并收集其输出等常见任务所必需的。

在阅读了建议的解决方案和 python 文档后,我解决了以下实现的问题。是的,它仅适用于 POSIX,因为我正在使用函数调用。select

我同意文档令人困惑,并且对于如此常见的脚本任务而言,实现很笨拙。我相信旧版本的 python 有不同的默认值和不同的解释,因此造成了很多混乱。这似乎适用于 Python 2.7.12 和 3.5.2。Popen

关键是设置行缓冲,然后作为文本文件而不是二进制文件进行处理,这在设置时似乎成为默认值。bufsize=1universal_newlines=Truebufsize=1

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR、DEBUG 和 VERBOSE 只是将输出输出打印到终端的宏。

恕我直言,该解决方案的有效率为 99.99%,因为它仍然使用阻塞功能,因此我们假设子进程很好并输出完整的行。readline

我欢迎反馈以改进解决方案,因为我仍然是 Python 的新手。

评论

0赞 Aaron 1/24/2018
在这种特殊情况下,您可以设置 stderr=subprocess。STDOUT,并从 cmd.stdout.readline() 获取所有输出。
0赞 Alcamtar 10/4/2019
很好的例子。select.select() 有问题,但这为我解决了它。
0赞 Bradley Odell 10/3/2017 #22

此解决方案使用该模块从 IO 流中“读取任何可用数据”。此函数最初会阻止数据,直到数据可用,但随后仅读取可用的数据,并且不会进一步阻止。select

鉴于它使用该模块,这仅适用于 Unix。select

该代码完全符合 PEP8 标准。

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
0赞 gonzaedu61 4/7/2018 #23

我也遇到了 Jesse 描述的问题,并像 BradleyAndy 和其他人一样使用“select”解决了它,但处于阻塞模式以避免繁忙循环。它使用虚拟管道作为假 stdin。选择块并等待 stdin 或管道准备就绪。当按下一个键时,stdin 会解锁 select,并且可以使用 read(1) 检索键值。当不同的线程写入管道时,管道会解除对选择的阻塞,这可以被视为对 stdin 的需求已经结束的迹象。以下是一些参考代码:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

评论

0赞 gonzaedu61 4/7/2018
注意:为了在 Windows 中实现此工作,管道应替换为套接字。我还没有尝试过,但它应该根据文档工作。
3赞 Dave Kitchen 9/26/2018 #24

我有原始提问者的问题,但不想调用线程。我将 Jesse 的解决方案与直接来自管道的解决方案和我自己的用于行读取的缓冲区处理程序混合在一起(但是,我的子进程 - ping - 总是<系统页面大小写入整行)。我通过只在 gobject 注册的 io 手表中读取来避免繁忙的等待。这些天来,我通常在 gobject MainLoop 中运行代码以避免线程。read()

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

观察者是

def watch(f, *other):
    print 'reading',f.read()
    return True

主程序设置一个 ping,然后调用 gobject 邮件循环。

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

任何其他工作都附加到 gobject 中的回调。

评论

0赞 Eric 8/28/2022
selector是 Python stdlib 的一部分,可以轻松替换 gobject。
14赞 user240515 5/9/2019 #25

在现代 Python 中,情况要好得多。

这是一个简单的子程序“hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

以及一个与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,实际的模式,也是几乎所有前面的答案,无论是在这里还是在相关问题中,都是将子项的 stdout 文件描述符设置为非阻塞,然后在某种选择循环中轮询它。当然,现在这个循环是由 asyncio 提供的。

评论

3赞 nijave 8/10/2021
imo 这是最好的答案,它实际上在后台使用 Windows 重叠/异步读/写(而不是一些线程变体来处理阻塞)。根据文档,您应该调用以确保 write(..) 实际通过drain()
0赞 Daniel Donnelly 10/22/2022
我从中得到错误:我正在从 QThread (PyQt5) 中执行此操作builtins.ValueError: set_wakeup_fd only works in main thread
0赞 alexei 12/16/2022
在 Windows 上运行良好。解决 PowerShell 管道的限制,这些管道仅在源退出时传递输出,而不是逐块传递。
0赞 Shahboz 10/12/2023
您的解决方案在使用 Python 3.11 的 Mac 上对我有用。
0赞 betontalpfa 12/7/2019 #26

试试 wexpect,它是 pexpect 的 Windows 替代品。

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()
65赞 saaj 12/12/2019 #27

在类 Unix 系统和 Python 3.5+ 上,有 os.set_blocking 完全按照它所说的去做。

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

这将输出:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

评论是:os.set_blocking

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

评论

7赞 Coronon 11/14/2020
这是迄今为止最优雅的解决方案,感谢您让我度过美好的一天(晚上实际上是^^)
4赞 gromain 11/16/2020
非常优雅,非常高效。感谢您提供此解决方案,它运行良好!
2赞 jlh 1/28/2021
谢谢!当使用带有 的管道时,这很有效,以确保它不会堵塞。PopenSelector
2赞 Berwyn 11/3/2022
优雅,是的。但它不是多平台的(根据问题)。
2赞 ChrisZZ 10/31/2023
Windows 用户必须将其 Python 更新到 3.12,Windows 上的较低版本不提供此功能。我的conda更新只给了我Python 3.11.5,太难过了。
7赞 Olivier Michel 3/20/2020 #28

这是一个基于线程的简单解决方案,它:

  • 适用于 Linux 和 Windows(不依赖 )。select
  • 异步读取 和 。stdoutstderr
  • 不依赖于具有任意等待时间的主动轮询(对 CPU 友好)。
  • 不使用(这可能与其他库冲突)。asyncio
  • 一直运行,直到子进程终止。

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

评论

1赞 Aidan 7/9/2022
似乎效果很好。我注意到,如果使用 c++ exe 进行连接,我需要在任何 printfs 之后调用 fflush(stdout) 到 stdout 以使事情在 Windows 上运行。stderr 不需要。
1赞 Xeverous 10/4/2022
默认情况下,C++ 标准库流(stderr 除外)是缓冲的。如果您正在进行任何类型的交互(控制台或文件除外),则需要刷新以立即看到另一侧的效果。
4赞 Orsiris de Jong 9/8/2021 #29

不是第一个,也可能不是最后一个,我构建了一个包,它使用两种不同的方法进行非阻塞 stdout PIPE 读取,一种基于 J.F. Sebastian (@jfs) 的答案,另一种是一个简单的 communicate() 循环,带有一个线程来检查超时。

这两种 stdout 捕获方法都经过测试,可在 Linux 和 Windows 下工作,截至撰写本文时,Python 版本从 2.7 到 3.9 不等

由于是非阻塞的,它保证了超时的强制执行,即使有多个子进程和孙子进程,甚至在 Python 2.7 下也是如此。

该包还处理字节和文本标准输出编码,在尝试捕获 EOF 时是一场噩梦。

您可以在 https://github.com/netinvent/command_runner 找到包裹

如果你需要一些经过充分测试的非阻塞读取实现,请尝试一下(或破解代码):

pip install command_runner

from command_runner import command_runner

exit_code, output = command_runner('ping 127.0.0.1', timeout=3)
exit_code, output = command_runner('echo hello world, shell=True)
exit_code, output = command_runner('some command', stdout='some_file')

您可以在 or 根据所采用的捕获方法找到核心非阻塞读取代码。 从那里,你可以用自己的方式找到你想要的东西,或者只是使用整个包来执行你的命令作为子进程的替代品。_poll_process()_monitor_process()

评论

1赞 NeStack 8/19/2023
太棒了,这就是我需要的!非常感谢您制作这个包,并且它也是公开的!我花了几个小时寻找这样的解决方案
0赞 Orsiris de Jong 8/21/2023
感谢您的反馈,很高兴知道这些软件包何时可以帮助某人;)
0赞 Michael Wegter 9/26/2023 #30

我要告诉你,有一种超级简单的方法可以做到这一点。我找了很长一段时间,尝试了一些复杂的队列函数等,所有这些都偶然发现了这条评论,它解释了您只需要在 Popen 中指定stdout=sys.stdout

import subprocess
import sys

def execute(command):
    subprocess.Popen('myprogram.exe', shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)