提问人:Peng 提问时间:11/16/2023 更新时间:11/16/2023 访问量:12
如何在netty中跳过某些指定的Handler或reactor-netty的正确处理程序用法?
How to skip some specified Handler in netty or the correct handler usage with reactor-netty?
问:
总结
我是netty的新手,并计划演示一个rtsp在线桌面共享应用程序进行练习。现在我感到困惑的是,当我通过 RTSP 请求桌面共享框架时,它应该由 Netty Native 提供解码,但其他数据也会被解码吗?我的目的是,如果数据是RTSP的,那么它被解码了,如果不是,它应该触发后续的处理程序,但现在反应式没有重新获得任何数据。有人可以告诉我,这个的正确用法吗?我把它弄得一团糟.RtspDecoder
rtspHandler
ReactiveConnectionConsumer
RTPS 客户端 I 通过 Netty 将数据包装器发送到 RTSP 中ByteBuf
public void configSendConsumer (Consumer<DefaultFullHttpRequest> consumer) {
new Thread(()-> {
while (true){
// recorder.get
if (bos.size() > 0){
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(bos.toByteArray());
DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(RtspVersions.RTSP_1_0,
RtspMethods.OPTIONS, "/live", buffer);
consumer.accept(defaultFullHttpRequest);
bos.reset();
}
}
}).start();
}
服务器构建器ReactorTcpServer
public ReactiveServer init(InetSocketAddress address){
this.address = address;
server = TcpServer
.create()
.wiretap("tcp-server", LogLevel.INFO)
.port(address.getPort())
.doOnConnection(connection -> {
log.debug("connection has been established ");
connection.addHandlerLast(new ProtobufEncoder());
ProtoBufMessageLiteScanner.protobufDecoders()
.forEach(connection::addHandlerLast);
connection
.addHandlerLast(new RtspEncoder()) // i add this one to decode the rtsp protocol ,
.addHandlerLast(new RtspDecoder())
;
})
.handle(ReactiveHandlerSPI.wiredSpiHandler().handler())
;
log.info("startup netty on port {}",address.getPort());
return this;
}
使用反应器处理程序的消费者数据
@Slf4j
public class ReactiveConnectionConsumer extends ConnectionConsumer {
public ReactiveConnectionConsumer(){
super((nettyInbound, nettyOutbound) -> {
Flux<byte[]> handle = nettyInbound.receive().handle((byteBuf, sink) ->
nettyInbound.withConnection(connection -> {
log.debug("receive data ");
int i = byteBuf.readableBytes();
if (i > 0) {
try{
ByteBufProcessService.getInstance().process(connection,byteBuf);
}catch (Exception exception){
log.error("reactor netty occur error {} ", ExceptionUtil.stacktraceToString(exception));
sink.next(("occur error {} " + ExceptionUtil.stacktraceToString(exception)).getBytes());
}
}
sink.next("server has receive your data ".getBytes());
}));
var nettyOutbound1 = nettyOutbound.sendByteArray(Flux.concat(handle));
return nettyOutbound1.then();
});
}
@Override
public void accept(Connection c) {
super.accept(c);
}
}
这里是项目链接 github/pengpengon/Meeting ,可能是整个设计方式错误,有人可以帮助演示 netty 最佳实践中的一般 rtsp 吗?
答: 暂无答案
评论