Boost ASIO async_read_until无法读取背靠背消息的问题

Issue with Boost ASIO async_read_until not reading back to back messages

提问人:bustus_primus 提问时间:11/14/2023 最后编辑:bustus_primus 更新时间:11/14/2023 访问量:39

问:

我正在使用 boost.asio 实现一个网络库。这是 TCP 类的读取函数。

我的一个测试用例有一个客户端连接到服务器,一旦握手完成,服务器会立即与分隔符背靠背发送两条消息。客户端读取第一条消息,但不读取第二条消息。如果稍后发送第三条消息,它将拾取第二条错过的消息。bytes 变量仅是一条消息的长度。这里有什么明显的问题吗?

(我删除了不相关的代码)

boost::asio::streambuf _recBuf;
ssl::stream<ip::tcp::socket&> _stream;

void SomeClass::read(const String &delim) {
    boost::asio::async_read_until(_stream, _recBuf, delim,
    boost::asio::bind_executor(_readStrand,
    [this, delim, self = shared_from_this()] (const boost::system::error_code& ec, const std::size_t bytes) {
        if (!ec) {

            if (bytes <= delim.size()) {
                return;
            }

            String data(boost::asio::buffers_begin(_recBuf.data()), boost::asio::buffers_begin(_recBuf.data()) + (bytes - delim.size()));
            _recBuf.consume(bytes);

            read(delim);
        }
        else if (ec == system::errc::operation_canceled) {
            
        }
        else {
            
        }
    }));
}

我试图使用 commit() 方法和 istream 改变我阅读_recBuf的方式。

在 _stream.lowest_layer() 上调用 available() 方法以查看客户端上有多少字节可用,等待两条背靠背消息,在读取初始消息后返回 0。

编辑

这是 write 方法。也许这可能会导致问题?

void TCPSBase::write(const Message &msg) {
String data = msg.toJsonStr() + '^';
asio::async_write(_stream, asio::buffer(data),
asio::bind_executor(_writeStrand,
[this, data, self = shared_from_this()] (const boost::system::error_code& ec, const std::size_t bytes) {
    LogHandler lh(Logger::getInstance(), "TCPSBase::write::lambda");
    if (!ec) {
        lh.d("wrote " + std::to_string(bytes) + " bytes");
        lh.d(data);
    }
    else if (ec == system::errc::operation_canceled) {
        lh.d("canceled");
    }
    else {
        lh.e("disconnecting after error during write: " + ec.message());
        stop();
    }
}));

}

C++ 套接字 boost-asio

评论


答:

0赞 sehe 11/14/2023 #1

在 _stream.lowest_layer() 上调用 available() 方法以查看可用的字节数会导致错误的文件描述符异常。

这意味着套接字变得无效,例如因为它已关闭。

为了回顾问题的其余部分,我使用最少的代码使您的代码自包含:

在 Coliru 上直播

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
using namespace std::chrono_literals;
using error_code = boost::system::error_code;
using asio::ip::tcp;

static ssl::context s_ctx{ssl::context::sslv23_server};

struct SomeClass : std::enable_shared_from_this<SomeClass> {
    SomeClass(tcp::socket&& s) : _stream(std::move(s), s_ctx) {}

    void start() {
        _stream.async_handshake(ssl::stream_base::server, [this, self = shared_from_this()](error_code ec) {
            std::cout << "Handshake: " << ec.message() << std::endl;
            if (!ec)
                read("\n");
        });
    }

  private:
    asio::streambuf                     _recBuf;
    ssl::stream<tcp::socket>            _stream;
    asio::strand<asio::any_io_executor> _readStrand{_stream.get_executor()};
    using String = std::string;

