在“java.nio.channels.AsynchronousSocketChannel”中对异步“read-message-then-dispatch”循环进行建模,“CompleteableFuture<>”是否是正确的工具?

Modeling asynchronous "read-message-then-dispatch" loop in `java.nio.channels.AsynchronousSocketChannel`, is `CompleteableFuture<>` the proper tool?

提问人:Gavin Ray 提问时间:1/10/2023 更新时间:1/10/2023 访问量:22

问:

我在 JVM 上编写一个 Postgres 有线协议服务器,服务器的核心循环是从 PSQL 客户端通过套接字接收命令,处理它,然后通过网络读取/等待另一个命令。

我做了一个方法,它是从套接字读取的入口点。发生一些解码和业务逻辑,然后它调度到 ,我让它返回 a,这样当它完成时,我可以调用它让它再次循环回 。read()handleClientMessage()CompleteableFuture<>.thenRun(() -> read())read()

我之前的替代方案是记住在每个方法的末尾调用。read()

我对使用的犹豫是我相信它会将任务排队以在池中运行,这听起来可能有很多开销?这是一个糟糕的做法,只是为了让我可以调用结果吗?CompleteableFuture<>.thenRun(() -> {})

代码如下,删除了大部分不相关的位:


// A single client connection to the Postgres wire-protocol server
class PostgresServerClientConnection {
    private AsynchronousSocketChannel client;
    private MemorySegment buffer;

    public PostgresServerClientConnection(AsynchronousSocketChannel client) {
        this.client = client;
        this.buffer = MemorySegment.allocateNative(1024, SegmentScope.global());
    }

    public void read() {
        ByteBuffer byteBuffer = buffer.asByteBuffer();
        client.read(byteBuffer, null, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, Object attachment) {
                // Create an infinite loop of reading and handling messages
                handleClientMessage(msg).thenRun(() -> read());
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.err.println("[SERVER] Failed to read from client: " + exc);
            }
        });
    }

    CompletableFuture<Void> handleClientMessage(ClientMessage msg) {
        if (msg.isSSLRequest()) {
            return handleSSLRequest(msg);
        } else if (msg.isStartupMessage()) {
            return handleStartupMessage(msg);
        } else {
            return handleRegularMessage(msg);
        }
    }

    CompletableFuture<Void> handleSSLRequest(ClientMessage msg) {
        return CompletableFuture.runAsync(() -> {
            try {
                client.write(byteBuffer).get();
            } catch (InterruptedException | ExecutionException e) {
            }
        });
    }

    CompletableFuture<Void> handleStartupMessage(ClientMessage msg) {
        return CompletableFuture.runAsync(() -> {
            try {
                client.write(byteBuffer).get();
            } catch (InterruptedException | ExecutionException e) {
            }
        });
    }
}
Java 套接字 异步 async-await

评论


答: 暂无答案