提问人:aj3423 提问时间:11/11/2019 更新时间:11/12/2019 访问量:325
将数据写入客户端时,Boost 协程服务器崩溃
boost coroutine server crashes when writting data to client
问:
我基于boost协程回显服务器示例制作了我的服务器,只是接收和写回一些数据。它在向客户端写入数据时崩溃,更奇怪的是,它仅在使用多个内核时崩溃。
这是服务器,它读取 4 个字节并写回“OK”,在 1 秒内作为超时:
#include <winsock2.h>
#include <windows.h>
#include <iostream>
using namespace std;
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
#define SERVER_PORT 1234
#define DATA_LEN_4 4
#define TIMEOUT_LIMIT 1 // second
struct session : public std::enable_shared_from_this<session>
{
tcp::socket socket_;
boost::asio::steady_timer timer_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
explicit session(boost::asio::io_context& io_context, tcp::socket socket)
: socket_(std::move(socket)),
timer_(io_context),
strand_(io_context.get_executor())
{ }
void go()
{
auto self(shared_from_this());
boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
{
try
{
timer_.expires_from_now(std::chrono::seconds(TIMEOUT_LIMIT));
// recv data
string packet;
packet.resize(DATA_LEN_4); // alloc memory
size_t received_len = 0;
// read data
{
size_t rs;
while(received_len < DATA_LEN_4) { // recv 4 bytes
boost::system::error_code ec;
rs = socket_.async_read_some(
boost::asio::buffer((char*)(packet.c_str()+received_len), DATA_LEN_4-received_len), yield[ec]);
if(ec==boost::asio::error::eof)
break; //connection closed cleanly by peer
else if(ec) {
throw "read_fail";
}
received_len += rs;
}
}
if(received_len < DATA_LEN_4) {
throw "recv too short, maybe timeout";
}
// write back "OK"
{
boost::system::error_code ecw;
boost::asio::async_write(socket_, boost::asio::buffer(string("OK")), yield[ecw]);
if(ecw==boost::asio::error::eof)
return; //connection closed cleanly by peer
else if(ecw)
throw "write_fail"; // some other error
}
}
catch (const char* reason)
{
printf("exception reason: %s\n", reason);
boost::system::error_code ecw;
/*
* Question 1: why this 'async_write' line causes crash?
*/
// write the error reason to client
boost::asio::async_write(socket_, boost::asio::buffer(string(reason)), yield[ecw]);
socket_.close();
timer_.cancel();
}
catch (...)
{
printf("unknown exception\n");
socket_.close();
timer_.cancel();
}
});
boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
{
while (socket_.is_open())
{
boost::system::error_code ignored_ec;
timer_.async_wait(yield[ignored_ec]);
if (timer_.expires_from_now() <= std::chrono::seconds(0))
socket_.close();
}
});
}
};
int main() {
boost::asio::io_context io_context;
boost::asio::spawn(io_context, [&](boost::asio::yield_context yield)
{
tcp::acceptor acceptor(io_context,
tcp::endpoint(tcp::v4(), SERVER_PORT));
for (;;)
{
boost::system::error_code ec;
tcp::socket socket(io_context);
acceptor.async_accept(socket, yield[ec]);
if (!ec)
std::make_shared<session>(io_context, std::move(socket))->go();
}
});
/*
* When run on 1 CPU, it runs fine, no Crash
*/
// io_context.run();
/*
* Question 2:
* But when run on multiple CPUs, it Crashes !!!
* Why?
*/
auto thread_count = std::thread::hardware_concurrency();
boost::thread_group tgroup;
for (auto i = 0; i < thread_count; ++i)
tgroup.create_thread(boost::bind(&boost::asio::io_context::run, &io_context));
tgroup.join_all();
}
请注意,4 字节数据包和 1 秒超时只是为了说明问题,真实服务器使用大数据包,这可能会导致网络状况不佳时超时。为了模拟这一点,客户端每秒写入 1 个字节以触发服务器上的读取超时。
客户:
#include <iostream>
#include <boost/asio.hpp>
using namespace std;
using boost::asio::ip::tcp;
#define SERVER "127.0.0.1"
#define PORT "1234"
int main() {
boost::asio::io_context io_context;
unsigned i = 1;
while(1) {
try {
tcp::socket s(io_context);
tcp::resolver resolver(io_context);
boost::asio::connect(s, resolver.resolve(SERVER, PORT));
// to simulate the bad network condition,
// write 4 bytes in 4 seconds to trigger the receive timeout on server, which is 1 second
for(int i=0; i<4; i++) {
boost::asio::write(s, boost::asio::buffer(string("A")));
std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1 second
}
// read echo
char x[64] = {0};
s.read_some(boost::asio::buffer(x, sizeof(x)));
cout << i++ << ". received: " << x << endl;
} catch (...) {
cout << i++ << " exception" << endl;
}
}
return 0;
}
问题 1:
为什么这条线会导致崩溃?
boost::asio::async_write(socket_, boost::asio::buffer(string(reason)), yield[ecw]);
问题 2:
为什么服务器在 1 个 cpu 上运行时不会崩溃:?
并使用 ?io_context.run();
thread_group
我的环境:Win10-64bit、boost-1.71.0-64bit、VisualStudio-2017-Community
答:
第1项质询
ba::async_write(socket_, ba::buffer(string("OK")), yield[ecw]);
这会调用未定义的行为,因为您传递了一个临时字符串作为缓冲区,但异步操作(根据定义)在调用返回之前不会完成。async_write
因此,缓冲区是对堆栈上已损坏的东西或现在存在于那里的任何东西的陈旧引用。
从逻辑上讲,发送缓冲区是对象的一部分,以获得更合适的生存期。或者,由于您正在执行协程,并且无论如何都要结束会话,因此只需使用 而不是 .self
write
async_write
第2项质询
因为未定义的行为是未定义的行为。任何事情都可能发生。
无人问津
而不是与 一起使用,或者与适当的完成条件一起使用。
read_some
read
transfer_exactly(DATA_LEN_4)
read_until
而不是你可以.
buffer(reserved_string)
dynamic_buffer
与其抛出神奇的字符串,不如抓住代码表示出现的情况:
system_error
try { timer_.expires_from_now(std::chrono::seconds(TIMEOUT_LIMIT)); // read data std::string packet; auto received_len = ba::async_read(socket_, ba::dynamic_buffer(packet), ba::transfer_exactly(DATA_LEN_4), yield); assert(received_len == DATA_LEN_4); // guaranteed // write back "OK" ba::write(socket_, ba::buffer("OK"s)); } catch (boost::system::system_error const& e) { if (e.code() == ba::error::operation_aborted) std::cout << "canceled (timeout)" << std::endl; else if (e.code() == ba::error::eof) std::cout << "eof" << std::endl; else throw std::runtime_error(e.code().message()); }
所以,现在你可以用你的通用异常处理块来包装它:
try { // ... } catch (std::exception const& e) { std::cout << "exception: " << std::quoted(e.what()) << std::endl; boost::system::error_code ignore; ba::async_write(socket_, ba::buffer(std::string(e.what())), yield[ignore]); socket_.close(); timer_.cancel(); }
但!
- 通知你的客户是有用的,甚至是明智的,这似乎是非常可疑的
- 无论如何,在 Coro 中没有捕获异常都会破坏实例,因此您可以简单地让它逃脱
self
定时器
时间完成已经表示计时器是过期还是取消:
error_code
while (socket_.is_open()) { boost::system::error_code ec; timer_.async_wait(yield[ec]); if (ba::error::operation_aborted != ec) // timer was not canceled socket_.close(); }
但请注意,会话 coro 的常规返回路径不会调用 .这将导致套接字再保持打开<1秒,直到计时器到期。
.cancel()
time_
异常
如果你想让异常从coros中逃脱(你可以,你应该认为它发生了),你必须通过处理异常来改进线程循环:boost::asio::io_service::run()抛出的异常是否应该被捕获?
建议的服务器代码
结合 coros,大大简化了所有条件处理:
#include <iostream>
#include <iomanip>
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/scope_exit.hpp>
using namespace std::literals;
namespace ba = boost::asio;
using ba::ip::tcp;
static constexpr unsigned short SERVER_PORT = 1234;
static constexpr std::size_t DATA_LEN_4 = 4;
static constexpr auto TIMEOUT_LIMIT = 1s;
struct session : public std::enable_shared_from_this<session>
{
tcp::socket socket_;
ba::steady_timer timer_;
ba::strand<ba::io_context::executor_type> strand_;
explicit session(ba::io_context& io_context, tcp::socket socket)
: socket_(std::move(socket)),
timer_(io_context),
strand_(io_context.get_executor())
{ }
void go() {
ba::spawn(strand_, [this, self = shared_from_this()](ba::yield_context yield) {
spawn(yield, [this, self](ba::yield_context yield) {
timer_.expires_from_now(TIMEOUT_LIMIT);
while (socket_.is_open()) {
boost::system::error_code ec;
timer_.async_wait(yield[ec]);
if (ba::error::operation_aborted != ec) // timer was not canceled
socket_.close();
}
});
try {
// read data
std::string packet;
ba::async_read(socket_,
ba::dynamic_buffer(packet),
ba::transfer_exactly(DATA_LEN_4), yield);
// write back "OK"
ba::write(socket_, ba::buffer("OK"s));
}
catch (boost::system::system_error const& e) {
if (e.code() == ba::error::operation_aborted)
std::cout << "canceled (timeout)" << std::endl;
else if (e.code() == ba::error::eof)
std::cout << "eof" << std::endl;
else // throw std::runtime_error(e.code().message());
std::cout << "other: " << e.code().message() << std::endl;
}
socket_.close();
timer_.cancel(); // cancel the other coro so we don't keep the session alive
});
}
};
int main() {
ba::io_context io_context;
ba::spawn(io_context, [&](ba::yield_context yield) {
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), SERVER_PORT));
for (;;) {
boost::system::error_code ec;
tcp::socket socket(io_context);
acceptor.async_accept(socket, yield[ec]);
if (!ec)
std::make_shared<session>(io_context, std::move(socket))->go();
}
});
boost::thread_group tgroup;
for (auto i = 0u; i < std::thread::hardware_concurrency(); ++i)
tgroup.create_thread([&io_context] {
for (;;) {
try { io_context.run(); break; } // exited normally
catch (std::exception const &e) { std::clog << "[eventloop] exception caught " << std::quoted(e.what()) << std::endl; }
catch (...) { std::clog << "[eventloop] unknown exception caught" << std::endl; }
}
});
tgroup.join_all();
}
使用随机客户端
将睡眠更改为随机,以便它有时有效,有时超时:
std::mt19937 prng { std::random_device{}() };
for (int i = 0; i < 4; i++) {
ba::write(s, ba::buffer(std::string("A")));
std::this_thread::sleep_for(std::uniform_int_distribution<>(200, 400)(prng) * 1ms);
}
打印在我的系统上:
1. received: OK
2. received: OK
3. received: OK
canceled (timeout)
4 exception read_some: End of file
5. received: OK
canceled (timeout)
6 exception read_some: End of file
7. received: OK
8. received: OK
看马,没有手
更简单一点,省略特殊情况的消息,实际上并没有太大变化:
ba::spawn(strand_, [this, self = shared_from_this()](ba::yield_context yield) {
try {
ba::steady_timer timer(strand_, TIMEOUT_LIMIT);
timer.async_wait([this](error_code ec) {
if (ba::error::operation_aborted != ec)
socket_.close();
});
std::string packet;
ba::async_read(socket_,
ba::dynamic_buffer(packet),
ba::transfer_exactly(DATA_LEN_4), yield);
ba::write(socket_, ba::buffer("OK"s));
} catch(std::exception const& e) {
std::clog << "error " << std::quoted(e.what()) << std::endl;
}
});
请注意,我们甚至不再需要作为成员及其析构函数
在到达结束时,也会自动正确取消计时器
范围。timer_
输出实际上并没有太大变化:
1. received: OK
2. received: OK
3. received: OK
error "Operation canceled"
4 exception read_some: End of file
5. received: OK
6. received: OK
7. received: OK
error "Operation canceled"
8 exception read_some: End of file
error "Operation canceled"
9 exception read_some: End of file
评论
asan
valgrind
io_context.run()
评论