提问人:Gavin Ray 提问时间:1/10/2023 更新时间:1/10/2023 访问量:22
在“java.nio.channels.AsynchronousSocketChannel”中对异步“read-message-then-dispatch”循环进行建模,“CompleteableFuture<>”是否是正确的工具?
Modeling asynchronous "read-message-then-dispatch" loop in `java.nio.channels.AsynchronousSocketChannel`, is `CompleteableFuture<>` the proper tool?
问:
我在 JVM 上编写一个 Postgres 有线协议服务器,服务器的核心循环是从 PSQL 客户端通过套接字接收命令,处理它,然后通过网络读取/等待另一个命令。
我做了一个方法,它是从套接字读取的入口点。发生一些解码和业务逻辑,然后它调度到 ,我让它返回 a,这样当它完成时,我可以调用它让它再次循环回 。read()
handleClientMessage()
CompleteableFuture<>
.thenRun(() -> read())
read()
我之前的替代方案是记住在每个方法的末尾调用。read()
我对使用的犹豫是我相信它会将任务排队以在池中运行,这听起来可能有很多开销?这是一个糟糕的做法,只是为了让我可以调用结果吗?CompleteableFuture<>
.thenRun(() -> {})
代码如下,删除了大部分不相关的位:
// A single client connection to the Postgres wire-protocol server
class PostgresServerClientConnection {
private AsynchronousSocketChannel client;
private MemorySegment buffer;
public PostgresServerClientConnection(AsynchronousSocketChannel client) {
this.client = client;
this.buffer = MemorySegment.allocateNative(1024, SegmentScope.global());
}
public void read() {
ByteBuffer byteBuffer = buffer.asByteBuffer();
client.read(byteBuffer, null, new CompletionHandler<>() {
@Override
public void completed(Integer result, Object attachment) {
// Create an infinite loop of reading and handling messages
handleClientMessage(msg).thenRun(() -> read());
}
@Override
public void failed(Throwable exc, Object attachment) {
System.err.println("[SERVER] Failed to read from client: " + exc);
}
});
}
CompletableFuture<Void> handleClientMessage(ClientMessage msg) {
if (msg.isSSLRequest()) {
return handleSSLRequest(msg);
} else if (msg.isStartupMessage()) {
return handleStartupMessage(msg);
} else {
return handleRegularMessage(msg);
}
}
CompletableFuture<Void> handleSSLRequest(ClientMessage msg) {
return CompletableFuture.runAsync(() -> {
try {
client.write(byteBuffer).get();
} catch (InterruptedException | ExecutionException e) {
}
});
}
CompletableFuture<Void> handleStartupMessage(ClientMessage msg) {
return CompletableFuture.runAsync(() -> {
try {
client.write(byteBuffer).get();
} catch (InterruptedException | ExecutionException e) {
}
});
}
}
答: 暂无答案
评论