Boost asio 多线程应用序列化定时器回调

Boost asio multi threaded application serializes the timer callback

提问人:root 提问时间:8/24/2023 最后编辑:root 更新时间:8/24/2023 访问量:50

问:

我有以下应用程序在 linux 上运行,带有 boost 库版本 1.79。 该代码基于从此处获取的升压示例

void worker(io_context& ioContext) {
    std::cerr << "Worker thread started." << std::endl;
    ioContext.run(); // Run the io_context to handle asynchronous operations
    std::cerr << "Worker thread stopped." << std::endl;
}

int main() {
    try {
        io_context ioContext;

        // Create a work object to prevent ioContext.run() from returning immediately
        io_context::work work(ioContext);

        // Create multiple worker threads
        std::vector<std::thread> threads;
        for (int i = 0; i < 2; ++i) {
            threads.emplace_back(worker, std::ref(ioContext));
        }


        boost::asio::steady_timer timer(ioContext, std::chrono::seconds(5));

        // Set a lambda function as the callback when the timer expires
        timer.async_wait(
            [](const boost::system::error_code& error) {
                if (error)
                {
                    std::cerr << "async_wait failed: "
                                        << error.message();
                    return;
                }
                std::cerr << "Sleep Start Task1 executed in thread: " << std::this_thread::get_id() << std::endl;
                sleep(60);
            });

        boost::asio::steady_timer timer1(ioContext, std::chrono::seconds(10));

        // Set a lambda function as the callback when the timer expires
        timer1.async_wait(
            [](const boost::system::error_code& error) {
                if (error)
                {
                    std::cerr << "async_wait failed: "
                                        << error.message();
                    return;
                }
                std::cerr << " Start Task2 executed in thread: " << std::this_thread::get_id() << std::endl;
            });


        std::cerr << "App started :" << std::thread::hardware_concurrency() << std::endl;
        // Join the worker threads
        for (auto& thread : threads) {
            thread.join();
        }

        std::cerr << "All worker threads joined." << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }

    return 0;
}

我期望这两个线程可以同时执行计时器回调。

我的期望是

线程 1 - 将在 5 秒后调用计时器 1 回调并休眠 60 秒(阻止调用)

线程 2 - 将在 10 秒后调用 timer2 回调

当我运行代码时,结果似乎表明所有计时器回调都在单个线程上序列化。

5 秒后 在线程中执行的睡眠启动任务:1991504768

60 秒后 在线程中执行的启动任务:1991504768

我无法理解为什么当有两个线程运行ioContext.run()时,计时器回调被序列化;

我做错了什么吗?任何其他反馈都会有所帮助。

C++ 多线程 计时器 IO boost-ASIO

评论


答:

0赞 sehe 8/24/2023 #1

你正在你的处理程序中做一个。这将阻塞服务线程,干扰调度。sleep(60)

无论如何,我无法重现该问题。在此过程中添加一些测量值(并减少时间间隔以更好地适应我的耐心水平):

Live On Coliru(科里鲁生活公寓)

#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;

static std::atomic_int tid_gen = 0;
thread_local int const tid     = [] { return ++tid_gen; }();
static constexpr auto  now = std::chrono::steady_clock::now;
static auto const      start   = now();
static std::mutex console_mx;

void trace(auto const&... msg) {
    std::lock_guard lk(console_mx);
    std::cerr << "at " << std::setw(8) << (now() - start)/1ms << "ms - tid:" << tid << " ";
    (std::cerr << ... << msg) << std::endl;
}

void worker(asio::io_context& ioContext) {
    trace("Worker thread enter");
    ioContext.run(); // Run the io_context to handle asynchronous operations
    trace("Worker thread exit");
}

