在 Swift 中并行、无阻塞地读取进程的 standardOutput 和 standardError

read process standardOutput and standardError in parallel in swift without blocking

提问人:karl 提问时间:8/16/2023 最后编辑:karl 更新时间:10/30/2023 访问量:169

问:

在 Swift 5 中,我想运行一个 `Process()`,并在不阻塞的情况下读取它的 `standardOutput` 和 `standardError`,这样我就可以解析它们。

在下面的示例代码中,一旦执行到 `for try await line in errorPipe.fileHandleForReading.bytes.lines` 这一行,程序执行就会被阻塞,standardOutput 读取器也随之停止打印。


import Foundation

// Pipes that capture the child process's standard output and standard error.
let outputPipe = Pipe()
let errorPipe = Pipe()

// Configure /sbin/ping with a single argument and route both streams into the pipes.
let process = Process()
process.executableURL = URL(fileURLWithPath:"/sbin/ping")
process.arguments = ["google.com"]
process.standardOutput = outputPipe
process.standardError = errorPipe

// `try?` discards any error thrown while launching the process.
try? process.run()

/// Prints five ticks spaced one second apart, then echoes every line read from
/// `outputPipe` until the line sequence ends. Read errors are logged, not rethrown.
func processStdOut() async {
  // Delay phase: one tick per second before any reading starts.
  var tick = 0
  while tick < 5 {
    print("processStdOut X ", tick)
    try? await Task.sleep(nanoseconds: 1_000_000_000)
    tick += 1
  }

  // Read phase: consume the pipe's bytes as text lines.
  do {
    let stdoutLines = outputPipe.fileHandleForReading.bytes.lines
    for try await text in stdoutLines {
      print("stdout Line: \(text)")
    }
  } catch {
    NSLog("processStdOut Error \(error.localizedDescription)")
  }

  NSLog("processStdOut finished")
}

/// Prints five ticks spaced two seconds apart, then echoes every line read from
/// `errorPipe` until the line sequence ends. Read errors are logged, not rethrown.
func processStdErr() async {
  // Delay phase: one tick every two seconds before any reading starts.
  var tick = 0
  while tick < 5 {
    print("processStdErr X ", tick)
    try? await Task.sleep(nanoseconds: 2_000_000_000)
    tick += 1
  }

  // Read phase: consume the pipe's bytes as text lines.
  do {
    let stderrLines = errorPipe.fileHandleForReading.bytes.lines
    for try await text in stderrLines {
      print("stderr Line: \(text)")
    }
  } catch {
    NSLog("processStdErr Error \(error.localizedDescription)")
  }

  NSLog("processStdErr finished")
}

// Run both readers and the exit wait as child tasks of one group; the `await`
// resumes only after all three child tasks have finished.
await withTaskGroup(of: Void.self) { group in
  group.addTask {
    await processStdErr()
  }
  group.addTask {
    await processStdOut()
  }
  group.addTask {
    // Synchronous (non-awaited) call made from inside a child task.
    process.waitUntilExit()
  }
}

请注意,如果通过断开 Wi-Fi 或网络连接迫使数据写入 standardError,standardOutput 的读取就会再次解除阻塞。

我还有什么应该尝试的吗?

Swift 多线程 macOS 异步 并发

评论

1赞 soundflix 8/17/2023
这在类似的情况下帮助了我:stackoverflow.com/questions/52335435/......

答:

2赞 Fernando Urbano 8/17/2023 #1

大多数程序默认使用默认的缓冲策略,而您无法控制 `/sbin/ping` 如何处理输出,因此其中一个管道可能会阻塞 `FileHandle.AsyncBytes` 的实现(原因不确定)。我通过调用 `availableData` 避免了阻塞,使其可以同时读取两个管道。

import Foundation

// Pipes that capture the child process's standard output and standard error.
let outputPipe = Pipe()
let errorPipe = Pipe()

let process = Process()

// Run /sbin/ping with the arguments `-c 10 diariosur.es`, routing both streams into the pipes.
process.executableURL = URL(fileURLWithPath: "/sbin/ping")
process.arguments = ["-c", "10", "diariosur.es"]
process.standardOutput = outputPipe
process.standardError = errorPipe

