将数据写入客户端时,Boost 协程服务器崩溃

boost coroutine server crashes when writting data to client

提问人:aj3423 提问时间:11/11/2019 更新时间:11/12/2019 访问量:325

问:

我基于boost协程回显服务器示例制作了我的服务器,只是接收和写回一些数据。它在向客户端写入数据时崩溃,更奇怪的是,它仅在使用多个内核时崩溃。

这是服务器,它读取 4 个字节并写回“OK”,在 1 秒内作为超时:

#include <winsock2.h>
#include <windows.h>

#include <iostream>
using namespace std;

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;

#define SERVER_PORT 1234
#define DATA_LEN_4 4

#define TIMEOUT_LIMIT 1 // second

struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    boost::asio::steady_timer timer_;
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;

    explicit session(boost::asio::io_context& io_context, tcp::socket socket)
    : socket_(std::move(socket)),
      timer_(io_context),
      strand_(io_context.get_executor())
    { }

    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
            try
            {
                timer_.expires_from_now(std::chrono::seconds(TIMEOUT_LIMIT));

                // recv data
                string packet;
                packet.resize(DATA_LEN_4); // alloc memory

                size_t received_len = 0;

                // read data
                {
                    size_t rs;
                    while(received_len < DATA_LEN_4) { // recv 4 bytes
                        boost::system::error_code ec;

                        rs = socket_.async_read_some(
                            boost::asio::buffer((char*)(packet.c_str()+received_len), DATA_LEN_4-received_len), yield[ec]);
                        if(ec==boost::asio::error::eof)
                            break; //connection closed cleanly by peer
                        else if(ec) {
                            throw "read_fail";
                        }
                        received_len += rs;
                    }
                }
                if(received_len < DATA_LEN_4) {
                    throw "recv too short, maybe timeout";
                }
                // write back "OK"
                {
                    boost::system::error_code ecw;
                    boost::asio::async_write(socket_, boost::asio::buffer(string("OK")), yield[ecw]);
                    if(ecw==boost::asio::error::eof)
                        return; //connection closed cleanly by peer
                    else if(ecw)
                        throw "write_fail"; // some other error
                }
            }
            catch (const char* reason) 
            {
                printf("exception reason: %s\n", reason);
                boost::system::error_code ecw;

                /*
                 * Question 1: why this 'async_write' line causes crash?
                 */
                // write the error reason to client
                boost::asio::async_write(socket_, boost::asio::buffer(string(reason)), yield[ecw]);

                socket_.close();
                timer_.cancel();
            }
            catch (...)
            {
                printf("unknown exception\n");
                socket_.close();
                timer_.cancel();
            }
        });

        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
            while (socket_.is_open())
            {
                boost::system::error_code ignored_ec;
                timer_.async_wait(yield[ignored_ec]);
                if (timer_.expires_from_now() <= std::chrono::seconds(0))
                    socket_.close();
            }
        });
    }
};

int main() {
    boost::asio::io_context io_context;

    boost::asio::spawn(io_context, [&](boost::asio::yield_context yield)
    {
        tcp::acceptor acceptor(io_context,
        tcp::endpoint(tcp::v4(), SERVER_PORT));

        for (;;)
        {
            boost::system::error_code ec;

            tcp::socket socket(io_context);
            acceptor.async_accept(socket, yield[ec]);
            if (!ec) 
                std::make_shared<session>(io_context, std::move(socket))->go();
        }
    });

    /*
     * When run on 1 CPU, it runs fine, no Crash 
     */
    // io_context.run();

    /*
     * Question 2:
     * But when run on multiple CPUs, it Crashes !!!
     * Why?
     */
    auto thread_count = std::thread::hardware_concurrency();
    boost::thread_group tgroup;
    for (auto i = 0; i < thread_count; ++i)
        tgroup.create_thread(boost::bind(&boost::asio::io_context::run, &io_context));
    tgroup.join_all();
}

请注意,4 字节数据包和 1 秒超时只是为了说明问题,真实服务器使用大数据包,这可能会导致网络状况不佳时超时。为了模拟这一点,客户端每秒写入 1 个字节以触发服务器上的读取超时。

客户:

#include <iostream>
#include <boost/asio.hpp>
using namespace std;

using boost::asio::ip::tcp;

#define SERVER "127.0.0.1"
#define PORT "1234"

int main() {
    boost::asio::io_context io_context;

    unsigned i = 1; 
    while(1) {
        try {
            tcp::socket s(io_context);
            tcp::resolver resolver(io_context);
            boost::asio::connect(s, resolver.resolve(SERVER, PORT));

            // to simulate the bad network condition,
            // write 4 bytes in 4 seconds to trigger the receive timeout on server, which is 1 second
            for(int i=0; i<4; i++) { 
                boost::asio::write(s, boost::asio::buffer(string("A")));
                std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1 second
            }

            // read echo
            char x[64] = {0};
            s.read_some(boost::asio::buffer(x, sizeof(x)));
            cout << i++ << ". received: " << x << endl;
        } catch (...) {
            cout << i++ << " exception" << endl;
        }
    }

    return 0;
}

