提问人:root 提问时间:8/24/2023 最后编辑:root 更新时间:8/24/2023 访问量:50
Boost asio 多线程应用序列化定时器回调
Boost asio multi threaded application serializes the timer callback
问:
我有以下应用程序在 linux 上运行,带有 boost 库版本 1.79。 该代码基于从此处获取的升压示例
void worker(io_context& ioContext) {
std::cerr << "Worker thread started." << std::endl;
ioContext.run(); // Run the io_context to handle asynchronous operations
std::cerr << "Worker thread stopped." << std::endl;
}
int main() {
try {
io_context ioContext;
// Create a work object to prevent ioContext.run() from returning immediately
io_context::work work(ioContext);
// Create multiple worker threads
std::vector<std::thread> threads;
for (int i = 0; i < 2; ++i) {
threads.emplace_back(worker, std::ref(ioContext));
}
boost::asio::steady_timer timer(ioContext, std::chrono::seconds(5));
// Set a lambda function as the callback when the timer expires
timer.async_wait(
[](const boost::system::error_code& error) {
if (error)
{
std::cerr << "async_wait failed: "
<< error.message();
return;
}
std::cerr << "Sleep Start Task1 executed in thread: " << std::this_thread::get_id() << std::endl;
sleep(60);
});
boost::asio::steady_timer timer1(ioContext, std::chrono::seconds(10));
// Set a lambda function as the callback when the timer expires
timer1.async_wait(
[](const boost::system::error_code& error) {
if (error)
{
std::cerr << "async_wait failed: "
<< error.message();
return;
}
std::cerr << " Start Task2 executed in thread: " << std::this_thread::get_id() << std::endl;
});
std::cerr << "App started :" << std::thread::hardware_concurrency() << std::endl;
// Join the worker threads
for (auto& thread : threads) {
thread.join();
}
std::cerr << "All worker threads joined." << std::endl;
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}
我期望这两个线程可以同时执行计时器回调。
我的期望是
线程 1 - 将在 5 秒后调用计时器 1 回调并休眠 60 秒(阻止调用)
线程 2 - 将在 10 秒后调用 timer2 回调
当我运行代码时,结果似乎表明所有计时器回调都在单个线程上序列化。
5 秒后 在线程中执行的睡眠启动任务:1991504768
60 秒后 在线程中执行的启动任务:1991504768
我无法理解为什么当有两个线程运行ioContext.run()时,计时器回调被序列化;
我做错了什么吗?任何其他反馈都会有所帮助。
答:
0赞
sehe
8/24/2023
#1
你正在你的处理程序中做一个。这将阻塞服务线程,干扰调度。sleep(60)
无论如何,我无法重现该问题。在此过程中添加一些测量值(并减少时间间隔以更好地适应我的耐心水平):
#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::system::error_code;
static std::atomic_int tid_gen = 0;
thread_local int const tid = [] { return ++tid_gen; }();
static constexpr auto now = std::chrono::steady_clock::now;
static auto const start = now();
static std::mutex console_mx;
void trace(auto const&... msg) {
std::lock_guard lk(console_mx);
std::cerr << "at " << std::setw(8) << (now() - start)/1ms << "ms - tid:" << tid << " ";
(std::cerr << ... << msg) << std::endl;
}
void worker(asio::io_context& ioContext) {
trace("Worker thread enter");
ioContext.run(); // Run the io_context to handle asynchronous operations
trace("Worker thread exit");
}
int main() {
try {
asio::io_context ioContext;
// Create a work object to prevent ioContext.run() from returning immediately
auto work = make_work_guard(ioContext);
// Create multiple worker threads
std::vector<std::thread> threads;
for (int i = 0; i < 2; ++i) {
threads.emplace_back(worker, std::ref(ioContext));
}
asio::steady_timer task1(ioContext, 50ms);
asio::steady_timer task2(ioContext, 100ms);
task1.async_wait([](error_code ec) {
trace("Start Task1: ", ec.message());
if (!ec)
sleep(6);
});
task2.async_wait([](error_code ec) { trace("Start Task2: ", ec.message()); });
trace("App started :", std::thread::hardware_concurrency());
work.reset();
// Join the worker threads
for (auto& thread : threads) {
thread.join();
}
trace("All worker threads joined.");
} catch (std::exception const& e) {
trace("Exception: ", std::quoted(e.what()));
}
}
印刷
at 0ms - tid:1 Worker thread enter
at 0ms - tid:2 Worker thread enter
at 0ms - tid:3 App started :4
at 50ms - tid:1 Start Task1: Success
at 100ms - tid:2 Start Task2: Success
at 6050ms - tid:1 Worker thread exit
at 6050ms - tid:2 Worker thread exit
at 6050ms - tid:3 All worker threads joined.
这正是我所期望的
旁注:使用并实际获得一个基于硬件并发性自动调整大小的池,取消工作防护和其他简化:thread_pool
#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
namespace asio = boost::asio;
using namespace std::chrono_literals;
using namespace std::placeholders;
using boost::system::error_code;
static std::atomic_int tid_gen = 0;
thread_local int const tid = [] { return ++tid_gen; }();
using duration = std::chrono::steady_clock::duration;
static constexpr auto now = std::chrono::steady_clock::now;
static auto const start = now();
static std::mutex console_mx;
void trace(auto const&... msg) {
std::lock_guard lk(console_mx);
std::cerr << "at " << std::setw(8) << (now() - start) / 1ms << "ms - tid:" << tid << " ";
(std::cerr << ... << msg) << std::endl;
}
void do_task(error_code ec, std::string caption, duration block_for = {}) {
trace("Start ", caption, ": ", ec.message());
if (!ec)
std::this_thread::sleep_for(block_for);
}
int main() {
asio::thread_pool io;
asio::steady_timer task1(io, 50ms);
asio::steady_timer task2(io, 100ms);
task1.async_wait(bind(do_task, _1, "Task1", 6s));
task2.async_wait(bind(do_task, _1, "Task2", 0s));
trace("App started: ", std::thread::hardware_concurrency());
io.join();
trace("Done");
}
指纹
at 0ms - tid:1 App started: 4
at 50ms - tid:2 Start Task1: Success
at 100ms - tid:3 Start Task2: Success
at 6051ms - tid:1 Done
评论
0赞
root
8/24/2023
感谢您的回复。我尝试了您的代码,但得到了不同的结果。和我原来的问题一样。在 1ms - tid:1 应用程序启动:2 在 30ms 时 - tid:2 在 30ms 时进入工作线程 - tid:3 在 50ms 时进入工作线程 - tid:2 启动任务 1:在 6050ms 时成功 - tid:2 启动任务 2:在 6051ms 时成功 - tid:2 工作线程退出
0赞
root
8/24/2023
我的硬件是双核 ARM Cortex A7,运行 Linux 5.15。我想知道它是否与提升库版本有关。我的是 1.79
0赞
sehe
8/24/2023
即使使用 1.79 我也无法重现,请参阅编译器资源管理器和 ARM。最显着的区别是您的线程需要高达 30 毫秒才能启动。我建议与启动前插入 50 毫秒的睡眠进行比较,看看这是否会改变调度。thread_pool
async_wait
0赞
sehe
8/24/2023
考虑粘贴任何结果以使其易于阅读,例如 paste.ubuntu.com/p/kfFS7jJth9 来自您的评论
0赞
root
8/25/2023
我试过thread_pool,在async_wait之前延迟,行为没有变化。只有一个工作线程正在获取两个处理程序并对其进行序列化,而另一个工作线程似乎挂起。我每次都必须杀死应用程序。有趣的观察是,如果我以 0 毫秒同时启动两个计时器任务,那么我可以看到两个线程都拾取了处理程序并且它们都正确退出(即使在两个处理程序中都阻止了睡眠调用)。
评论