提问人:Less White 提问时间:3/30/2022 最后编辑:Less White 更新时间:3/30/2022 访问量:603
一次停止多个线程
Stopping multiple threads at once
问:
我在下面的程序中错过了什么,线程正在等待确定何时停止?在下面列出的程序中,线程以不可预测的方式停止;有些在打电话之前,有些根本没有停止。condition_variable_any
notify_all
使用的条件变量定义如下:
static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;
线程检查是否该停止,如下所示:
std::unique_lock<std::mutex> lock(interrupt_mutex);
const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
const auto timeout_expired = cv_status == std::cv_status::timeout;
if (!timeout_expired)
{
quit = true;
}
主线程发出停止线程的信号,如下所示:
std::unique_lock<std::mutex> lock(interrupt_mutex);
interrupt_cv.notify_all();
可能的输出如下所示:
Thread 1> Received interrupt signal at iteration 2
Thread 1> Terminate
Thread 2> Received interrupt signal at iteration 2
Thread 2> Terminate
Thread 4> Received interrupt signal at iteration 2
Thread 4> Terminate
**** Requesting all threads to stop ****
Waiting for all threads to complete...
在重现问题的完整代码下方:
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;
int main()
{
std::vector<std::thread> thread_handles;
for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
{
thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
{
int num_iterations = 0;
auto quit = false;
while (!quit)
{
// Fake processing time during the lock for testing purpose
std::this_thread::sleep_for(std::chrono::milliseconds(200));
++num_iterations;
// Check if need to stop with a timeout of 200ms
{
std::unique_lock<std::mutex> lock(interrupt_mutex);
const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
{
printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
quit = true;
}
}
}
printf("Thread %2d> Terminate\n", thread_id);
}, thread_idx + 1));
}
std::this_thread::sleep_for(std::chrono::seconds(5));
// Signals all threads to stop
{
printf("**** Requesting all threads to stop ****\n");
std::unique_lock<std::mutex> lock(interrupt_mutex);
interrupt_cv.notify_all();
}
// Wait until all threads stop
printf("Waiting for all threads to complete...\n");
std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
{
thread_handle.join();
});
printf("Program ends\n");
return 0;
}
答:
0赞
ixSci
3/30/2022
#1
您的代码存在 2 个问题,并且两者都具有相同的解决方案。
- 虚假唤醒。如果你的状态因为SW而结束,你的条件就会得到满足,尽管没有人真正要求它唤醒/结束。
wait_for
- 如果你的线程没有锁并且没有休眠,并且你的主线程通知了所有人,该怎么办?请注意,通知不会存储在任何地方,因此如果您错过了通知,以后将不会收到通知。因此,那些错过的线程将永远不会终止。
要解决这两个问题,您需要另一个标志,它将告诉您的线程工作已完成并且它们必须停止。
static bool stop = false;
//...
if (stop) // Instead of if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
//...
printf("**** Requesting all threads to stop ****\n");
std::unique_lock<std::mutex> lock(interrupt_mutex);
stop = true;
interrupt_cv.notify_all();
4赞
Remy Lebeau
3/30/2022
#2
A 用于在条件更改时(即,当共享变量更改值时)向线程发出信号。但是你的代码没有条件。你试图用它本身作为退出信号,这不是它的本意。 只会唤醒在那一刻正在积极等待的线程。由于忙于做其他事情而未等待它的线程将不会收到终止信号。但是,一旦这些线程准备好等待,就需要检测条件。因此,条件需要更持久。这就是您的代码无法正常工作的原因。condition_variable
condition_variable
notify_all()
condition_variable
在这种情况下,只需将变量移动到全局范围,即可在 和 旁边。设置该变量将充当您的条件,您可以向等待线程发出信号。然后,您可以使用重载版本来检查当前状态(忽略虚假唤醒)。quit
condition_variable
mutex
quit
wait_for()
quit
尝试更像这样的东西:
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;
static bool quit = false;
int main()
{
std::vector<std::thread> thread_handles;
for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
{
thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
{
int num_iterations = 0;
while (true)
{
// Fake processing time outside the lock for testing purpose
std::this_thread::sleep_for(std::chrono::milliseconds(200));
++num_iterations;
// Check if need to stop with a timeout of 1s
{
std::unique_lock<std::mutex> lock(interrupt_mutex);
const bool signaled = interrupt_cv.wait_for(lock, std::chrono::seconds(1), [](){ return quit; });
if (signaled) break;
}
}
printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
printf("Thread %2d> Terminate\n", thread_id);
}, thread_idx + 1));
}
std::this_thread::sleep_for(std::chrono::seconds(5));
// Signals all threads to stop
printf("**** Requesting all threads to stop ****\n");
{
std::lock_guard<std::mutex> lock(interrupt_mutex);
quit = true;
}
interrupt_cv.notify_all();
// Wait until all threads stop
printf("Waiting for all threads to complete...\n");
std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
{
thread_handle.join();
});
printf("Program ends\n");
return 0;
}
评论
0赞
Less White
3/30/2022
非常感谢您的清晰解释。我现在更了解如何使用条件变量。
评论
std::atomic_flag
会是更好的选择。std::atomic<bool>