提问人:karl 提问时间:8/16/2023 最后编辑:karl 更新时间:10/30/2023 访问量:169
读取进程 standardOutput 和 standardError 在 swift 中并行进行,无阻塞
read process standardOutput and standardError in parallel in swift without blocking
问:
在 Swift 5 中,我想运行 `Process()`,并在不阻塞的情况下同时读取它的 `standardOutput` 和 `standardError`,以便对输出进行解析。
在下面的示例代码中,一旦执行到 `for try await line in errorPipe.fileHandleForReading.bytes.lines` 这一行,程序的执行就会被阻塞,standardOutput 的读取器也随之停止打印。
import Foundation
// Separate pipes for the child's stdout and stderr so each stream can be read independently.
let outputPipe = Pipe()
let errorPipe = Pipe()

// Child process: ping google.com (runs until interrupted, no `-c` count is given).
let process = Process()
process.executableURL = URL(fileURLWithPath: "/sbin/ping")
process.arguments = ["google.com"]
process.standardOutput = outputPipe
process.standardError = errorPipe

// Do not swallow a launch failure: if the child never starts, nothing ever closes
// the write ends of the pipes, so the readers would wait for an end-of-file that
// never arrives. Report the error and stop instead.
do {
    try process.run()
} catch {
    NSLog("Failed to launch /sbin/ping: \(error.localizedDescription)")
    exit(EXIT_FAILURE)
}
/// Diagnostic reader for the child's standard output.
///
/// Prints five "processStdOut X" progress markers, sleeping one second after each,
/// then echoes every line read from `outputPipe` until the stream ends or throws.
/// A read error is logged rather than propagated.
func processStdOut() async {
    var tick = 0
    while tick < 5 {
        print("processStdOut X ", tick)
        try? await Task.sleep(nanoseconds: 1_000_000_000)
        tick += 1
    }

    do {
        // Drive the line sequence by hand; this is what `for try await` does under the hood.
        var stdoutLines = outputPipe.fileHandleForReading.bytes.lines.makeAsyncIterator()
        while let line = try await stdoutLines.next() {
            print("stdout Line: \(line)")
        }
    } catch {
        NSLog("processStdOut Error \(error.localizedDescription)")
    }

    NSLog("processStdOut finished")
}
/// Diagnostic reader for the child's standard error.
///
/// Prints five "processStdErr X" progress markers, sleeping two seconds after each,
/// then echoes every line read from `errorPipe` until the stream ends or throws.
/// A read error is logged rather than propagated.
func processStdErr() async {
    var tick = 0
    while tick < 5 {
        print("processStdErr X ", tick)
        try? await Task.sleep(nanoseconds: 2_000_000_000)
        tick += 1
    }

    do {
        // Drive the line sequence by hand; this is what `for try await` does under the hood.
        var stderrLines = errorPipe.fileHandleForReading.bytes.lines.makeAsyncIterator()
        while let line = try await stderrLines.next() {
            print("stderr Line: \(line)")
        }
    } catch {
        NSLog("processStdErr Error \(error.localizedDescription)")
    }

    NSLog("processStdErr finished")
}
// Run the stderr reader, the stdout reader and the wait-for-exit call as three
// sibling child tasks, and resume only once all three have completed.
await withTaskGroup(of: Void.self) { readers in
    readers.addTask { await processStdErr() }
    readers.addTask { await processStdOut() }
    readers.addTask { process.waitUntilExit() }

    // Spelled out for clarity; the group would await its children on scope exit anyway.
    await readers.waitForAll()
}
请注意,如果通过断开 wifi 或网络连接来强制数据进入 standardError,standardOutput 将再次解锁。
我还有什么应该尝试的吗?
答:
2赞
Fernando Urbano
8/17/2023
#1
大多数程序默认使用默认的缓冲策略,而且由于您无法控制 `/sbin/ping` 如何处理其输出,其中一个管道可能会阻塞 `FileHandle.AsyncBytes` 的实现(具体原因不确定)。我改为调用 `.availableData` 来避免阻塞,这样就能同时读取两个管道。
import Foundation
// Separate pipes for the child's stdout and stderr so each stream can be read independently.
let outputPipe = Pipe()
let errorPipe = Pipe()

// Child process: ten pings, then ping exits on its own.
let process = Process()
process.executableURL = URL(fileURLWithPath: "/sbin/ping")
process.arguments = ["-c", "10", "diariosur.es"]
process.standardOutput = outputPipe
process.standardError = errorPipe