    void read(String const& delim) {
        asio::async_read_until(
            _stream, _recBuf, delim,
            bind_executor( //
                _readStrand, [this, delim, self = shared_from_this()](error_code const& ec, size_t bytes) {
                    if (!ec || (ec == asio::error::eof && bytes)) {
                        if (bytes <= delim.size()) {
                            return; // TODO review?!
                        }

                        String data(asio::buffers_begin(_recBuf.data()),
                                    asio::buffers_begin(_recBuf.data()) + (bytes - delim.size()));
                        _recBuf.consume(bytes);

                        std::cout << "Received:  " << quoted(data) << std::endl;
                        std::cout << "Available: " << _recBuf.size() << "/"
                                  << _stream.lowest_layer().available() << std::endl;

                        read(delim);
                    } else if (ec == asio::error::operation_aborted) {

                    } else {
                    }
                }));
    }
};

void accept_loop(tcp::acceptor& acc) {
    acc.async_accept(make_strand(acc.get_executor()), [&acc](error_code ec, tcp::socket s) {
        if (!ec) {
            std::cout << "Accepted from " << s.remote_endpoint() << std::endl;
            std::make_shared<SomeClass>(std::move(s))->start();
            accept_loop(acc);
        }
    });
}

int main() {
    s_ctx.set_default_verify_paths();
    s_ctx.use_private_key_file("server.pem", ssl::context::pem);
    s_ctx.use_certificate_file("server.pem", ssl::context::pem);

    asio::io_context ioc(1);
    tcp::acceptor acc(ioc, {{}, 6443});
    accept_loop(acc);

    ioc.run();
    ioc.run_for(30s);
}

使用 server.pem 和客户端

openssl s_client -connect 127.0.0.1:6443 <<< $'FIRST\nSECOND\nTHIRD\nFOURTH'

精确显示预期

Accepted from 127.0.0.1:40198
Handshake: Success
Received:  "FIRST"
Available: 20/0
Received:  "SECOND"
Available: 13/0
Received:  "THIRD"
Available: 7/0
Received:  "FOURTH"
Available: 0/0

任何剩下的批评都是

  • 字符串提取效率不是特别高
  • 这不如首先将套接字与链相关联(在我的代码中,我已经这样做了,所以会话类 () 中的整个链是多余的)bind_executorSomeClass
  • 事实上,你比较error_codes,特别是跨不同的类别 - 这是行不通的。相反,与或确实对应(如果可用)进行比较(请阅读此处了解更多信息:测试特定错误条件asio::error::operation_abortederror_condition)
  • 在以下情况下以静默方式中断读取循环(以及潜在的连接)。当客户端发送空消息时,这似乎“挂断”bytes <= delim.size()
  • 忘记处理部分成功(例如,在 EOF 上) - 在这种特殊情况下,您可能希望在截断消息之前检查分隔符是否存在。(我的代码还没有做这部分)

评论

1赞 bustus_primus 11/14/2023
嘿,非常感谢您的快速回复!
1赞 sehe 11/14/2023
不用担心!简化缓冲区实现的建议:coliru.stacked-crooked.com/a/930a1db874c8195f
0赞 bustus_primus 11/14/2023
感谢您的快速回复!我添加了一个 stop() 方法,如果消息 - delim 不够长,它将包装所有内容。我使用此代码实现了使用 STARTTLS 协议进行通信的成功 SMTPS 类。我还使用此代码为另一个应用程序实现自定义消息协议,似乎我使用自定义消息协议的测试用例是我唯一遇到此问题的时间。消息协议很简单 - 只是一个 JSON 字符串,后跟“^”作为分隔符。也许我的写入函数有问题?请回来查看我的问题。
0赞 bustus_primus 11/14/2023
另外,只是出于好奇,为什么你的缓冲区实现更好?
1赞 sehe 11/14/2023
这只会更容易。而且,它对未记录的实现细节做出的假设更少(请注意,如果缓冲区迭代器不指向连续内存,则代码是 UB 的,例如参见 stackoverflow.com/questions/72236319/...)。此外,这让我意识到我在分隔符检查方面很草率。这更好:coliru.stacked-crooked.com/a/85e123269c9c081a
1赞 sehe 11/23/2023
也是出于好奇,这个答案对你有帮助吗?