int main() {
    try {
        asio::io_context ioContext;

        // Create a work object to prevent ioContext.run() from returning immediately
        auto work = make_work_guard(ioContext);

        // Create multiple worker threads
        std::vector<std::thread> threads;
        for (int i = 0; i < 2; ++i) {
            threads.emplace_back(worker, std::ref(ioContext));
        }

        asio::steady_timer task1(ioContext, 50ms);
        asio::steady_timer task2(ioContext, 100ms);

        task1.async_wait([](error_code ec) {
            trace("Start Task1: ", ec.message());
            if (!ec)
                sleep(6);
        });

        task2.async_wait([](error_code ec) { trace("Start Task2: ", ec.message()); });

        trace("App started :", std::thread::hardware_concurrency());

        work.reset();
        // Join the worker threads
        for (auto& thread : threads) {
            thread.join();
        }

        trace("All worker threads joined.");
    } catch (std::exception const& e) {
        trace("Exception: ", std::quoted(e.what()));
    }
}

印刷

at        0ms - tid:1 Worker thread enter
at        0ms - tid:2 Worker thread enter
at        0ms - tid:3 App started :4
at       50ms - tid:1 Start Task1: Success
at      100ms - tid:2 Start Task2: Success
at     6050ms - tid:1 Worker thread exit
at     6050ms - tid:2 Worker thread exit
at     6050ms - tid:3 All worker threads joined.

这正是我所期望的

旁注:使用并实际获得一个基于硬件并发性自动调整大小的池,取消工作防护和其他简化:thread_pool

Live On Coliru(科里鲁生活公寓)

#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>

namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using boost::system::error_code;

static std::atomic_int tid_gen = 0;
thread_local int const tid     = [] { return ++tid_gen; }();

using duration = std::chrono::steady_clock::duration;
static constexpr auto now   = std::chrono::steady_clock::now;
static auto const     start = now();
static std::mutex     console_mx;

void trace(auto const&... msg) {
    std::lock_guard lk(console_mx);
    std::cerr << "at " << std::setw(8) << (now() - start) / 1ms << "ms - tid:" << tid << " ";
    (std::cerr << ... << msg) << std::endl;
}

void do_task(error_code ec, std::string caption, duration block_for = {}) {
    trace("Start ", caption, ": ", ec.message());
    if (!ec)
        std::this_thread::sleep_for(block_for);
}

int main() {
    asio::thread_pool io;

    asio::steady_timer task1(io, 50ms);
    asio::steady_timer task2(io, 100ms);

    task1.async_wait(bind(do_task, _1, "Task1", 6s));
    task2.async_wait(bind(do_task, _1, "Task2", 0s));

    trace("App started: ", std::thread::hardware_concurrency());

    io.join();
    trace("Done");
}

指纹

at        0ms - tid:1 App started: 4
at       50ms - tid:2 Start Task1: Success
at      100ms - tid:3 Start Task2: Success
at     6051ms - tid:1 Done

评论

0赞 root 8/24/2023
感谢您的回复。我尝试了您的代码,但得到了不同的结果。和我原来的问题一样。在 1ms - tid:1 应用程序启动:2 在 30ms 时 - tid:2 在 30ms 时进入工作线程 - tid:3 在 50ms 时进入工作线程 - tid:2 启动任务 1:在 6050ms 时成功 - tid:2 启动任务 2:在 6051ms 时成功 - tid:2 工作线程退出
0赞 root 8/24/2023
我的硬件是双核 ARM Cortex A7,运行 Linux 5.15。我想知道它是否与提升库版本有关。我的是 1.79
0赞 sehe 8/24/2023
即使使用 1.79 我也无法重现,请参阅编译器资源管理器和 ARM。最显着的区别是您的线程需要高达 30 毫秒才能启动。我建议与启动前插入 50 毫秒的睡眠进行比较,看看这是否会改变调度。thread_poolasync_wait
0赞 sehe 8/24/2023
考虑粘贴任何结果以使其易于阅读,例如 paste.ubuntu.com/p/kfFS7jJth9 来自您的评论
0赞 root 8/25/2023
我试过thread_pool,在async_wait之前延迟,行为没有变化。只有一个工作线程正在获取两个处理程序并对其进行序列化,而另一个工作线程似乎挂起。我每次都必须杀死应用程序。有趣的观察是,如果我以 0 毫秒同时启动两个计时器任务,那么我可以看到两个线程都拾取了处理程序并且它们都正确退出(即使在两个处理程序中都阻止了睡眠调用)。