// The readers below run until end-of-file, which is only reached once a launched
// child has closed its end of the pipe. Stop here if the launch fails instead of
// silently continuing with a process that never started.
do {
    try process.run()
} catch {
    NSLog("Failed to launch /sbin/ping: \(error.localizedDescription)")
    exit(EXIT_FAILURE)
}

/// Prints every chunk that arrives on `handle`, prefixed with `label`, until end-of-file.
///
/// - Parameters:
///   - handle: Read end of the pipe to drain.
///   - label: Stream name used in the printed output ("stdout" or "stderr").
///
/// `availableData` blocks the calling thread until data arrives and returns empty
/// data at end-of-file. Looping until that empty result, rather than polling
/// `process.isRunning`, means output still buffered in the pipe when the child
/// exits is not lost, and a child that exits before the loop starts is still read.
func drain(_ handle: FileHandle, label: String) {
    print("\(label) start")
    while true {
        let chunk = handle.availableData
        if chunk.isEmpty {
            break  // end-of-file: the write end of the pipe has been closed
        }
        // A chunk boundary can fall inside a multi-byte UTF-8 character. The failable
        // `String(data:encoding:)` would return nil and drop the whole chunk;
        // `String(decoding:as:)` always produces text, repairing invalid bytes.
        let text = String(decoding: chunk, as: UTF8.self)
            .trimmingCharacters(in: .whitespacesAndNewlines)
        print("\(label) data: \(text)")
    }
    print("\(label) finished")
}

/// Echoes the child's standard output until the stream reaches end-of-file.
func processStdOut() async {
    drain(outputPipe.fileHandleForReading, label: "stdout")
}

/// Echoes the child's standard error until the stream reaches end-of-file.
func processStdErr() async {
    drain(errorPipe.fileHandleForReading, label: "stderr")
}

// Drain both pipes concurrently so neither stream can stall the other.
await withTaskGroup(of: Void.self) { group in
    group.addTask {
        await processStdErr()
    }
    group.addTask {
        await processStdOut()
    }
}

// Both pipes have reached end-of-file; wait for the child itself to terminate.
process.waitUntilExit()
然后我得到以下(修剪)输出:
stderr start
stdout start
stdout data: PING diariosur.es (23.213.41.6): 56 data bytes
64 bytes from 23.213.41.6: icmp_seq=0 ttl=57 time=7.060 ms
stdout data: 64 bytes from 23.213.41.6: icmp_seq=1 ttl=57 time=6.562 ms
...
stdout data: 64 bytes from 23.213.41.6: icmp_seq=9 ttl=57 time=7.904 ms
stdout data: --- diariosur.es ping statistics ---
10 packets transmitted, 10 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 6.562/7.327/9.439/0.783 ms
stdout finished
stderr finished
我还用默认输出到 `stderr` 的 `curl` 试过,同样没有发生阻塞:
// Alternative child process for the same experiment: curl instead of ping.
process.executableURL = URL(fileURLWithPath: "/usr/bin/curl")
process.arguments = [
    "-N",
    "--output", "test.zsync",
    "http://ubuntu.mirror.digitalpacific.com.au/releases/23.04/ubuntu-23.04-desktop-amd64.iso.zsync",
]
编辑:
使用以下 C 程序进行测试:
#include <stdio.h>
#include <unistd.h>
/*
 * Test child for the pipe readers: writes "stdout: N" for N = 1..100 to stdout,
 * flushing after every line, and additionally writes "stderr: N" to stderr
 * (also flushed) whenever N is a multiple of 10. Sleeps 100000 microseconds
 * after each iteration.
 */
