使用 AsynchronousSocketChannel 时出现 ReadPendingExceptionException

ReadPendingException when using AsynchronousSocketChannel

提问人:in_a_pumpkin_patch 提问时间:5/14/2016 更新时间:5/19/2016 访问量:1771

问:

我一直在使用 AsychronousSockets,但在运行大型负载时遇到了收到 ReadPendingException 的问题。

一些背景:我希望客户端以异步方式向服务器发送消息,然后侦听响应。

响应可以是 3 种类型之一,并且对 AsynchronousSocketChannel 的读取需要预先确定大小的 ByteBuffer。

因此,我的解决方案是进行两次读取:一次是首先接收消息类型(作为 4 个字节传入,一个 int),然后是另一个读取,它将构造适当大小的字节缓冲区来处理消息的其余部分。

我认为这里的主要问题在于,当调用 CompletetionHandler 的完整方法时,并不一定意味着 ByteBuffer 的读取完成。为了解决这个问题,我创建了一个 while 循环,该循环将一直读取到 ByteBuffer 已满。

但是,while 循环中的这种读取似乎偶尔会与其他读取冲突,这时我收到 ReadPendingException。

基本骨架代码:

AsynchronousChannelGroup mThreadGroup= AsynchronousChannelGroup.withFixedThreadPool(100, Executors.defaultThreadFactory());
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(mThreadGroup);
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 12345);

// Connect to server
channel.connect(hostAddress, null, new CompletionHandler<Void, Void>() {
    @Override
    public void completed(Void result, Void attachment) {
        // Write some message to server
        ByteBuffer message = ...
        channel.write(message, null, new CompletionHandler<Void, Void>() {

            @Override
            public void completed(Void result, Void attachment) {
                // Now that we have sent the message, listen for a response type
                ByteBuffer responseType = ...
                channel.read(responseType, null, new CompletionHandler<Void, Void>() {

                    @Override
                    public void completed(Void result, Void attachment) {
                        // parse response type, do some other stuff...
                        // ...
                        // After other stuff, create new byte buffer for main message
                        ByteBuffer receiveMessage = ...
                        channel.read(receiveMessage, null, new CompletionHandler<Void, Void>() {

                            @Override
                            public void completed(Void result, Void attachment) {
                                // The read may not have been completed, so make sure that it is
                                while (receiveMessage.remaining() > 0) {
                                    channel.read(receiveMessage);
                                }

                                // Handle receiveMessage...
                            }
                            @Override
                            public void failed(Throwable exc, Void attachment) {}
                        });
                    }
                    @Override
                    public void failed(Throwable exc, Void attachment) {}
                });
            }

            public void failed(Throwable exc, Void attachment) {}
        });
    }

    @Override
    public void failed(Throwable exc, Void attachment) {}
});

因此,我的问题有两个方面:

  1. 有没有办法让代码(如上所示)正常工作(又名不再接收 ReadPendingException)?

  2. 有没有更好的方法来设置相同的功能?

提前感谢你们提供的任何可能的帮助。

Java 多线程 套接字异

评论


答:

-1赞 aditrip 5/17/2016 #1

我认为这个异常即将到来,因为另一个线程正在尝试调用这个completionHandler。需要在 readCompletionHandler 上同步访问。或者,尝试删除 ReadCompletionHandler 中的 while(remaining) 循环。使用 bytesRead 参数调用完成的回调函数。使用此值创建用于读取消息的缓冲区。例如:

' class ClientReadCompletionHandler 实现 CompletionHandler {

private AsynchronousSocketChannel socket;
private ByteBuffer readBuffer;

public ClientReadCompletionHandler(AsynchronousSocketChannel socket, ByteBuffer readBuffer) {
    this.socket = socket;
    this.readBuffer = readBuffer;
    System.out.println("readBuffer in constructor: " + readBuffer);
}

@Override
public void completed(Integer bytesRead, IOContext state) {
    if (bytesRead != null && bytesRead == -1) {
        try {
            socket.close();
            return;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    byte[] buffer = new byte[bytesRead];
    readBuffer.rewind();
    // Rewind the input buffer to read from the beginning

    readBuffer.get(buffer);

    if(readBuffer.hasRemaining()) {
        // process this readBuffer somehow to read all remaining data.
        // This will not be the place to call socket.read() again.

    }      
        readBuffer.clear();
        IOContext readState = new IOContext();  
   // Now read from socket again.
        socket.read(readBuffer, readState, this);
}

'

评论

0赞 user207421 8/23/2023
另一个线程,比如什么?
1赞 user207421 5/19/2016 #2

不应在读取完成方法中循环,尤其是在不首先检查是否拥有所有数据的情况下。您应该首先检查是否已收到所需的所有数据,如果尚未收到,请使用相同的完成方法再发出一次读取。该过程将递归,直到它满足第一个测试。