一次停止多个线程

Stopping multiple threads at once

提问人:Less White 提问时间:3/30/2022 最后编辑:Less White 更新时间:3/30/2022 访问量:603

问:

我在下面的程序中错过了什么,线程正在等待确定何时停止?在下面列出的程序中,线程以不可预测的方式停止;有些在打电话之前,有些根本没有停止。condition_variable_anynotify_all

使用的条件变量定义如下:

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

线程检查是否该停止,如下所示:

std::unique_lock<std::mutex> lock(interrupt_mutex);
const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
const auto timeout_expired = cv_status == std::cv_status::timeout;
if (!timeout_expired)
{
    quit = true;
}

主线程发出停止线程的信号,如下所示:

std::unique_lock<std::mutex> lock(interrupt_mutex);
interrupt_cv.notify_all();

可能的输出如下所示:

Thread  1> Received interrupt signal at iteration 2
Thread  1> Terminate
Thread  2> Received interrupt signal at iteration 2
Thread  2> Terminate
Thread  4> Received interrupt signal at iteration 2
Thread  4> Terminate
**** Requesting all threads to stop ****
Waiting for all threads to complete...

在重现问题的完整代码下方:

#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            auto quit = false;
            while (!quit)
            {
                // Fake processing time during the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 200ms 
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
                    if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
                    {
                        printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
                        quit = true;
                    }
                }
            }

            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    {
        printf("**** Requesting all threads to stop ****\n");
        std::unique_lock<std::mutex> lock(interrupt_mutex);
        interrupt_cv.notify_all();
    }

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}
C++ 多线程 C++11 标准

评论

2赞 user4581301 3/30/2022
条件变量看起来不像是适合此作业的工具。条件变量将在每次迭代时停止并等待,从而减慢速度。如果这不是您想要的行为,我认为 a 或 std::atomic_flag 会是更好的选择。std::atomic<bool>
0赞 Less White 3/30/2022
好的,我知道我应该使用 std::atomic_flag。但话虽如此,为什么线程不能可靠地停止?我使用超时的原因,因此线程必须执行其他操作,然后定期检查它们是否必须停止。
1赞 WhozCraig 3/30/2022
此外,不要将 cv 用于状态。这不是他们的本意。这就是谓词数据的用途。如果有理由“知道”cv 等待操作何时超时(也许是将线程池缩小到某个最小大小等),那没关系,但核心等待仍应基于受互斥锁保护的外部谓词数据。
1赞 RufusVS 3/30/2022
我运行了您的示例代码,线程都停止了......最终。除了一两个人之外,所有人都立即停了下来。但是在我的最后一次运行(6 个线程)中,最后一个终止的是迭代 232 处的线程 3。

答:

0赞 ixSci 3/30/2022 #1

您的代码存在 2 个问题,并且两者都具有相同的解决方案。

  1. 虚假唤醒。如果你的状态因为SW而结束,你的条件就会得到满足,尽管没有人真正要求它唤醒/结束。wait_for
  2. 如果你的线程没有锁并且没有休眠,并且你的主线程通知了所有人,该怎么办?请注意,通知不会存储在任何地方,因此如果您错过了通知,以后将不会收到通知。因此,那些错过的线程将永远不会终止。

要解决这两个问题,您需要另一个标志,它将告诉您的线程工作已完成并且它们必须停止。

static bool stop = false;
//...
if (stop) // Instead of if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
//...
printf("**** Requesting all threads to stop ****\n");
std::unique_lock<std::mutex> lock(interrupt_mutex);
stop = true;
interrupt_cv.notify_all();
4赞 Remy Lebeau 3/30/2022 #2

A 用于在条件更改时(即,当共享变量更改值时)向线程发出信号。但是你的代码没有条件。你试图用它本身作为退出信号,这不是它的本意。 只会唤醒在那一刻正在积极等待的线程。由于忙于做其他事情而未等待它的线程将不会收到终止信号。但是,一旦这些线程准备好等待,就需要检测条件。因此,条件需要更持久。这就是您的代码无法正常工作的原因。condition_variablecondition_variablenotify_all()condition_variable

在这种情况下,只需将变量移动到全局范围,即可在 和 旁边。设置该变量将充当您的条件,您可以向等待线程发出信号。然后,您可以使用重载版本来检查当前状态(忽略虚假唤醒)。quitcondition_variablemutexquitwait_for()quit

尝试更像这样的东西:

#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;
static bool quit = false;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            while (true)
            {
                // Fake processing time outside the lock for testing purpose
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check if need to stop with a timeout of 1s 
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const bool signaled = interrupt_cv.wait_for(lock, std::chrono::seconds(1), [](){ return quit; });
                    if (signaled) break;
                }
            }

            printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signals all threads to stop
    printf("**** Requesting all threads to stop ****\n");
    {
        std::lock_guard<std::mutex> lock(interrupt_mutex);
        quit = true;
    }
    interrupt_cv.notify_all();

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}

评论

0赞 Less White 3/30/2022
非常感谢您的清晰解释。我现在更了解如何使用条件变量。