Asked by: Unnamed_Python_User  Asked: 12/12/2021  Last edited by: Unnamed_Python_User  Updated: 12/29/2021  Views: 203
How to merge python IO streams into a single iterator, but maintain which item comes from which stream?
Q:
The desired functionality is as follows:
import subprocess as sp
p = sp.Popen(('program', 'arg1', ...), stdout=sp.PIPE, stderr=sp.PIPE)
for line in merge_streams((p.stdout, 'OUT'), (p.stderr, 'ERR')):
    print(line)
It should output something like the following, in real time:
('OUT', b'output line 1')
('OUT', b'output line 2')
('ERR', b'err line 1')
('ERR', b'err line 2')
('OUT', b'output line 3')
To be clear, running the same program from CMD would output:
output line 1
output line 2
err line 1
err line 2
output line 3
Using p = sp.Popen(('program', 'arg1', ...), stdout=sp.PIPE, stderr=sp.STDOUT) would merge the streams, but then there is no way to tell them apart.
Obviously, using itertools.chain would put all the lines of one stream after all the lines of the other.
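To illustrate that claim, a minimal sketch using two in-memory byte streams in place of a real subprocess's pipes:

```python
import io
import itertools

# Two pre-filled streams standing in for p.stdout and p.stderr.
out = io.BytesIO(b"output line 1\noutput line 2\n")
err = io.BytesIO(b"err line 1\n")

# chain() exhausts the first iterable completely before touching the
# second, so stderr lines can never be interleaved with stdout lines.
merged = list(itertools.chain(out, err))
print(merged)
# [b'output line 1\n', b'output line 2\n', b'err line 1\n']
```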
My only half-working solution involves 2 threads pushing to a collections.deque that the main program reads from, but this approach seems to scramble the order of multi-line blocks that are written to a stream at once.
I.e., an exception like this:
asdf : The term 'asdf' is not recognized as the name of a cmdlet, function, script file, or operable program. Check the spelling of the name, or if a path was included, verify that the path is correct and try again.
At something.ps1:2 char:5
+ asdf
+ ~~~~
+ CategoryInfo : ObjectNotFound: (asdf:String) [], CommandNotFoundException
+ FullyQualifiedErrorId : CommandNotFoundException
might get printed like this:
b' + CategoryInfo : ObjectNotFound: (asdf:String) [], CommandNotFoundException'
b' + FullyQualifiedErrorId : CommandNotFoundException'
b'At something.ps1:2 char:5'
b'+ asdf'
b'+ ~~~~'
b"asdf : The term 'asdf' is not recognized as the name of a cmdlet, function, script file, or operable program. Check the spelling of the name, or if a path was included, verify that the path is correct and try again."
To avoid the XY problem: the end goal is to stream the output to a client in real time using fastapi.responses.StreamingResponse, and that client needs to know which lines are stderr and which are stdout. If using WebSockets makes this easier, that's fine too.
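As a sketch of that end goal: StreamingResponse accepts any iterator of str or bytes, so the tagged tuples can be serialized one per line (newline-delimited JSON) before streaming. The `to_ndjson` helper below is a hypothetical name, and it assumes a merge_streams-style iterator of (prefix, raw_line) tuples:

```python
import json
from typing import Iterable, Iterator, Tuple

def to_ndjson(tagged: Iterable[Tuple[str, bytes]]) -> Iterator[str]:
    """Serialize (prefix, raw_line) tuples as newline-delimited JSON,
    which a client can parse line by line."""
    for prefix, raw in tagged:
        text = raw.decode(errors="replace").rstrip("\n")
        yield json.dumps({"stream": prefix, "line": text}) + "\n"

# With FastAPI this would be wired up roughly as (assumed, not verified here):
#   return StreamingResponse(to_ndjson(merge_streams(...)),
#                            media_type="application/x-ndjson")
for chunk in to_ndjson([("OUT", b"output line 1\n"), ("ERR", b"err line 1\n")]):
    print(chunk, end="")
```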
A:
0 votes
Itay Raveh
12/29/2021
#1
Not sure what went wrong with your threaded solution, but this seems to work well:
import queue
import threading
import subprocess as sp
from typing import IO, Tuple, Any

def enqueue_io(f: IO, q: queue.Queue, prefix: Any):
    # Read lines until EOF and tag each one with its stream's prefix.
    for line in iter(f.readline, b''):
        q.put((prefix, line))

def merge_streams(*streams: Tuple[Any, IO]):
    q = queue.Queue()
    threads = [threading.Thread(target=enqueue_io, args=(f, q, prefix)) for prefix, f in streams]
    [t.start() for t in threads]
    while any(map(threading.Thread.is_alive, threads)):
        try:
            yield q.get_nowait()
        except queue.Empty:
            pass
    # Drain anything that arrived after the reader threads exited.
    while q.qsize() > 0:
        yield q.get_nowait()
    [t.join() for t in threads]

with sp.Popen(...) as p:
    for line in merge_streams(('OUT', p.stdout), ('ERR', p.stderr)):
        print(line)
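One design note: the get_nowait loop above busy-waits while the queue is empty. A variant (a sketch, not part of the original answer) that blocks on q.get and uses a per-thread sentinel instead of polling is_alive:

```python
import io
import queue
import threading
from typing import IO, Tuple, Any, Iterator

_DONE = object()  # sentinel pushed by each reader thread at EOF

def enqueue_io(f: IO, q: queue.Queue, prefix: Any) -> None:
    for line in iter(f.readline, b''):
        q.put((prefix, line))
    q.put(_DONE)

def merge_streams(*streams: Tuple[Any, IO]) -> Iterator[Tuple[Any, bytes]]:
    q: queue.Queue = queue.Queue()
    threads = [threading.Thread(target=enqueue_io, args=(f, q, prefix))
               for prefix, f in streams]
    for t in threads:
        t.start()
    finished = 0
    while finished < len(threads):
        item = q.get()  # blocks instead of spinning
        if item is _DONE:
            finished += 1
        else:
            yield item
    for t in threads:
        t.join()

# Demo with in-memory streams in place of p.stdout / p.stderr:
for item in merge_streams(('OUT', io.BytesIO(b"output line 1\n")),
                          ('ERR', io.BytesIO(b"err line 1\n"))):
    print(item)
```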