// `try?` discards any error thrown while launching the process.
try? process.run()

/// Echoes everything the child process writes to standard output.
///
/// Chunks are read with `availableData` until it returns empty data, which is
/// how `FileHandle` reports end-of-file. Terminating on EOF instead of polling
/// `process.isRunning` means that:
/// - output still buffered in the pipe when the process exits is not dropped,
/// - output is still read when the process finishes before this task starts,
/// - the loop cannot busy-spin on a pipe that has already reached EOF.
func processStdOut() async {
    print("stdout start")

    let reader = outputPipe.fileHandleForReading
    while true {
        // Blocks until data is available; an empty result signals end-of-file.
        let data = reader.availableData
        if data.isEmpty { break }

        // A chunk may hold several lines; it is printed as one trimmed string.
        if let line = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) {
            print("stdout data: \(line)")
        }
    }

    print("stdout finished")
}

/// Echoes everything the child process writes to standard error.
///
/// Chunks are read with `availableData` until it returns empty data, which is
/// how `FileHandle` reports end-of-file. Terminating on EOF instead of polling
/// `process.isRunning` means that:
/// - output still buffered in the pipe when the process exits is not dropped,
/// - output is still read when the process finishes before this task starts,
/// - the loop cannot busy-spin on a pipe that has already reached EOF.
func processStdErr() async {
    print("stderr start")

    let reader = errorPipe.fileHandleForReading
    while true {
        // Blocks until data is available; an empty result signals end-of-file.
        let data = reader.availableData
        if data.isEmpty { break }

        // A chunk may hold several lines; it is printed as one trimmed string.
        if let line = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) {
            print("stderr data: \(line)")
        }
    }

    print("stderr finished")
}
    
    // Run both pipe readers as child tasks of one group; the `await` resumes
    // once both readers have returned.
    await withTaskGroup(of: Void.self) { group in
        group.addTask {
            await processStdErr()
        }
        
        group.addTask {
            await processStdOut()
        }
    }
    
    // Synchronous wait for the child process, performed after both readers are done.
    process.waitUntilExit()
    

然后我得到以下(修剪)输出:

stderr start
stdout start
stdout data: PING diariosur.es (23.213.41.6): 56 data bytes
64 bytes from 23.213.41.6: icmp_seq=0 ttl=57 time=7.060 ms
stdout data: 64 bytes from 23.213.41.6: icmp_seq=1 ttl=57 time=6.562 ms
...
stdout data: 64 bytes from 23.213.41.6: icmp_seq=9 ttl=57 time=7.904 ms
stdout data: --- diariosur.es ping statistics ---
10 packets transmitted, 10 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 6.562/7.327/9.439/0.783 ms
stdout finished
stderr finished

我还尝试了默认输出到 `stderr` 的 `curl`,它同样没有阻塞:

// Alternative configuration: launch /usr/bin/curl instead of ping, saving the download to test.zsync.
process.executableURL = URL(fileURLWithPath: "/usr/bin/curl")
process.arguments = ["-N", "--output", "test.zsync", "http://ubuntu.mirror.digitalpacific.com.au/releases/23.04/ubuntu-23.04-desktop-amd64.iso.zsync"]

编辑:

使用以下 C 程序进行测试:

#include <stdio.h>
#include <unistd.h>

/*
 * Test emitter: writes "stdout: N" for N = 1..100 to stdout, and additionally
 * "stderr: N" to stderr whenever N is a multiple of 10. Every write is flushed
 * immediately and followed by a 100000-microsecond pause.
 */
int main() {
    int counter = 1;

    while (counter <= 100) {
        fprintf(stdout, "stdout: %d\n", counter);
        fflush(stdout);

        /* Every tenth iteration also reports on stderr. */
        if (counter % 10 == 0) {
            fprintf(stderr, "stderr: %d\n", counter);
            fflush(stderr);
        }

        usleep(100000);
        ++counter;
    }

    return 0;
}

它返回以下输出:

