提问人:bustus_primus 提问时间:11/14/2023 最后编辑:bustus_primus 更新时间:11/14/2023 访问量:39
Boost ASIO async_read_until无法读取背靠背消息的问题
Issue with Boost ASIO async_read_until not reading back to back messages
问:
我正在使用 boost.asio 实现一个网络库。这是 TCP 类的读取函数。
我的一个测试用例有一个客户端连接到服务器,一旦握手完成,服务器会立即与分隔符背靠背发送两条消息。客户端读取第一条消息,但不读取第二条消息。如果稍后发送第三条消息,它将拾取第二条错过的消息。bytes 变量仅是一条消息的长度。这里有什么明显的问题吗?
(我删除了不相关的代码)
boost::asio::streambuf _recBuf;
ssl::stream<ip::tcp::socket&> _stream;
void SomeClass::read(const String &delim) {
boost::asio::async_read_until(_stream, _recBuf, delim,
boost::asio::bind_executor(_readStrand,
[this, delim, self = shared_from_this()] (const boost::system::error_code& ec, const std::size_t bytes) {
if (!ec) {
if (bytes <= delim.size()) {
return;
}
String data(boost::asio::buffers_begin(_recBuf.data()), boost::asio::buffers_begin(_recBuf.data()) + (bytes - delim.size()));
_recBuf.consume(bytes);
read(delim);
}
else if (ec == system::errc::operation_canceled) {
}
else {
}
}));
}
我试图使用 commit() 方法和 istream 改变我阅读_recBuf的方式。
在 _stream.lowest_layer() 上调用 available() 方法以查看客户端上有多少字节可用,等待两条背靠背消息,在读取初始消息后返回 0。
编辑
这是 write 方法。也许这可能会导致问题?
void TCPSBase::write(const Message &msg) {
String data = msg.toJsonStr() + '^';
asio::async_write(_stream, asio::buffer(data),
asio::bind_executor(_writeStrand,
[this, data, self = shared_from_this()] (const boost::system::error_code& ec, const std::size_t bytes) {
LogHandler lh(Logger::getInstance(), "TCPSBase::write::lambda");
if (!ec) {
lh.d("wrote " + std::to_string(bytes) + " bytes");
lh.d(data);
}
else if (ec == system::errc::operation_canceled) {
lh.d("canceled");
}
else {
lh.e("disconnecting after error during write: " + ec.message());
stop();
}
}));
}
答:
0赞
sehe
11/14/2023
#1
在 _stream.lowest_layer() 上调用 available() 方法以查看可用的字节数会导致错误的文件描述符异常。
这意味着套接字变得无效,例如因为它已关闭。
为了回顾问题的其余部分,我使用最少的代码使您的代码自包含:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <iostream>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
using namespace std::chrono_literals;
using error_code = boost::system::error_code;
using asio::ip::tcp;
static ssl::context s_ctx{ssl::context::sslv23_server};
struct SomeClass : std::enable_shared_from_this<SomeClass> {
SomeClass(tcp::socket&& s) : _stream(std::move(s), s_ctx) {}
void start() {
_stream.async_handshake(ssl::stream_base::server, [this, self = shared_from_this()](error_code ec) {
std::cout << "Handshake: " << ec.message() << std::endl;
if (!ec)
read("\n");
});
}
private:
asio::streambuf _recBuf;
ssl::stream<tcp::socket> _stream;
asio::strand<asio::any_io_executor> _readStrand{_stream.get_executor()};
using String = std::string;
void read(String const& delim) {
asio::async_read_until(
_stream, _recBuf, delim,
bind_executor( //
_readStrand, [this, delim, self = shared_from_this()](error_code const& ec, size_t bytes) {
if (!ec || (ec == asio::error::eof && bytes)) {
if (bytes <= delim.size()) {
return; // TODO review?!
}
String data(asio::buffers_begin(_recBuf.data()),
asio::buffers_begin(_recBuf.data()) + (bytes - delim.size()));
_recBuf.consume(bytes);
std::cout << "Received: " << quoted(data) << std::endl;
std::cout << "Available: " << _recBuf.size() << "/"
<< _stream.lowest_layer().available() << std::endl;
read(delim);
} else if (ec == asio::error::operation_aborted) {
} else {
}
}));
}
};
void accept_loop(tcp::acceptor& acc) {
acc.async_accept(make_strand(acc.get_executor()), [&acc](error_code ec, tcp::socket s) {
if (!ec) {
std::cout << "Accepted from " << s.remote_endpoint() << std::endl;
std::make_shared<SomeClass>(std::move(s))->start();
accept_loop(acc);
}
});
}
int main() {
s_ctx.set_default_verify_paths();
s_ctx.use_private_key_file("server.pem", ssl::context::pem);
s_ctx.use_certificate_file("server.pem", ssl::context::pem);
asio::io_context ioc(1);
tcp::acceptor acc(ioc, {{}, 6443});
accept_loop(acc);
ioc.run();
ioc.run_for(30s);
}
使用 server.pem
和客户端
openssl s_client -connect 127.0.0.1:6443 <<< $'FIRST\nSECOND\nTHIRD\nFOURTH'
精确显示预期
Accepted from 127.0.0.1:40198
Handshake: Success
Received: "FIRST"
Available: 20/0
Received: "SECOND"
Available: 13/0
Received: "THIRD"
Available: 7/0
Received: "FOURTH"
Available: 0/0
任何剩下的批评都是
- 字符串提取效率不是特别高
- 这不如首先将套接字与链相关联(在我的代码中,我已经这样做了,所以会话类 () 中的整个链是多余的)
bind_executor
SomeClass
- 事实上,你比较error_codes,特别是跨不同的类别 - 这是行不通的。相反,与或确实对应(如果可用)进行比较(请阅读此处了解更多信息:测试特定错误条件
asio::error::operation_aborted
error_condition
) - 在以下情况下以静默方式中断读取循环(以及潜在的连接)。当客户端发送空消息时,这似乎“挂断”
bytes <= delim.size()
- 忘记处理部分成功(例如,在 EOF 上) - 在这种特殊情况下,您可能希望在截断消息之前检查分隔符是否存在。(我的代码还没有做这部分)
评论
1赞
bustus_primus
11/14/2023
嘿,非常感谢您的快速回复!
1赞
sehe
11/14/2023
不用担心!简化缓冲区实现的建议:coliru.stacked-crooked.com/a/930a1db874c8195f
0赞
bustus_primus
11/14/2023
感谢您的快速回复!我添加了一个 stop() 方法,如果消息 - delim 不够长,它将包装所有内容。我使用此代码实现了使用 STARTTLS 协议进行通信的成功 SMTPS 类。我还使用此代码为另一个应用程序实现自定义消息协议,似乎我使用自定义消息协议的测试用例是我唯一遇到此问题的时间。消息协议很简单 - 只是一个 JSON 字符串,后跟“^”作为分隔符。也许我的写入函数有问题?请回来查看我的问题。
0赞
bustus_primus
11/14/2023
另外,只是出于好奇,为什么你的缓冲区实现更好?
1赞
sehe
11/14/2023
这只会更容易。而且,它对未记录的实现细节做出的假设更少(请注意,如果缓冲区迭代器不指向连续内存,则代码是 UB 的,例如参见 stackoverflow.com/questions/72236319/...)。此外,这让我意识到我在分隔符检查方面很草率。这更好:coliru.stacked-crooked.com/a/85e123269c9c081a
1赞
sehe
11/23/2023
也是出于好奇,这个答案对你有帮助吗?
评论