int main() {
    int count = 0;

    while (count < 100) {
        count++;

        printf("stdout: %d\n", count);
        fflush(stdout);

        if (count % 10 == 0) {
            fprintf(stderr, "stderr: %d\n", count);
            fflush(stderr);
        }

        usleep(100000);
    }

    return 0;
}
它返回以下输出:
stdout start
stderr start
stdout data: stdout: 1
stdout data: stdout: 2
stdout data: stdout: 3
stdout data: stdout: 4
stdout data: stdout: 5
stdout data: stdout: 6
stdout data: stdout: 7
stdout data: stdout: 8
stdout data: stdout: 9
stderr data: stderr: 10
stdout data: stdout: 10
stdout data: stdout: 11
stdout data: stdout: 12
stdout data: stdout: 13
stdout data: stdout: 14
stdout data: stdout: 15
stdout data: stdout: 16
stdout data: stdout: 17
stdout data: stdout: 18
stdout data: stdout: 19
stderr data: stderr: 20
评论
1赞
karl
8/17/2023
去掉 `FileHandle.AsyncBytes` 及其 `lines` 读取之后就可以正常工作了。不过为了安全起见,必须自己实现行解析器。
0赞
Rob
8/18/2023
FWIW,我的经验表明该问题与缓冲区的刷新(flush)无关。如果只处理 `standardOutput` 的 `bytes`,输出一产生我就能立即拿到;但同时处理 `standardOutput` 和 `standardError` 的 `bytes`,就会出现这种阻塞行为。而改用你的方法或 `readabilityHandler`,问题就消失了。
4赞
Rob
8/21/2023
#2
是的,在同时对 `standardError` 使用 `bytes` 时,`standardOutput` 上的标准 `bytes` 实现似乎确实会阻塞。
下面是一个不会阻塞的简单 `bytes` 实现,因为它利用了 `readabilityHandler`:
extension Pipe {
    /// An asynchronous sequence of the bytes arriving on a pipe's read end.
    ///
    /// Data is delivered through the read handle's `readabilityHandler` and
    /// forwarded, byte by byte, into an `AsyncStream`.
    struct AsyncBytes: AsyncSequence {
        typealias Element = UInt8

        /// The pipe whose `fileHandleForReading` is observed.
        let pipe: Pipe

        /// Starts observing the pipe and returns an iterator over its bytes.
        ///
        /// The stream finishes when the handle reports end-of-file (empty data).
        func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
            AsyncStream { continuation in
                // Register cleanup before installing the handler, so that a handler
                // invocation that finishes the stream right away cannot run ahead of it.
                continuation.onTermination = { _ in
                    pipe.fileHandleForReading.readabilityHandler = nil
                }

                pipe.fileHandleForReading.readabilityHandler = { @Sendable handle in
                    let data = handle.availableData
                    guard !data.isEmpty else {
                        // End-of-file: stop observing right here as well, rather than
                        // relying only on `onTermination` to remove the handler.
                        handle.readabilityHandler = nil
                        continuation.finish()
                        return
                    }
                    for byte in data {
                        continuation.yield(byte)
                    }
                }
            }.makeAsyncIterator()
        }
    }

    /// The bytes written to this pipe, as an asynchronous sequence.
    var bytes: AsyncBytes { AsyncBytes(pipe: self) }
}
因此,在同时处理 `standardOutput` 和 `standardError` 时,以下代码不会遇到同样的问题:
// Give the child separate pipes for stdout and stderr so each stream can be read on its own.
let outputPipe = Pipe()
let errorPipe = Pipe()
let process = Process()
// `…` is a placeholder in this sketch: substitute the path of the executable to run.
process.executableURL = URL(fileURLWithPath: …)
process.standardOutput = outputPipe
process.standardError = errorPipe
/// Iterates over the child's standard output one line at a time.
///
/// Reads through `outputPipe.bytes`, i.e. the `bytes` property that the
/// `extension Pipe` in this file adds, not `FileHandle.bytes`.
/// The per-line handling is elided (`…`) in this sketch.
/// - Throws: Whatever the `for try await` iteration throws.
func processStandardOutput() async throws {
for try await line in outputPipe.bytes.lines {
…
}
}
/// Iterates over the child's standard error one line at a time.
///
/// Reads through `errorPipe.bytes`, i.e. the `bytes` property that the
/// `extension Pipe` in this file adds, not `FileHandle.bytes`.
/// The per-line handling is elided (`…`) in this sketch.
/// - Throws: Whatever the `for try await` iteration throws.
func processStandardError() async throws {
for try await line in errorPipe.bytes.lines {
…
}
}
// optionally, you might want to return whatever non-zero termination status code the process returned
// NOTE(review): `exit` ends this program as soon as the handler runs; presumably that can
// happen before the reader tasks below have consumed all remaining pipe output — confirm
// that losing trailing output on failure is acceptable.
process.terminationHandler = { process in
if process.terminationStatus != 0 {
exit(process.terminationStatus)
}
}
// Launch the child only after the pipes and the termination handler have been configured.
try process.run()
// Read both streams concurrently as child tasks of one group.
// `try?` discards any error that propagates out of the group.
try? await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await processStandardOutput()
}
group.addTask {
try await processStandardError()
}
// Remaining group handling is elided (`…`) in this sketch.
…
}
评论