stdout start
stderr start
stdout data: stdout: 1
stdout data: stdout: 2
stdout data: stdout: 3
stdout data: stdout: 4
stdout data: stdout: 5
stdout data: stdout: 6
stdout data: stdout: 7
stdout data: stdout: 8
stdout data: stdout: 9
stderr data: stderr: 10
stdout data: stdout: 10
stdout data: stdout: 11
stdout data: stdout: 12
stdout data: stdout: 13
stdout data: stdout: 14
stdout data: stdout: 15
stdout data: stdout: 16
stdout data: stdout: 17
stdout data: stdout: 18
stdout data: stdout: 19
stderr data: stderr: 20

评论

1赞 karl 8/17/2023
去掉 `FileHandle.AsyncBytes` 和 `lines` 的读取方式后就可以工作了。为了安全起见,必须自己实现行解析器。
0赞 Rob 8/18/2023
FWIW,我的经验表明该问题与缓冲区的刷新无关。只处理 `standardOutput` 的 `bytes` 时,输出一产生我就能收到。但同时处理 `standardOutput` 和 `standardError` 的 `bytes`,就会出现这种阻塞行为。而使用你的方法或 `readabilityHandler`,问题就消失了。
4赞 Rob 8/21/2023 #2

是的,当同时在 `standardError` 上使用 `bytes` 时,标准的 `bytes` 实现似乎会阻塞 `standardOutput`。

这是一个不会阻塞的简单 `bytes` 实现,因为它利用了 `readabilityHandler`:

extension Pipe {
    /// An asynchronous sequence of the bytes arriving on a pipe's reading handle,
    /// fed by the handle's `readabilityHandler` callback.
    struct AsyncBytes: AsyncSequence {
        typealias Element = UInt8

        /// The pipe whose reading end is observed.
        let pipe: Pipe

        /// Installs a readability handler on the pipe and returns an iterator over
        /// the bytes it delivers. An empty read finishes the stream.
        func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
            let stream = AsyncStream<Element> { continuation in
                pipe.fileHandleForReading.readabilityHandler = { @Sendable handle in
                    let chunk = handle.availableData

                    if chunk.isEmpty {
                        // Nothing left to read: end the stream.
                        continuation.finish()
                    } else {
                        // Forward the chunk one byte at a time.
                        for octet in chunk {
                            continuation.yield(octet)
                        }
                    }
                }

                // Remove the handler once the stream terminates.
                continuation.onTermination = { _ in
                    pipe.fileHandleForReading.readabilityHandler = nil
                }
            }

            return stream.makeAsyncIterator()
        }
    }

    /// The bytes of this pipe's reading end as an asynchronous sequence.
    var bytes: AsyncBytes {
        return AsyncBytes(pipe: self)
    }
}

因此,以下代码在同时处理 `standardOutput` 和 `standardError` 时不会遇到相同的问题:

// Pipes that receive the child process's standard output and standard error.
let outputPipe = Pipe()
let errorPipe = Pipe()

let process = Process()
// The executable path is elided (…) in this snippet.
process.executableURL = URL(fileURLWithPath: …)
process.standardOutput = outputPipe
process.standardError = errorPipe

/// Iterates the lines of `outputPipe.bytes`; the per-line handling is elided (…).
/// Errors thrown while iterating propagate to the caller.
func processStandardOutput() async throws {
    for try await line in outputPipe.bytes.lines {
        …
    }
}

/// Iterates the lines of `errorPipe.bytes`; the per-line handling is elided (…).
/// Errors thrown while iterating propagate to the caller.
func processStandardError() async throws {
    for try await line in errorPipe.bytes.lines {
        …
    }
}

// Optionally, you might want to return whatever non-zero termination status code the process returned.
// NOTE(review): `exit` in this handler may run while the reader tasks below are
// still consuming output — confirm that losing unread output is acceptable.

process.terminationHandler = { process in
    if process.terminationStatus != 0 {
        exit(process.terminationStatus)
    }
}

// Launch the process; unlike `try?`, a launch failure propagates as a thrown error.
try process.run()

// Read both streams as child tasks of one group; `try?` discards any error the group throws.
try? await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask {
        try await processStandardOutput()
    }
    
    group.addTask {
        try await processStandardError()
    }
    
    …
}