How to merge python IO streams into a single iterator, but maintain which item comes from which stream?

Asked by: Unnamed_Python_User · Asked: 12/12/2021 · Last edited by: Unnamed_Python_User · Updated: 12/29/2021 · Views: 203

Question:

The desired functionality is as follows:

import subprocess as sp

p = sp.Popen(('program', 'arg1', ...), stdout=sp.PIPE, stderr=sp.PIPE)

for line in merge_streams((p.stdout, 'OUT'), (p.stderr, 'ERR')):
    print(line)

It should output something like this, in real time:

('OUT', b'output line 1')
('OUT', b'output line 2')
('ERR', b'err line 1')
('ERR', b'err line 2')
('OUT', b'output line 3')

To be clear, running the same program from CMD would output:

output line 1
output line 2
err line 1
err line 2
output line 3

Using p = sp.Popen(('program', 'arg1', ...), stdout=sp.PIPE, stderr=sp.STDOUT) will merge the streams, but then there is no way to tell them apart.
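A small self-contained sketch of the stderr=sp.STDOUT behaviour described above. The child process is a hypothetical stand-in (sys.executable running a one-liner) that writes one line to each stream and flushes after each write; after the redirect, both lines arrive on p.stdout and nothing marks which stream each one came from.

```python
import subprocess as sp
import sys

# Hypothetical stand-in child: one line to stdout, then one to stderr,
# flushing each so the write order is preserved in the shared pipe
CHILD = (
    "import sys\n"
    "print('output line 1', flush=True)\n"
    "print('err line 1', file=sys.stderr, flush=True)\n"
)

def run_merged() -> list:
    # stderr=sp.STDOUT makes the child's stderr share the stdout pipe
    p = sp.run([sys.executable, "-c", CHILD], stdout=sp.PIPE, stderr=sp.STDOUT)
    return p.stdout.splitlines()

print(run_merged())  # → [b'output line 1', b'err line 1'] (no stream tags)
```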

Obviously, using itertools.chain would put all the lines of one stream after all the lines of the other.
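To illustrate the itertools.chain point: tagging each line is easy, but chain() exhausts its first iterator before touching the second. The in-memory io.BytesIO streams below are stand-ins for p.stdout and p.stderr (an assumption for the sake of a runnable example). With real pipes, iterating p.stdout first blocks until the child closes stdout, and the subprocess documentation warns that leaving the other pipe unread can deadlock once its OS buffer fills.

```python
import io
import itertools

# In-memory stand-ins for p.stdout and p.stderr
out = io.BytesIO(b"output line 1\noutput line 2\n")
err = io.BytesIO(b"err line 1\n")

# Tag every line with its source, then chain the two tagged iterators;
# all stdout lines come out before any stderr line
tagged = list(itertools.chain(
    (("OUT", line) for line in out),
    (("ERR", line) for line in err),
))
print(tagged)
# → [('OUT', b'output line 1\n'), ('OUT', b'output line 2\n'), ('ERR', b'err line 1\n')]
```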

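My only semi-working solution involves 2 threads pushing to a collections.deque while the main program reads from it, but this approach seems to scramble the order of multi-line blocks that are written to a stream at the same time.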
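The question does not show the deque-based code, so the cause of the scrambling cannot be confirmed from here, and the example below may not explain the exact pattern shown. One common pitfall, offered only as a guess, is consuming the deque from the wrong end: collections.deque behaves as a FIFO queue only when items are added with append() and removed with popleft(), while pop() removes from the same end append() adds to, which gives LIFO order. The drain() helper is hypothetical.

```python
from collections import deque

def drain(use_popleft: bool) -> list:
    d = deque()
    # Producer side: a reader thread would append lines in arrival order
    for line in (b"line 1", b"line 2", b"line 3"):
        d.append(line)
    result = []
    # Consumer side: popleft() takes the oldest item (FIFO), pop() the newest (LIFO)
    while d:
        result.append(d.popleft() if use_popleft else d.pop())
    return result

print(drain(use_popleft=True))   # → [b'line 1', b'line 2', b'line 3']
print(drain(use_popleft=False))  # → [b'line 3', b'line 2', b'line 1']
```

A deque also has no blocking get, so the consumer has to poll; queue.Queue provides a blocking, thread-safe FIFO instead.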

That is, an exception like this:

asdf : The term 'asdf' is not recognized as the name of a cmdlet, function, script file, or operable program. Check the spelling of the name, or if a path was included, verify that the path is correct and try again.
At something.ps1:2 char:5
+     asdf
+     ~~~~
    + CategoryInfo          : ObjectNotFound: (asdf:String) [], CommandNotFoundException
    + FullyQualifiedErrorId : CommandNotFoundException

may be printed like this:

b'    + CategoryInfo          : ObjectNotFound: (asdf:String) [], CommandNotFoundException'
b'    + FullyQualifiedErrorId : CommandNotFoundException'
b'At something.ps1:2 char:5'
b'+     asdf'
b'+     ~~~~'
b"asdf : The term 'asdf' is not recognized as the name of a cmdlet, function, script file, or operable program. Check the spelling of the name, or if a path was included, verify that the path is correct and try again."

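To avoid an XY problem: the end goal is to send the output to a client in real time using fastapi.responses.StreamingResponse, and the client needs to know which lines are stderr and which are stdout. If using a WebSocket makes this easier, that's fine too.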
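Whatever merges the streams, the client still needs each line's tag on the wire. One possible framing, an assumption rather than anything FastAPI prescribes, is newline-delimited JSON: one object per line with the stream name and the decoded text. The hypothetical to_ndjson generator below is plain Python; since StreamingResponse accepts an iterator of str or bytes, something like StreamingResponse(to_ndjson(merge_streams(...)), media_type="application/x-ndjson") could send it. The "utf-8" default is also an assumption; a Windows console program may emit a different encoding.

```python
import json
from typing import Any, Iterable, Iterator, Tuple

def to_ndjson(items: Iterable[Tuple[Any, bytes]], encoding: str = "utf-8") -> Iterator[bytes]:
    """Encode (tag, raw_line) pairs as newline-delimited JSON, one object per line."""
    for tag, raw in items:
        record = {
            "stream": tag,
            # Decode leniently so one invalid byte doesn't abort the response,
            # and drop the line terminator that readline() keeps
            "data": raw.decode(encoding, errors="replace").rstrip("\r\n"),
        }
        yield (json.dumps(record) + "\n").encode("utf-8")

sample = [("OUT", b"output line 1\n"), ("ERR", b"err line 1\n")]
for chunk in to_ndjson(sample):
    print(chunk)
```

On the client side, each received line can be parsed independently and routed by its "stream" field.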

python io python-3.8


Answers:

0 votes · Itay Raveh · 12/29/2021 · #1

Not sure what went wrong with your threading solution, but this seems to work well:

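import queue
import threading
import subprocess as sp
from typing import IO, Any, Iterator, Tuple

def enqueue_io(f: IO, q: queue.Queue, prefix: Any) -> None:
    # Read one line at a time until EOF (readline returns b'') and tag each line
    for line in iter(f.readline, b''):
        q.put((prefix, line))

def merge_streams(*streams: Tuple[Any, IO]) -> Iterator[Tuple[Any, bytes]]:
    q = queue.Queue()
    # One reader thread per stream, all feeding the same FIFO queue
    threads = [threading.Thread(target=enqueue_io, args=(f, q, prefix))
               for prefix, f in streams]
    for t in threads:
        t.start()

    # While any reader is still running, yield lines as they arrive;
    # the timeout avoids busy-spinning the CPU when the queue is empty
    while any(t.is_alive() for t in threads):
        try:
            yield q.get(timeout=0.1)
        except queue.Empty:
            pass

    # All readers have finished: drain whatever is still queued
    while not q.empty():
        yield q.get_nowait()

    for t in threads:
        t.join()

# Both pipes must be requested, otherwise p.stdout / p.stderr are None
with sp.Popen(('program', 'arg1', ...), stdout=sp.PIPE, stderr=sp.PIPE) as p:
    for line in merge_streams(('OUT', p.stdout), ('ERR', p.stderr)):
        print(line)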
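A variation on the same threaded design, offered as a sketch rather than as the answerer's code: instead of polling is_alive() with a timeout, each reader puts a sentinel on the queue when its stream reaches EOF, and the consumer stops after seeing one sentinel per stream, so q.get() can block with no timeout. The child process is a hypothetical stand-in (sys.executable running a one-liner) so the example runs anywhere Python does. Order within each stream is preserved because each stream is read by exactly one thread and queue.Queue is FIFO; the order between stdout and stderr lines only reflects when the reader threads received them, so lines written close together to different streams may still interleave slightly differently than on a console.

```python
import queue
import subprocess as sp
import sys
import threading
from typing import IO, Any, Iterator, List, Tuple

_EOF = object()  # sentinel: one reader has reached end-of-file

def _reader(f: IO[bytes], q: "queue.Queue", tag: Any) -> None:
    try:
        for line in iter(f.readline, b""):
            q.put((tag, line))
    finally:
        q.put(_EOF)  # always signal completion, even if reading fails

def merge_streams(*streams: Tuple[Any, IO[bytes]]) -> Iterator[Tuple[Any, bytes]]:
    q: "queue.Queue" = queue.Queue()
    threads = [threading.Thread(target=_reader, args=(f, q, tag)) for tag, f in streams]
    for t in threads:
        t.start()
    remaining = len(threads)
    while remaining:
        item = q.get()  # blocks until a line or a sentinel arrives
        if item is _EOF:
            remaining -= 1
        else:
            yield item
    for t in threads:
        t.join()

# Hypothetical child process: two stdout lines, then one stderr line
CHILD = (
    "import sys\n"
    "print('output line 1', flush=True)\n"
    "print('output line 2', flush=True)\n"
    "print('err line 1', file=sys.stderr, flush=True)\n"
)

def collect() -> List[Tuple[str, bytes]]:
    with sp.Popen([sys.executable, "-c", CHILD], stdout=sp.PIPE, stderr=sp.PIPE) as p:
        return [(tag, line.rstrip(b"\r\n"))
                for tag, line in merge_streams(("OUT", p.stdout), ("ERR", p.stderr))]

print(collect())
```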

Comments:

0 votes · Community · 12/29/2021
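As it's currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center.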