如何在netty中跳过某些指定的Handler或reactor-netty的正确处理程序用法?

How to skip some specified Handler in netty or the correct handler usage with reactor-netty?

提问人:Peng 提问时间:11/16/2023 更新时间:11/16/2023 访问量:12

问:

总结

我是netty的新手,并计划演示一个rtsp在线桌面共享应用程序进行练习。现在我感到困惑的是,当我通过 RTSP 请求桌面共享框架时,它应该由 Netty Native 提供解码,但其他数据也会被解码吗?我的目的是,如果数据是RTSP的,那么它被解码了,如果不是,它应该触发后续的处理程序,但现在反应式没有重新获得任何数据。有人可以告诉我,这个的正确用法吗?我把它弄得一团糟.RtspDecoderrtspHandlerReactiveConnectionConsumer

RTPS 客户端 I 通过 Netty 将数据包装器发送到 RTSP 中ByteBuf


public void configSendConsumer (Consumer<DefaultFullHttpRequest> consumer) {
        new Thread(()-> {
            while (true){
//                recorder.get
                if (bos.size() > 0){
                    ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();

                    buffer.writeBytes(bos.toByteArray());

                    DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(RtspVersions.RTSP_1_0,
                            RtspMethods.OPTIONS, "/live", buffer);
                    consumer.accept(defaultFullHttpRequest);

                    bos.reset();
                }

            }
        }).start();
    }

服务器构建器ReactorTcpServer

  public ReactiveServer init(InetSocketAddress address){
        this.address = address;
        server = TcpServer
                .create()
                .wiretap("tcp-server", LogLevel.INFO)
                .port(address.getPort())
                .doOnConnection(connection -> {
                    log.debug("connection has been established ");

                    connection.addHandlerLast(new ProtobufEncoder());

                    ProtoBufMessageLiteScanner.protobufDecoders()
                                    .forEach(connection::addHandlerLast);
                    connection
                            .addHandlerLast(new RtspEncoder())   // i add this one to  decode the rtsp protocol , 
                            .addHandlerLast(new RtspDecoder())
                            ;
                })
                .handle(ReactiveHandlerSPI.wiredSpiHandler().handler())
        ;

        log.info("startup netty  on port {}",address.getPort());

        return this;
    }

使用反应器处理程序的消费者数据


@Slf4j
public class ReactiveConnectionConsumer extends ConnectionConsumer {

    public ReactiveConnectionConsumer(){
        super((nettyInbound, nettyOutbound) -> {

            Flux<byte[]> handle = nettyInbound.receive().handle((byteBuf, sink) ->

                nettyInbound.withConnection(connection -> {

                log.debug("receive data ");

                int i = byteBuf.readableBytes();

                if (i > 0) {

                    try{

                        ByteBufProcessService.getInstance().process(connection,byteBuf);

                    }catch (Exception exception){

                        log.error("reactor netty occur error {} ", ExceptionUtil.stacktraceToString(exception));

                        sink.next(("occur error {} " + ExceptionUtil.stacktraceToString(exception)).getBytes());

                    }

                }

                sink.next("server has receive your data ".getBytes());
                }));
            var nettyOutbound1 = nettyOutbound.sendByteArray(Flux.concat(handle));
            return nettyOutbound1.then();

        });

    }

    @Override
    public void accept(Connection c) {

        super.accept(c);

    }
}

这里是项目链接 github/pengpengon/Meeting ,可能是整个设计方式错误,有人可以帮助演示 netty 最佳实践中的一般 rtsp 吗?

Netty 反应釜-netty

评论


答: 暂无答案