问题 1

为什么这条线会导致崩溃?

boost::asio::async_write(socket_, boost::asio::buffer(string(reason)), yield[ecw]);

问题 2

为什么服务器在 1 个 cpu 上运行时不会崩溃:?
并使用 ?
io_context.run();thread_group

我的环境:Win10-64bit、boost-1.71.0-64bit、VisualStudio-2017-Community

升压 -ASIO -协程升压 -异常

评论


答:

3赞 sehe 11/12/2019 #1

第1项质询

ba::async_write(socket_, ba::buffer(string("OK")), yield[ecw]);

这会调用未定义的行为,因为您传递了一个临时字符串作为缓冲区,但异步操作(根据定义)在调用返回之前不会完成。async_write

因此,缓冲区是对堆栈上已损坏的东西或现在存在于那里的任何东西的陈旧引用。

从逻辑上讲,发送缓冲区是对象的一部分,以获得更合适的生存期。或者,由于您正在执行协程,并且无论如何都要结束会话,因此只需使用 而不是 .selfwriteasync_write

第2项质询

因为未定义的行为是未定义的行为任何事情都可能发生

无人问津

  • 而不是与 一起使用,或者与适当的完成条件一起使用。read_somereadtransfer_exactly(DATA_LEN_4)read_until

  • 而不是你可以.buffer(reserved_string)dynamic_buffer

  • 与其抛出神奇的字符串,不如抓住代码表示出现的情况:system_error

    try {
        timer_.expires_from_now(std::chrono::seconds(TIMEOUT_LIMIT));
    
        // read data
        std::string packet;
        auto received_len = ba::async_read(socket_,
                ba::dynamic_buffer(packet),
                ba::transfer_exactly(DATA_LEN_4), yield);
    
        assert(received_len == DATA_LEN_4); // guaranteed
    
        // write back "OK"
        ba::write(socket_, ba::buffer("OK"s));
    }
    catch (boost::system::system_error const& e) {
        if (e.code() == ba::error::operation_aborted)
            std::cout << "canceled (timeout)" << std::endl;
        else if (e.code() == ba::error::eof)
            std::cout << "eof" << std::endl;
        else throw std::runtime_error(e.code().message());
    }
    
  • 所以,现在你可以用你的通用异常处理块来包装它:

    try {
        // ...
    } catch (std::exception const& e) {
        std::cout << "exception: " << std::quoted(e.what()) << std::endl;
    
        boost::system::error_code ignore;
        ba::async_write(socket_, ba::buffer(std::string(e.what())), yield[ignore]);
    
        socket_.close();
        timer_.cancel();
    }
    

    但!

    1. 通知你的客户是有用的,甚至是明智的,这似乎是非常可疑的
    2. 无论如何,在 Coro 中没有捕获异常都会破坏实例,因此您可以简单地让它逃脱self

定时器

  • 时间完成已经表示计时器是过期还是取消:error_code

    while (socket_.is_open()) {
        boost::system::error_code ec;
        timer_.async_wait(yield[ec]);
    
        if (ba::error::operation_aborted != ec) // timer was not canceled
            socket_.close();
    }
    
  • 但请注意,会话 coro 的常规返回路径不会调用 .这将导致套接字再保持打开<1秒,直到计时器到期。.cancel()time_

异常

如果你想让异常从coros中逃脱(你可以,你应该认为它发生了),你必须通过处理异常来改进线程循环:boost::asio::io_service::run()抛出的异常是否应该被捕获?

建议的服务器代码

结合 coros,大大简化了所有条件处理:

#include <iostream>
#include <iomanip>

#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/scope_exit.hpp>

using namespace std::literals;
namespace ba = boost::asio;
using ba::ip::tcp;

static constexpr unsigned short SERVER_PORT = 1234;
static constexpr std::size_t    DATA_LEN_4 = 4;
static constexpr auto           TIMEOUT_LIMIT = 1s;

struct session : public std::enable_shared_from_this<session>
{
    tcp::socket socket_;
    ba::steady_timer timer_;
    ba::strand<ba::io_context::executor_type> strand_;

    explicit session(ba::io_context& io_context, tcp::socket socket)
    : socket_(std::move(socket)),
      timer_(io_context),
      strand_(io_context.get_executor())
    { }

