提问人:Suno 提问时间:11/11/2023 更新时间:11/12/2023 访问量:82
锁定单个生产者、多个使用者 FIFO 队列的问题
Problems with locking single producer, multiple consumer fifo queue
问:
我有一个简单的小队列,其中一个任务从文件读取到队列中,几个任务解压缩内容。我工作了一段时间,但最终崩溃了,因为队列是空的,即使我在之前的行中检查队列是否为空!(请参阅代码中的注释)
#pragma once
#include <optional>
#include <future>
#include <queue>
template<class T, uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
using optional_queue_pair = std::optional<T>;
bool put(optional_queue_pair&& p)
{
if (my_queue.size() >= MAX_SIZE) {
std::unique_lock my_lock(my_mutex);
my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
}
std::lock_guard<std::mutex> my_lock(my_mutex);
my_queue.push(std::move(p));
my_cv_add.notify_one();
return true;
}
using optional_queue_pair = std::optional<T>;
optional_queue_pair get()
{
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
std::scoped_lock my_lock(my_mutex);
optional_queue_pair ret = std::move(my_queue.front()); //some times my_queue.size() == 0 why????
my_queue.pop();
my_cv_remove.notify_one();
return ret;
}
private:
using optional_queue_pair = std::optional<T>;
std::queue<optional_queue_pair> my_queue;
std::mutex my_mutex;
std::condition_variable my_cv_add;
std::condition_variable my_cv_remove;
};
我试图防止这种情况发生,但显然我不了解互斥锁的细节!
答:
0赞
Ahmed AEK
11/11/2023
#1
使用虚假唤醒时,条件变量可能会唤醒多个线程notify_one
锁定互斥锁后,需要再次检查队列是否为空,可能有两个线程已经唤醒并正在等待锁定,只有第一个线程会在队列中找到一些东西,另一个会发现它是空的,你还需要把整个东西放在一个中,因为队列不允许失败。while (true)
optional_queue_pair get()
{
while (true) // retry until succeed
{
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
std::scoped_lock my_lock(my_mutex); // two threads awaken one waiting on lock
if (my_queue.size() != 0) // first thread sees true, second sees false
{
optional_queue_pair ret = std::move(my_queue.front());
my_queue.pop();
my_cv_remove.notify_all(); // because notify_one may not awaken the producer.
return ret;
}
}
}
对于生产环境,我可能会使用类似 boost lockfree queue 的东西,它是多个生产者-多个消费者无锁队列,并经过广泛测试,而不是滚动你自己的(较慢的)多消费者队列实现。
2赞
Sergey Vlasov
11/11/2023
#2
这是错误的:
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
您应该先锁定互斥锁以检查 my_queue.size()。大小可能返回 > 0,但同时另一个线程从 my_queue 弹出。
正确的是:
std::unique_lock my_lock(my_mutex);
if (my_queue.size() == 0) {
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
更新:
谢谢艾哈迈德AEK,“如果”不需要。我们可以做得更简单:
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
请参阅文档:https://en.cppreference.com/w/cpp/thread/condition_variable
评论
0赞
Ahmed AEK
11/11/2023
在某一时刻,等待条件变量的两个线程将唤醒,并看到谓词 true,并将尝试在没有互斥锁保护的情况下修改队列,这是灾难性的。
0赞
Sergey Vlasov
11/12/2023
不。从文档(en.cppreference.com/w/cpp/thread/condition_variable)中,了解带有谓词的wait()是如何工作的: 1.释放互斥锁并等待。2. 醒来 3.再次获取互斥锁(在检查条件之前)。只有一个线程将获得互斥锁和检查条件。其他线程将等待互斥锁。
0赞
Ahmed AEK
11/12/2023
你是对的,在这种情况下,你可以完全删除。if
0赞
Sergey Vlasov
11/12/2023
看来你是对的
评论