提问人:alcorn 提问时间:4/1/2023 更新时间:4/1/2023 访问量:54
为什么在 Scala 中使用管道 IO 流会无限期挂起?[复制]
Why is this use of piped IO streams in Scala hanging indefinitely? [duplicate]
问:
我正在尝试这篇博文中讨论的流接口。我已经制作了一个感觉像是简单的实现来测试接口,但是当我运行它时,它会无限期地挂起,我不明白为什么。代码如下:
import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.OutputStream
import scala.concurrent.{Future, Promise, Await}
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.collection.immutable.ListMap
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
def copy(input: InputStream, output: OutputStream): Unit = {
val buffer = new Array[Byte](1024)
var bytesRead = input.read(buffer)
while (bytesRead != -1) {
output.write(buffer, 0, bytesRead)
bytesRead = input.read(buffer)
}
}
def curthread(label: String): Unit = println(label, Thread.currentThread().getName())
trait Writable{
def writeBytesTo(out: OutputStream): Unit
}
trait Readable extends Writable{
def readBytesThrough[T](f: InputStream => T): T
def writeBytesTo(out: OutputStream): Unit = readBytesThrough(copy(_, out))
}
def convertToUpperCase(input: InputStream, output: OutputStream): Unit = {
println("convertToUpperCase")
curthread("convertToUpperCase")
val buffer = new Array[Byte](1024) // buffer size
println("convertToUpperCase: input.read")
var bytesRead = input.read(buffer)
while (bytesRead != -1) {
println(s"convertToUpperCase read $bytesRead")
val upperBuffer = buffer.map(_.toChar.toUpper.toByte)
println("convertToUpperCase: output.write")
output.write(upperBuffer, 0, bytesRead)
println("convertToUpperCase: input.read")
bytesRead = input.read(buffer)
}
input.close()
output.close()
}
class Put extends Readable {
private val output = new PipedOutputStream
def readBytesThrough[T](f: InputStream => T): T = {
println("Put: readBytesThrough")
curthread("Put")
f(new PipedInputStream(output))
}
def receive(x: Any): Unit = {
println(s"Put: receive($x)")
curthread("Put")
output.write(x.toString.getBytes("utf-8"))
println("done receiving")
}
def receivedLast(): Unit = {
println("Put: receivedLast")
curthread("Put")
output.close()
}
}
case class Parsed(value: String) extends Writable {
def writeBytesTo(out: OutputStream): Unit = {
println("Parsed: writeBytesTo")
curthread("Parsed")
out.write(value.getBytes("utf-8"))
}
}
def uppercase(r: Readable): Readable = new Readable {
def readBytesThrough[T](f: InputStream => T): T = {
val output = new PipedOutputStream
Future {
r.readBytesThrough(input => {
convertToUpperCase(input, output)
})
}
f(new PipedInputStream(output))
}
}
def parse(r: Readable): Writable = {
curthread("parse")
val x = r.readBytesThrough(scala.io.Source.fromInputStream(_).mkString)
Parsed(x)
}
def display(w: Writable): Unit = {
curthread("display")
w.writeBytesTo(System.out)
}
val put = new Put
curthread("main")
Future {
put.receive("foobarbaz")
put.receivedLast()
}
display(parse(uppercase(put)))
这给出了输出
(main,run-main-0)
Put: receive(foobarbaz)
(Put,pool-11-thread-1)
(parse,run-main-0)
Put: readBytesThrough
(Put,pool-11-thread-2)
convertToUpperCase
(convertToUpperCase,pool-11-thread-2)
convertToUpperCase: input.read
因此,对管道流两端的读取和写入似乎应该在不同的线程上进行。那为什么它挂在?convertToUpperCase
Scastie 链接: https://scastie.scala-lang.org/I9EIOs8NR2mBeLbcqDx2zA
答: 暂无答案
评论
IO