锁定单个生产者、多个使用者 FIFO 队列的问题

Problems with locking single producer, multiple consumer fifo queue

提问人:Suno 提问时间:11/11/2023 更新时间:11/12/2023 访问量:82

问:

我有一个简单的小队列,其中一个任务从文件读取到队列中,几个任务解压缩内容。我工作了一段时间,但最终崩溃了,因为队列是空的,即使我在之前的行中检查队列是否为空!(请参阅代码中的注释)

#pragma once
#include <optional>
#include <future>
#include <queue>

template<class T, uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
    using optional_queue_pair = std::optional<T>;
    bool put(optional_queue_pair&& p)
    {
        if (my_queue.size() >= MAX_SIZE) {
            std::unique_lock my_lock(my_mutex);
            my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
        }
        std::lock_guard<std::mutex> my_lock(my_mutex);
        my_queue.push(std::move(p));
        my_cv_add.notify_one();
        return true;
    }
    using optional_queue_pair = std::optional<T>;
    optional_queue_pair get()
    {
        if (my_queue.size() == 0) {
            std::unique_lock my_lock(my_mutex);
            my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
        }
        std::scoped_lock my_lock(my_mutex);
        optional_queue_pair ret = std::move(my_queue.front()); //some times my_queue.size() == 0 why????
        my_queue.pop();
        my_cv_remove.notify_one();
        return ret;
    }
private:
    using optional_queue_pair = std::optional<T>;
    std::queue<optional_queue_pair> my_queue;
    std::mutex my_mutex;
    std::condition_variable my_cv_add;
    std::condition_variable my_cv_remove;
};

我试图防止这种情况发生,但显然我不了解互斥锁的细节!

C++ 队列 锁定 互斥锁 多任务处理

评论


答:

0赞 Ahmed AEK 11/11/2023 #1

使用虚假唤醒时,条件变量可能会唤醒多个线程notify_one

锁定互斥锁后,需要再次检查队列是否为空,可能有两个线程已经唤醒并正在等待锁定,只有第一个线程会在队列中找到一些东西,另一个会发现它是空的,你还需要把整个东西放在一个中,因为队列不允许失败。while (true)

optional_queue_pair get()
{
    while (true)  // retry until succeed
    {
        if (my_queue.size() == 0) {
            std::unique_lock my_lock(my_mutex);
            my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
        }
        std::scoped_lock my_lock(my_mutex);  // two threads awaken one waiting on lock
        if (my_queue.size() != 0) // first thread sees true, second sees false
        {
            optional_queue_pair ret = std::move(my_queue.front());
            my_queue.pop();
            my_cv_remove.notify_all();  // because notify_one may not awaken the producer.
            return ret;
        }
    }
}

对于生产环境,我可能会使用类似 boost lockfree queue 的东西,它是多个生产者-多个消费者无锁队列,并经过广泛测试,而不是滚动你自己的(较慢的)多消费者队列实现。

2赞 Sergey Vlasov 11/11/2023 #2

这是错误的:

if (my_queue.size() == 0) {
    std::unique_lock my_lock(my_mutex);
    my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}

您应该先锁定互斥锁以检查 my_queue.size()。大小可能返回 > 0,但同时另一个线程从 my_queue 弹出。

正确的是:

std::unique_lock my_lock(my_mutex);
if (my_queue.size() == 0) {
    my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}

更新:

谢谢艾哈迈德AEK,“如果”不需要。我们可以做得更简单:

std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });

请参阅文档:https://en.cppreference.com/w/cpp/thread/condition_variable

评论

0赞 Ahmed AEK 11/11/2023
在某一时刻,等待条件变量的两个线程将唤醒,并看到谓词 true,并将尝试在没有互斥锁保护的情况下修改队列,这是灾难性的。
0赞 Sergey Vlasov 11/12/2023
不。从文档(en.cppreference.com/w/cpp/thread/condition_variable)中,了解带有谓词的wait()是如何工作的: 1.释放互斥锁并等待。2. 醒来 3.再次获取互斥锁(在检查条件之前)。只有一个线程将获得互斥锁和检查条件。其他线程将等待互斥锁。
0赞 Ahmed AEK 11/12/2023
你是对的,在这种情况下,你可以完全删除。if
0赞 Sergey Vlasov 11/12/2023
看来你是对的