PipedInputStream/PipedOutputStream 的缺陷

Flaws with PipedInputStream/PipedOutputStream

提问人:Raedwald 提问时间:2/28/2012 最后编辑:CommunityRaedwald 更新时间:4/28/2019 访问量:14628

问:

我在 SO 上看到了两个答案,它们声称 Java 提供的 and 类是有缺陷的。但他们没有详细说明他们出了什么问题。它们真的有缺陷吗,如果有,以什么方式存在?我目前正在编写一些使用它们的代码,所以我想知道我是否走错了路。PipedInputStreamPipedOutputStream

一个答案说:

PipedInputStream并且被破坏(关于线程)。他们假设每个实例都绑定到一个特定的线程。这很奇怪。PipedOutputStream

对我来说,这似乎既不奇怪也不破碎。也许作者还有其他一些缺陷?

另一个答案说:

在实践中,最好避免它们。我在 13 年里用过一次,我希望我没有。

但那位作者想不起问题出在哪里。


与所有类一样,尤其是多线程中使用的类,如果误用它们,将会出现问题。因此,我不认为不可预测的“写终结”IOException可能是一个缺陷(连接失败是一个错误;请参阅文章Whats this?IOException: Write end dead,作者:Daniel Ferbers,了解更多信息)。还有哪些其他声称的缺陷?PipedInputStreamclose()PipedOutputStream

Java 多线程 IO

评论

0赞 TC1 2/28/2012
这个 stackoverflow.com/questions/484119/......有点覆盖了它。它们并不是真正的“有缺陷”,只是有点棘手,而且通常还有标志代码的味道,如果你 100% 确定你需要它们并且设计中没有错误,那么使用它们就没有真正的问题......
1赞 peterk 3/27/2013
快速浏览一下,因为我想使用一个。它至少是“精选”,因为读取线程不会真正等待写入线程写入完整的读取请求,如果写入器关闭它等,则中止并出现 EOF 异常。它具有非常原始的线程处理和同步,并且要求缓冲区与最大的读取请求一样大。

答:

9赞 Raedwald 3/11/2012 #1

他们没有缺陷。

与所有类一样,尤其是多线程中使用的类,如果误用它们,将会出现问题。不可预测的“写结束死”可能抛出不是一个缺陷(连接失败是一个错误;请参阅文章 这是什么?IOException: Write end dead,作者:Daniel Ferbers,了解更多信息)。IOExceptionPipedInputStreamclose()PipedOutputStream

评论

0赞 fabian 2/20/2014
我想请你对我最近遇到的一个问题发表意见。你能看一下吗?stackoverflow.com/questions/21884188/......
3赞 Ganesh Krishnan 4/17/2012 #2

我在我的项目中很好地使用了它们,它们对于即时修改流和传递它们非常宝贵。唯一的缺点似乎是 PipedInputStream 的缓冲区很短(大约 1024 个),而我的输出流大约泵入了 8KB。

它没有缺陷,而且效果很好。

-------- 时髦的例子

public class Runner{


final PipedOutputStream source = new PipedOutputStream();
PipedInputStream sink = new PipedInputStream();

public static void main(String[] args) {
    new Runner().doit()
    println "Finished main thread"
}


public void doit() {

    sink.connect(source)

    (new Producer(source)).start()
    BufferedInputStream buffer = new BufferedInputStream(sink)
    (new Consumer(buffer)).start()
}
}

class Producer extends Thread {


OutputStream source
Producer(OutputStream source) {
    this.source=source
}

@Override
public void run() {

    byte[] data = new byte[1024];

    println "Running the Producer..."
    FileInputStream fout = new FileInputStream("/Users/ganesh/temp/www/README")

    int amount=0
    while((amount=fout.read(data))>0)
    {
        String s = new String(data, 0, amount);
        source.write(s.getBytes())
        synchronized (this) {
            wait(5);
        }
    }

    source.close()
}

}