    void go() {
        ba::spawn(strand_, [this, self = shared_from_this()](ba::yield_context yield) {

            spawn(yield, [this, self](ba::yield_context yield) {
                timer_.expires_from_now(TIMEOUT_LIMIT);
                while (socket_.is_open()) {
                    boost::system::error_code ec;
                    timer_.async_wait(yield[ec]);
                    if (ba::error::operation_aborted != ec) // timer was not canceled
                        socket_.close();
                }
            });

            try {
                // read data
                std::string packet;
                ba::async_read(socket_,
                        ba::dynamic_buffer(packet),
                        ba::transfer_exactly(DATA_LEN_4), yield);

                // write back "OK"
                ba::write(socket_, ba::buffer("OK"s));
            }
            catch (boost::system::system_error const& e) {
                if (e.code() == ba::error::operation_aborted)
                    std::cout << "canceled (timeout)" << std::endl;
                else if (e.code() == ba::error::eof)
                    std::cout << "eof" << std::endl;
                else // throw std::runtime_error(e.code().message());
                    std::cout << "other: " << e.code().message() << std::endl;
            }

            socket_.close();
            timer_.cancel(); // cancel the other coro so we don't keep the session alive
        });
    }
};

int main() {
    ba::io_context io_context;

    ba::spawn(io_context, [&](ba::yield_context yield) {
        tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), SERVER_PORT));

        for (;;) {
            boost::system::error_code ec;

            tcp::socket socket(io_context);
            acceptor.async_accept(socket, yield[ec]);
            if (!ec) 
                std::make_shared<session>(io_context, std::move(socket))->go();
        }
    });

    boost::thread_group tgroup;
    for (auto i = 0u; i < std::thread::hardware_concurrency(); ++i)
        tgroup.create_thread([&io_context] {
            for (;;) {
                try { io_context.run(); break; } // exited normally
                catch (std::exception const &e) { std::clog << "[eventloop] exception caught " << std::quoted(e.what()) << std::endl; } 
                catch (...)                     { std::clog << "[eventloop] unknown exception caught" << std::endl;                   } 
            }
        });

    tgroup.join_all();
}

使用随机客户端

将睡眠更改为随机,以便它有时有效,有时超时:

std::mt19937 prng { std::random_device{}() };
for (int i = 0; i < 4; i++) {
    ba::write(s, ba::buffer(std::string("A")));
    std::this_thread::sleep_for(std::uniform_int_distribution<>(200, 400)(prng) * 1ms);
}

打印在我的系统上:

1. received: OK
2. received: OK
3. received: OK
canceled (timeout)
4 exception read_some: End of file
5. received: OK
canceled (timeout)
6 exception read_some: End of file
7. received: OK
8. received: OK

看马,没有手

更简单一点,省略特殊情况的消息,实际上并没有太大变化:

ba::spawn(strand_, [this, self = shared_from_this()](ba::yield_context yield) {
    try {
        ba::steady_timer timer(strand_, TIMEOUT_LIMIT);
        timer.async_wait([this](error_code ec) {
            if (ba::error::operation_aborted != ec) 
                socket_.close();
            });

        std::string packet;
        ba::async_read(socket_,
                ba::dynamic_buffer(packet),
                ba::transfer_exactly(DATA_LEN_4), yield);

        ba::write(socket_, ba::buffer("OK"s));
    } catch(std::exception const& e) {
        std::clog << "error " << std::quoted(e.what()) << std::endl;
    }
});

请注意,我们甚至不再需要作为成员及其析构函数 在到达结束时,也会自动正确取消计时器 范围。timer_

输出实际上并没有太大变化:

1. received: OK
2. received: OK
3. received: OK
error "Operation canceled"
4 exception read_some: End of file
5. received: OK
6. received: OK
7. received: OK
error "Operation canceled"
8 exception read_some: End of file
error "Operation canceled"
9 exception read_some: End of file

评论

0赞 sehe 11/12/2019
好的,我在这里找到了我自己的旧 stackoverflow.com/a/47954463/85371 和一个相关的线程 boost.2283326.n4.nabble.com/...... - 因此,在 Boost Asio 开始默认使用 Boost Coroutine2 之前,我将更改我的建议,以避免让异常从协程中逸出。
0赞 sehe 11/12/2019
那天晚上晚些时候:我睡不着觉,想知道我对UB(问题1)的分析是否有意义。毕竟,变量应该保留在 coro 堆栈上,直到 async_write 返回,而完成令牌是 coro 的收益上下文......也许它实际上应该没问题。嗯。这仍然是我想检查的东西。但是不喜欢堆栈切换。您可能会在启用支持的情况下编译 Boost Coroutine/Context 并使用 valgrind 来获得一些见解?asanvalgrind
0赞 aj3423 11/14/2019
你真棒!事实上,它不是由临时字符串引起的,即使使用全局静态字符串,它也会崩溃。真正的问题是可以抛出异常,需要捕获。建议的代码工作正常,也许应该将其添加到官方 boost 示例中。“未问”部分大大简化了代码。谢谢。io_context.run()