提问人:Piwniczne 提问时间:11/10/2023 最后编辑:Piwniczne 更新时间:11/11/2023 访问量:125
条件变量 notify_one 是否一直尝试,直到它到达使用正谓词等待的线程?
Does condition variable notify_one keep trying until it reaches thread awaiting with a positive predicate?
问:
我正在测试边缘情况,并且我测试了场景以使一个线程匮乏。
场景是有 99 个生产者,只有一个使用者,他们都在 1 个最大大小的队列上工作。
notify_one(在达到队列的最大大小后)击中消费者的概率接近 1%。
所以这意味着它将命中另一个生产者,该生产者将检查谓词,然后等待互斥锁。std::condition_variable
我预计程序会在这一点上挂起,但我看到的是程序尝试了尽可能多的谓词,直到最后它到达了消费者。如果没有使用者,则会永远检查所有等待线程中的谓词。
我的问题是: 标准是否定义了 notify_one() 将尝试以正谓词结果通知第一个等待线程?那么为什么它可以在没有预测的情况下与消费者一起工作(注释代码)。 还是一个虚假的唤醒,如此努力地唤醒我等待的线程? 还是别的什么?
在带有 Clang 和 MSVC 的 Windows 上进行了测试。
我的代码重现了这种情况:
#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
#include <iostream>
class CVSimple
{
public:
static void test() {
std::queue<int> que;
std::mutex m;
std::condition_variable cv;
int max_size = 10;
bool working = true;
auto producer = [&]() {
while (working) {
std::unique_lock<std::mutex> lock(m);
std::chrono::milliseconds t(200);
auto predicate = [&que, &max_size]() {
if (que.size() < max_size) {
std::cout << "T";
return true;
}
std::cout << "F";
return false;
};
if (cv.wait_for(lock, t, predicate)) {
std::cout << "+";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
que.push(1);
lock.unlock();
std::cout << "N";
cv.notify_one();
}
else {
//std::cout << "P";
}
}
};
auto consumer = [&]() {
while (working) {
std::unique_lock<std::mutex> lock(m);
std::chrono::milliseconds t(200);
auto predicate = [&que]() {
if (!que.empty()) {
std::cout << "t";
return true;
}
else {
std::cout << "f";
return false;
};
};
//cv.wait(lock, predicate);
//std::cout << "-";
//std::this_thread::sleep_for(std::chrono::milliseconds(50));
//que.pop();
//lock.unlock();
//std::cout << "n";
//cv.notify_one();
if (cv.wait_for(lock, t, predicate)) {
std::cout << "-";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
que.pop();
lock.unlock();
std::cout << "n";
cv.notify_one();
}
else {
std::cout << "o";
}
}
};
int nprod = 100;
int ncons = 1;
std::cout << "Start producers" << std::endl;
std::vector<std::thread> threads;
for (int i = 0; i < nprod; ++i) {
threads.emplace_back(producer);
}
std::cout << "Start consumers" << std::endl;
for (int i = 0; i < ncons; ++i) {
threads.emplace_back(consumer);
}
std::this_thread::sleep_for(std::chrono::seconds(20));
std::cout << "Stop working" << std::endl;
working = false;
for (auto& th : threads) {
if (th.joinable()) {
th.join();
}
}
return;
}
};
#include "CVSimple.h"
int main()
{
CVSimple::test();
}
输出:
Start producers
T+Start consumers
NT+NT+NT+NT+NT+NT+NT+NT+NT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFt-nT+NFFFFFFFFFFt-nT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
因此,当队列已满时,在添加最后一个元素并发送通知后,第一个生产者会失败他的谓词(输出中的第一个 F)。然后我希望它等待另一个通知 (N),这不会发生,但正在进行大量检查。这意味着 sb 触发了它,是假唤醒吗?
我已经尝试了问题中提供的代码,它有效,但我不确定为什么。
答:
“FFFFFFFFFFFF”发生在队列已满后。这是我的想法:
您的函数调用 ,其中是超时值。队列已满后,所有生产者线程都会超时等待永远不会到来的通知。每次调用超时时,生产者什么都不做†除了回到循环的顶部并再次调用,并且每次调用都将测试给定的 ,一次,每次打印一个“F”,因为队列仍然已满。producer
cv.wait_for(lock, t, predicate)
t
wait_for
wait_for
wait_for
predicate
有 99 个生产者线程和 200 毫秒的超时,我估计您应该看到每秒大约 495 个“F”,每次消费者线程收到幸运通知时,偶尔会出现“t-nT+N”。
我添加了一个例程,然后运行了程序:main
int main(int argc, char* argv[]) {
CVSimple cvs;
cvs.test();
}
我实际看到的是长时间的“FFFFFFF”,偶尔被“t-nt-nt-n...”打断。(通常重复九次或十次),后跟等数的“T+N”,然后回到“FFFFFFF”
但是等等!(我听到你哭了。为什么当消费者的呼叫超时时,每秒没有打印五个“o”?wait_for(...)
我不知道,但也许消费者正在渴望.请记住,即使在超时之后,也无法返回,直到它重新获取 ,并且有 99 个其他线程都在争夺它的所有权。lock
wait_for(lock,...)
lock
通常,当消费者开始运行时,它几乎总是在允许任何生产者再次运行之前耗尽整个队列,这一事实强烈暗示您遇到了饥饿问题。
† 如果您取消注释该行,它会做一些事情。std::cout << "P"
上一个:数据结构的锁定安全性 [已关闭]
评论