提问人:Raedwald 提问时间:2/28/2012 最后编辑:CommunityRaedwald 更新时间:4/28/2019 访问量:14628
PipedInputStream/PipedOutputStream 的缺陷
Flaws with PipedInputStream/PipedOutputStream
问:
我在 SO 上看到了两个答案,它们声称 Java 提供的 and 类是有缺陷的。但他们没有详细说明他们出了什么问题。它们真的有缺陷吗,如果有,以什么方式存在?我目前正在编写一些使用它们的代码,所以我想知道我是否走错了路。PipedInputStream
PipedOutputStream
一个答案说:
PipedInputStream
并且被破坏(关于线程)。他们假设每个实例都绑定到一个特定的线程。这很奇怪。PipedOutputStream
对我来说,这似乎既不奇怪也不破碎。也许作者还有其他一些缺陷?
另一个答案说:
在实践中,最好避免它们。我在 13 年里用过一次,我希望我没有。
但那位作者想不起问题出在哪里。
与所有类一样,尤其是多线程中使用的类,如果误用它们,将会出现问题。因此,我不认为不可预测的“写终结”IOException
可能是一个缺陷(连接失败是一个错误;请参阅文章Whats this?IOException: Write end dead,作者:Daniel Ferbers,了解更多信息)。还有哪些其他声称的缺陷?PipedInputStream
close()
PipedOutputStream
答:
他们没有缺陷。
与所有类一样,尤其是多线程中使用的类,如果误用它们,将会出现问题。不可预测的“写结束死”可能抛出不是一个缺陷(连接失败是一个错误;请参阅文章 这是什么?IOException: Write end dead,作者:Daniel Ferbers,了解更多信息)。IOException
PipedInputStream
close()
PipedOutputStream
评论
我在我的项目中很好地使用了它们,它们对于即时修改流和传递它们非常宝贵。唯一的缺点似乎是 PipedInputStream 的缓冲区很短(大约 1024 个),而我的输出流大约泵入了 8KB。
它没有缺陷,而且效果很好。
-------- 时髦的例子
public class Runner{
final PipedOutputStream source = new PipedOutputStream();
PipedInputStream sink = new PipedInputStream();
public static void main(String[] args) {
new Runner().doit()
println "Finished main thread"
}
public void doit() {
sink.connect(source)
(new Producer(source)).start()
BufferedInputStream buffer = new BufferedInputStream(sink)
(new Consumer(buffer)).start()
}
}
class Producer extends Thread {
OutputStream source
Producer(OutputStream source) {
this.source=source
}
@Override
public void run() {
byte[] data = new byte[1024];
println "Running the Producer..."
FileInputStream fout = new FileInputStream("/Users/ganesh/temp/www/README")
int amount=0
while((amount=fout.read(data))>0)
{
String s = new String(data, 0, amount);
source.write(s.getBytes())
synchronized (this) {
wait(5);
}
}
source.close()
}
}
class Consumer extends Thread{
InputStream ins
Consumer(InputStream ins)
{
this.ins = ins
}
public void run()
{
println "Consumer running"
int amount;
byte[] data = new byte[1024];
while ((amount = ins.read(data)) >= 0) {
String s = new String(data, 0, amount);
println "< $s"
synchronized (this) {
wait(5);
}
}
}
}
评论
PipedInputStream
从我的角度来看,有一个缺陷。更准确地说,如果应该将数据泵入 PipedOutputStream 的 Thread 在实际将单个字节写入流之前过早死亡,则存在很高的死锁风险。在这种情况下,问题在于管道流的实施无法检测到破损的管道。因此,从 PipedInputStream 读取的线程将在它对 read() 的第一次调用中永远等待(即死锁)。
破损管道检测实际上依赖于对 write() 的第一次调用,因为实现将延迟初始化写入侧线程,并且只有从该时间点开始,破损管道检测才会起作用。
以下代码重现了这种情况:
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.junit.Test;
public class PipeTest
{
@Test
public void test() throws IOException
{
final PipedOutputStream pout = new PipedOutputStream();
PipedInputStream pin = new PipedInputStream();
pout.connect(pin);
Thread t = new Thread(new Runnable()
{
public void run()
{
try
{
if(true)
{
throw new IOException("asd");
}
pout.write(0); // first byte which never get's written
pout.close();
}
catch(IOException e)
{
throw new RuntimeException(e);
}
}
});
t.start();
pin.read(); // wait's forever, e.g. deadlocks
}
}
评论
try (OutputStream pout_closed = pout) { ... }
available()
read()
read()
我在 JDK 实现中看到的缺陷是:
1)没有超时,读写器可以无限阻塞。
2) 对数据传输时间的控制欠佳(应仅在刷新或循环缓冲区已满时进行)
因此,我创建了自己的方法来解决上述问题(通过 ThreadLocal 传递的超时值):
如何使用:
希望对您有所帮助...
一个缺陷可能是作者没有明确的方式向读者表明它遇到了问题:
PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);
new Thread(() -> {
try {
writeToOut(out);
out.close();
}
catch (SomeDataProviderException e) {
// Have to notify the reading side, but how?
}
}).start();
readFromIn(in);
作者可以关闭,但读者可能会将其误解为数据的结尾。为了正确处理这个问题,需要额外的逻辑。如果提供手动破坏管道的功能会更容易。out
现在有 JDK-8222924,它请求一种手动破坏管道的方法。
评论