class Consumer extends Thread{

InputStream ins

Consumer(InputStream ins)
{
    this.ins = ins
}

public void run()
{
    println "Consumer running"

    int amount;
    byte[] data = new byte[1024];
    while ((amount = ins.read(data)) >= 0) {
        String s = new String(data, 0, amount);
        println "< $s"
        synchronized (this) {
            wait(5);
        }
    }

}

}

评论

0赞 zeugor 3/13/2017
sink.close() 被遗漏了?
0赞 Marcono1234 4/20/2019
从 Java 6 开始,您可以指定自定义管道大小PipedInputStream​
0赞 Christian K. 2/27/2014 #3

从我的角度来看,有一个缺陷。更准确地说,如果应该将数据泵入 PipedOutputStream 的 Thread 在实际将单个字节写入流之前过早死亡,则存在很高的死锁风险。在这种情况下,问题在于管道流的实施无法检测到破损的管道。因此,从 PipedInputStream 读取的线程将在它对 read() 的第一次调用中永远等待(即死锁)。

破损管道检测实际上依赖于对 write() 的第一次调用,因为实现将延迟初始化写入侧线程,并且只有从该时间点开始,破损管道检测才会起作用。

以下代码重现了这种情况:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.junit.Test;

public class PipeTest
{
    @Test
    public void test() throws IOException
    {
        final PipedOutputStream pout = new PipedOutputStream();
        PipedInputStream pin = new PipedInputStream();

        pout.connect(pin);

        Thread t = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    if(true)
                    {
                        throw new IOException("asd");
                    }
                    pout.write(0); // first byte which never get's written
                    pout.close();
                }
                catch(IOException e)
                {
                    throw new RuntimeException(e);
                }
            }
        });
        t.start();

        pin.read(); // wait's forever, e.g. deadlocks
    }
}

评论

0赞 Robert Casey 7/23/2015
您希望写入和读取操作位于独立的线程中。为了让使用者不会因为争用条件而立即退出,最好在执行读取操作之前执行 pin.available() 检查以查看它是否大于 0。然后你可以重复执行可用和读取操作,直到再次可用 == 0。您还希望对读取端进行异常处理,以防编写器断开连接。
0赞 Alex Q 7/13/2016
出现异常时,此测试代码无法关闭输出流。如果用代码将 try 块的内容包围起来,则不会死锁。try (OutputStream pout_closed = pout) { ... }
0赞 Marcono1234 4/20/2019
@RobertCasey,您仍然必须等到返回 > 0,这与直接调用 相同。此外,当读取线程在调用中已被阻止时,您实际上无法进行太多异常处理。available()read()read()
0赞 Marcono1234 4/20/2019
@AlexQ关闭管道将向读者指示已到达数据的末尾。它可能会错误地处理这种情况,您将需要额外的逻辑来确定是否实际达到了终点或编写器是否遇到了问题。
0赞 user2179737 6/20/2015 #4

我在 JDK 实现中看到的缺陷是:

1)没有超时,读写器可以无限阻塞。

2) 对数据传输时间的控制欠佳(应仅在刷新或循环缓冲区已满时进行)

因此,我创建了自己的方法来解决上述问题(通过 ThreadLocal 传递的超时值):

PipedOutputStream(管道输出流)

如何使用:

PiedOutputStreamTest

希望对您有所帮助...

1赞 Marcono1234 4/20/2019 #5

一个缺陷可能是作者没有明确的方式向读者表明它遇到了问题:

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);

new Thread(() -> {
    try {
        writeToOut(out);
        out.close();
    }
    catch (SomeDataProviderException e) {
        // Have to notify the reading side, but how?
    }
}).start();

readFromIn(in);

作者可以关闭,但读者可能会将其误解为数据的结尾。为了正确处理这个问题,需要额外的逻辑。如果提供手动破坏管道的功能会更容易。out

现在有 JDK-8222924,它请求一种手动破坏管道的方法。