条件变量 notify_one 是否一直尝试,直到它到达使用正谓词等待的线程?

Does condition variable notify_one keep trying until it reaches thread awaiting with a positive predicate?

提问人:Piwniczne 提问时间:11/10/2023 最后编辑:Piwniczne 更新时间:11/11/2023 访问量:125

问:

我正在测试边缘情况,并且我测试了场景以使一个线程匮乏。 场景是有 99 个生产者,只有一个使用者,他们都在 1 个最大大小的队列上工作。 notify_one(在达到队列的最大大小后)击中消费者的概率接近 1%。 所以这意味着它将命中另一个生产者,该生产者将检查谓词,然后等待互斥锁。std::condition_variable

我预计程序会在这一点上挂起,但我看到的是程序尝试了尽可能多的谓词,直到最后它到达了消费者。如果没有使用者,则会永远检查所有等待线程中的谓词。

我的问题是: 标准是否定义了 notify_one() 将尝试以正谓词结果通知第一个等待线程?那么为什么它可以在没有预测的情况下与消费者一起工作(注释代码)。 还是一个虚假的唤醒,如此努力地唤醒我等待的线程? 还是别的什么?

在带有 Clang 和 MSVC 的 Windows 上进行了测试。

我的代码重现了这种情况:

#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
#include <iostream>

class CVSimple
{

public:
    static void test() {
        std::queue<int> que;
        std::mutex m;
        std::condition_variable cv;
        int max_size = 10;
        bool working = true;


        auto producer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                auto predicate = [&que, &max_size]() { 
                    if (que.size() < max_size) {
                        std::cout << "T";
                        return true;
                    } 
                    std::cout << "F";
                    return false;
                };
                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "+";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.push(1);
                    lock.unlock();
                    std::cout << "N";
                    cv.notify_one();
                }
                else {
                    //std::cout << "P";
                }

            }
        };

        auto consumer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                auto predicate = [&que]() {
                    if (!que.empty()) {
                        std::cout << "t";
                        return true;
                    }
                    else {
                        std::cout << "f";
                        return false;
                    };
                };
                //cv.wait(lock, predicate);
                //std::cout << "-";
                //std::this_thread::sleep_for(std::chrono::milliseconds(50));
                //que.pop();
                //lock.unlock();
                //std::cout << "n";
                //cv.notify_one();


                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "-";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.pop();
                    lock.unlock();
                    std::cout << "n";
                    cv.notify_one();
                }
                else {
                    std::cout << "o";
                }
            }
        };

        int nprod = 100;
        int ncons = 1;

        std::cout << "Start producers" << std::endl;
        std::vector<std::thread> threads;
        for (int i = 0; i < nprod; ++i) {
            threads.emplace_back(producer);
        }

        std::cout << "Start consumers" << std::endl;
        for (int i = 0; i < ncons; ++i) {
            threads.emplace_back(consumer);
        }

        std::this_thread::sleep_for(std::chrono::seconds(20));
        std::cout << "Stop working" << std::endl;
        working = false;
        for (auto& th : threads) {
            if (th.joinable()) {
                th.join();
            }
        }
        return;
    }
};
#include "CVSimple.h"

int main()
{
    CVSimple::test();
}

输出:

Start producers
T+Start consumers
NT+NT+NT+NT+NT+NT+NT+NT+NT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFt-nT+NFFFFFFFFFFt-nT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

因此,当队列已满时,在添加最后一个元素并发送通知后,第一个生产者会失败他的谓词(输出中的第一个 F)。然后我希望它等待另一个通知 (N),这不会发生,但正在进行大量检查。这意味着 sb 触发了它,是假唤醒吗?

我已经尝试了问题中提供的代码,它有效,但我不确定为什么。

C++ 多线程 互斥锁 stdthread 条件变量

评论

1赞 Pete Becker 11/10/2023
回答标题中的问题(尚未阅读问题):不。
1赞 RbMm 11/10/2023
在Windows中,condition_variable的当前实现具有FIFO顺序。notify_one unwait 线程,首先开始等待此变量
0赞 Useless 11/10/2023
这个问题我已经读了好几遍了,我真的不确定你到底看到了什么,以及你期望看到的是什么。但顺便说一句:生产者应该只通知消费者关心的状态变化:当他们刚刚将元素添加到空队列时
0赞 Piwniczne 11/10/2023
我添加了带有解释的输出。

答:

1赞 Solomon Slow 11/11/2023 #1

“FFFFFFFFFFFF”发生在队列已满后。这是我的想法:

您的函数调用 ,其中是超时值。队列已满后,所有生产者线程都会超等待永远不会到来的通知。每次调用超时时,生产者什么都不做除了回到循环的顶部并再次调用,并且每次调用都将测试给定的 ,一次,每次打印一个“F”,因为队列仍然已满。producercv.wait_for(lock, t, predicate)twait_forwait_forwait_forpredicate

有 99 个生产者线程和 200 毫秒的超时,我估计您应该看到每秒大约 495 个“F”,每次消费者线程收到幸运通知时,偶尔会出现“t-nT+N”。

我添加了一个例程,然后运行了程序:main

int main(int argc, char* argv[]) {
    CVSimple cvs;
    cvs.test();
}

我实际看到的是长时间的“FFFFFFF”,偶尔被“t-nt-nt-n...”打断。(通常重复九次或十次),后跟等数的“T+N”,然后回到“FFFFFFF”


但是等等!(我听到你哭了。为什么当消费者的呼叫超时时,每秒没有打印五个“o”?wait_for(...)

我不知道,但也许消费者正在渴望.请记住,即使在超时之后,也无法返回,直到它重新获取 ,并且有 99 个其他线程都在争夺它的所有权。lockwait_for(lock,...)lock

通常,当消费者开始运行时,它几乎总是在允许任何生产者再次运行之前耗尽整个队列,这一事实强烈暗示您遇到了饥饿问题。


† 如果您取消注释该行,它会做一些事情std::cout << "P"