为什么在 Scala 中使用管道 IO 流会无限期挂起?[复制]

Why is this use of piped IO streams in Scala hanging indefinitely? [duplicate]

提问人:alcorn 提问时间:4/1/2023 更新时间:4/1/2023 访问量:54

问:

我正在尝试这篇博文中讨论的流接口。我已经制作了一个感觉像是简单的实现来测试接口,但是当我运行它时,它会无限期地挂起,我不明白为什么。代码如下:

import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.OutputStream
import scala.concurrent.{Future, Promise, Await}
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.collection.immutable.ListMap

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

def copy(input: InputStream, output: OutputStream): Unit = {
  val buffer = new Array[Byte](1024)
  var bytesRead = input.read(buffer)
  while (bytesRead != -1) {
    output.write(buffer, 0, bytesRead)
    bytesRead = input.read(buffer)
  }
}

def curthread(label: String): Unit = println(label, Thread.currentThread().getName())

trait Writable{
  def writeBytesTo(out: OutputStream): Unit
}

trait Readable extends Writable{
  def readBytesThrough[T](f: InputStream => T): T
  def writeBytesTo(out: OutputStream): Unit = readBytesThrough(copy(_, out))
}

def convertToUpperCase(input: InputStream, output: OutputStream): Unit = {
  println("convertToUpperCase")
  curthread("convertToUpperCase")
  val buffer = new Array[Byte](1024) // buffer size
  println("convertToUpperCase: input.read")
  var bytesRead = input.read(buffer)

  while (bytesRead != -1) {
    println(s"convertToUpperCase read $bytesRead")
    val upperBuffer = buffer.map(_.toChar.toUpper.toByte)
    println("convertToUpperCase: output.write")
    output.write(upperBuffer, 0, bytesRead)
    println("convertToUpperCase: input.read")
    bytesRead = input.read(buffer)
  }

  input.close()
  output.close()
}

class Put extends Readable {
  private val output = new PipedOutputStream

  def readBytesThrough[T](f: InputStream => T): T = {
    println("Put: readBytesThrough")
    curthread("Put")
    f(new PipedInputStream(output))
  }
  
  def receive(x: Any): Unit = {
    println(s"Put: receive($x)")
    curthread("Put")
    output.write(x.toString.getBytes("utf-8"))
    println("done receiving")
  }

  def receivedLast(): Unit = {
    println("Put: receivedLast")
    curthread("Put")
    output.close()
  }
}

case class Parsed(value: String) extends Writable {
  def writeBytesTo(out: OutputStream): Unit = {
    println("Parsed: writeBytesTo")
    curthread("Parsed")
    out.write(value.getBytes("utf-8"))
  }
}

def uppercase(r: Readable): Readable = new Readable {
  def readBytesThrough[T](f: InputStream => T): T = {
    val output = new PipedOutputStream
    Future {
      r.readBytesThrough(input => {
        convertToUpperCase(input, output)
      })
    }
    f(new PipedInputStream(output))
  }
}

def parse(r: Readable): Writable = {
  curthread("parse")
  val x = r.readBytesThrough(scala.io.Source.fromInputStream(_).mkString)
  Parsed(x)
}

def display(w: Writable): Unit = {
  curthread("display")
  w.writeBytesTo(System.out)
}

val put = new Put

curthread("main")

Future {
    put.receive("foobarbaz")
    put.receivedLast()
}

display(parse(uppercase(put)))

这给出了输出

(main,run-main-0)
Put: receive(foobarbaz)
(Put,pool-11-thread-1)
(parse,run-main-0)
Put: readBytesThrough
(Put,pool-11-thread-2)
convertToUpperCase
(convertToUpperCase,pool-11-thread-2)
convertToUpperCase: input.read

因此,对管道流两端的读取和写入似乎应该在不同的线程上进行。那为什么它挂在?convertToUpperCase

Scastie 链接: https://scastie.scala-lang.org/I9EIOs8NR2mBeLbcqDx2zA

Java 多线程 Scala IO 管道

评论

1赞 Luis Miguel Mejía Suárez 4/1/2023
如果您真的需要/想要一些有效的方法来流式传输 I/O 数据,我建议您查看 fs2ZIO,也许还有 AkkaStreams。但是,请注意,它们中的每一个都有一个学习曲线,并暗示了一些新概念,例如数据类型或参与者。IO
0赞 alcorn 4/2/2023
@GaëlJ看起来该问题中的问题是输入流错误地发送了 EOF。在这种情况下,读取完全挂起,什么都没有出来,所以我认为它是不同的。
0赞 alcorn 4/2/2023
投票结束这个问题,支持我提出的一个新问题,该问题可以更有效地隔离问题:stackoverflow.com/questions/75908870/......

答: 暂无答案