提问人:sbi 提问时间:1/9/2015 最后编辑:sbi 更新时间:6/19/2015 访问量:29779
如何从更基本的同步基元中制作多读/单写锁?
How to make a multiple-read/single-write lock from more basic synchronization primitives?
问:
我们发现,在我们的代码中有几个地方,对受互斥锁保护的数据的并发读取相当普遍,而写入则很少见。我们的测量结果似乎表明,使用简单的互斥锁会严重阻碍读取该数据的代码的性能。因此,我们需要的是一个多读/单写互斥锁。我知道这可以建立在更简单的原语之上,但在我尝试这样做之前,我宁愿询问现有的知识:
从更简单的同步基元中构建多读/单写锁的批准方法是什么?
我确实有一个想法,但我宁愿得到我(可能是错误的)想出的答案。(注意:我期望的是一个解释如何做到这一点,可能是在伪代码中,而不是一个成熟的实现。我当然可以自己编写代码。
警告:
这需要有合理的性能。(我的想法是每次访问需要两次锁定/解锁操作。现在这可能还不够好,但需要很多它们似乎是不合理的。
通常,读取次数更多,但写入比读取更重要,对性能更敏感。读者绝不能让作家挨饿。
我们被困在一个相当旧的嵌入式平台(VxWorks 5.5 的专有版本)上,使用一个相当旧的编译器(GCC 4.1.2)和 boost 1.52——除了 boost 的大部分部分依赖于 POSIX,因为 POSIX 没有在该平台上完全实现。可用的锁定原语基本上是几种信号量(二进制、计数等),我们已经在这些信号量上创建了互斥锁、条件变量和监视器。
这是 IA32,单核。
答:
并发读取受互斥锁保护的数据相当常见,而写入则很少见
这听起来像是用户空间 RCU 的理想方案:
URCU 类似于其 Linux 内核对应物,提供读写器锁定的替代品,以及其他用途。这种相似性继续存在,读取器不直接与 RCU 更新程序同步,从而使 RCU 读取端代码路径非常快,同时进一步允许 RCU 读取器即使在与 RCU 更新程序同时运行时也能取得有用的前进进展,反之亦然。
评论
与往常一样,最好的解决方案将取决于细节。读写旋转锁可能是您正在寻找的,但其他方法(如上面建议的读-复制-更新)可能是一种解决方案 - 尽管在旧的嵌入式平台上,使用的额外内存可能是一个问题。对于罕见的写入,我经常使用任务系统来安排工作,这样只有在没有从该数据结构读取时才能进行写入,但这取决于算法。
评论
一种基于信号量和互斥锁的算法在 Concurrent Control with Readers and Writers 中进行了描述; PJ Courtois、F. Heymans 和 DL Parnas;MBLE研究实验室;比利时布鲁塞尔。
评论
这是一个基于我的 Boost 标头的简化答案(我会称 Boost 为一种批准的方式)。它只需要条件变量和互斥锁。我使用 Windows 原语重写了它,因为我发现它们具有描述性且非常简单,但将其视为伪代码。
这是一个非常简单的解决方案,它不支持互斥升级或 try_lock() 操作等操作。如果你愿意,我可以添加这些。我还删除了一些多余的装饰,例如禁用并非绝对必要的中断。
此外,值得一试(这是基于此)。它是人类可读的。boost\thread\pthread\shared_mutex.hpp
class SharedMutex {
CRITICAL_SECTION m_state_mutex;
CONDITION_VARIABLE m_shared_cond;
CONDITION_VARIABLE m_exclusive_cond;
size_t shared_count;
bool exclusive;
// This causes write blocks to prevent further read blocks
bool exclusive_wait_blocked;
SharedMutex() : shared_count(0), exclusive(false)
{
InitializeConditionVariable (m_shared_cond);
InitializeConditionVariable (m_exclusive_cond);
InitializeCriticalSection (m_state_mutex);
}
~SharedMutex()
{
DeleteCriticalSection (&m_state_mutex);
DeleteConditionVariable (&m_exclusive_cond);
DeleteConditionVariable (&m_shared_cond);
}
// Write lock
void lock(void)
{
EnterCriticalSection (&m_state_mutex);
while (shared_count > 0 || exclusive)
{
exclusive_waiting_blocked = true;
SleepConditionVariableCS (&m_exclusive_cond, &m_state_mutex, INFINITE)
}
// This thread now 'owns' the mutex
exclusive = true;
LeaveCriticalSection (&m_state_mutex);
}
void unlock(void)
{
EnterCriticalSection (&m_state_mutex);
exclusive = false;
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
// Read lock
void lock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
while (exclusive || exclusive_waiting_blocked)
{
SleepConditionVariableCS (&m_shared_cond, m_state_mutex, INFINITE);
}
++shared_count;
LeaveCriticalSection (&m_state_mutex);
}
void unlock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
--shared_count;
if (shared_count == 0)
{
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
else
{
LeaveCriticalSection (&m_state_mutex);
}
}
};
行为
好吧,这个算法的行为存在一些混淆,所以这里是它的工作原理。
在写锁定期间 - 读取器和写入器都被阻止。
在写锁定结束时 - 读取器线程和一个写入器线程将争先恐后地查看哪个线程启动。
在读取锁定期间 - 写入器被阻止。当且仅当 Writer 被阻止时,读取器也会被阻止。
在最终的 Read Lock 发布时,Reader 线程和一个写入器线程将争先恐后地查看哪个线程启动。
如果处理器经常在通知之前上下文切换到线程,这可能会导致读者饿死编写者,但我怀疑这个问题是理论上的,不切实际,因为它是 Boost 的算法。m_shared_cond
m_exclusive_cond
评论
exclusive_waiting_blocked
EnterCriticalSection
CRITICAL_SECTION
似乎您只有互斥锁和 condition_variable 作为同步原语。因此,我在这里写了一个读写锁,这让读者挨饿。它使用一个互斥锁、两个conditional_variable和三个整数。
readers - readers in the cv readerQ plus the reading reader
writers - writers in cv writerQ plus the writing writer
active_writers - the writer currently writing. can only be 1 or 0.
它以这种方式使读者挨饿。如果有几个作家想写,在所有作家都写完之前,读者将永远没有机会阅读。这是因为后来的读者需要检查变量。同时,该变量将保证一次只能有一个写入器写入。writers
active_writers
class RWLock {
public:
RWLock()
: shared()
, readerQ(), writerQ()
, active_readers(0), waiting_writers(0), active_writers(0)
{}
void ReadLock() {
std::unique_lock<std::mutex> lk(shared);
while( waiting_writers != 0 )
readerQ.wait(lk);
++active_readers;
lk.unlock();
}
void ReadUnlock() {
std::unique_lock<std::mutex> lk(shared);
--active_readers;
lk.unlock();
writerQ.notify_one();
}
void WriteLock() {
std::unique_lock<std::mutex> lk(shared);
++waiting_writers;
while( active_readers != 0 || active_writers != 0 )
writerQ.wait(lk);
++active_writers;
lk.unlock();
}
void WriteUnlock() {
std::unique_lock<std::mutex> lk(shared);
--waiting_writers;
--active_writers;
if(waiting_writers > 0)
writerQ.notify_one();
else
readerQ.notify_all();
lk.unlock();
}
private:
std::mutex shared;
std::condition_variable readerQ;
std::condition_variable writerQ;
int active_readers;
int waiting_writers;
int active_writers;
};
评论
deque
乍一看,我以为我认出这个答案与亚历山大·捷列霍夫(Alexander Terekhov)引入的算法相同。但研究之后,我认为它是有缺陷的。两个编写器可以同时等待 。当其中一个写入器唤醒并获得独占锁时,它将设置 on ,从而将互斥锁设置为不一致的状态。之后,互斥锁可能会被冲洗。m_exclusive_cond
exclusive_waiting_blocked = false
unlock
N2406 最初提出,包含部分实现,下面将重复更新的语法。std::shared_mutex
class shared_mutex
{
mutex mut_;
condition_variable gate1_;
condition_variable gate2_;
unsigned state_;
static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned n_readers_ = ~write_entered_;
public:
shared_mutex() : state_(0) {}
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
};
// Exclusive ownership
void
shared_mutex::lock()
{
unique_lock<mutex> lk(mut_);
while (state_ & write_entered_)
gate1_.wait(lk);
state_ |= write_entered_;
while (state_ & n_readers_)
gate2_.wait(lk);
}
bool
shared_mutex::try_lock()
{
unique_lock<mutex> lk(mut_, try_to_lock);
if (lk.owns_lock() && state_ == 0)
{
state_ = write_entered_;
return true;
}
return false;
}
void
shared_mutex::unlock()
{
{
lock_guard<mutex> _(mut_);
state_ = 0;
}
gate1_.notify_all();
}
// Shared ownership
void
shared_mutex::lock_shared()
{
unique_lock<mutex> lk(mut_);
while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_)
gate1_.wait(lk);
unsigned num_readers = (state_ & n_readers_) + 1;
state_ &= ~n_readers_;
state_ |= num_readers;
}
bool
shared_mutex::try_lock_shared()
{
unique_lock<mutex> lk(mut_, try_to_lock);
unsigned num_readers = state_ & n_readers_;
if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_)
{
++num_readers;
state_ &= ~n_readers_;
state_ |= num_readers;
return true;
}
return false;
}
void
shared_mutex::unlock_shared()
{
lock_guard<mutex> _(mut_);
unsigned num_readers = (state_ & n_readers_) - 1;
state_ &= ~n_readers_;
state_ |= num_readers;
if (state_ & write_entered_)
{
if (num_readers == 0)
gate2_.notify_one();
}
else
{
if (num_readers == n_readers_ - 1)
gate1_.notify_one();
}
}
该算法源自 Alexander Terekhov 的旧新闻组帖子。它既不使读者挨饿,也不使作家挨饿。
有两个“门”,和.读者和作者必须通过,并且可能会在尝试这样做时被阻止。一旦读取器通过,它就读锁定了互斥锁。只要没有最大数量的读者拥有所有权,并且只要作者没有通过,读者就可以通过。gate1_
gate2_
gate1_
gate1_
gate1_
gate1_
一次只能有一个作家通过.即使读者拥有所有权,作家也可以通过。但是一旦过去了,作家仍然没有所有权。它必须首先通过.在所有拥有所有权的读者都放弃它之前,作家无法通过。回想一下,当作家在等待时,新读者无法通过。当作家在等待时,新作家也无法通过.gate1_
gate1_
gate1_
gate2_
gate2_
gate1_
gate2_
gate1_
gate2_
读者和作者都被(几乎)相同的要求所阻挡以通过它的特征,使这种算法对读者和作者都是公平的,两者都不会挨饿。gate1_
互斥“状态”被故意保留在一个单词中,以表明部分使用原子(作为优化)来改变某些状态变化是一种可能性(即对于无争议的“快速路径”)。但是,此处未演示该优化。一个例子是,如果一个写入器线程可以从 0 原子化地更改为 然后他获得锁,而不必阻塞甚至锁定/解锁。并且可以通过原子存储来实现。等。此处未显示这些优化,因为它们比这个简单的描述听起来更难正确实现。state_
write_entered
mut_
unlock()
评论
:(
您可以采取一些好技巧来提供帮助。
一是性能好。VxWorks 以其非常好的上下文切换时间而著称。无论您使用哪种锁定解决方案,它都可能涉及信号量。我不会害怕为此使用信号量(复数),它们在 VxWorks 中得到了很好的优化,并且快速的上下文切换时间有助于最大限度地减少因评估许多信号量状态等而导致的性能下降。
此外,我会忘记使用POSIX信号量,它们只是将分层在VxWork自己的信号量之上。VxWorks 提供原生计数、二进制和互斥信号量;使用合适的那个可以使这一切更快一些。二进制有时非常有用;发布多次,不得超过1的值。
其次,写入比读取更重要。当我在 VxWorks 中遇到此类要求并使用信号量来控制访问时,我使用任务优先级来指示哪个任务更重要,应该首先访问资源。这效果很好;从字面上看,VxWorks中的所有内容都像其他任务一样(线程),包括所有设备驱动程序等。
VxWorks 还解决了优先级倒置问题(Linus Torvalds 讨厌的那种事情)。因此,如果您使用信号量实现锁定,则可以依靠操作系统调度程序来启动优先级较低的读取器(如果它们阻止了优先级较高的写入器)。它可以带来更简单的代码,并且您也可以充分利用操作系统。
因此,一个潜在的解决方案是让一个VxWorks计数信号量来保护资源,初始化为等于读取器数量的值。每次读者想要阅读时,它都会获取信号量(将计数减少 1。每次读取完成时,它都会发布信号量,将计数增加 1。每次作者想要写作时,它都会花费信号量 n(n = 读者数量)次,并在完成后发布 n 次。最后,使编写器任务的优先级高于任何读取器,并依靠操作系统快速的上下文切换时间和优先级反转。
请记住,您是在硬实时操作系统上编程,而不是在 Linux 上编程。获取/发布原生 VxWorks 信号量并不像在 Linux 上执行类似操作那样涉及相同的运行时间,尽管现在即使是 Linux 也相当不错(我现在使用 PREEMPT_RT)。可以依赖 VxWorks 调度程序和所有设备驱动程序来运行。如果您愿意,您甚至可以将编写器任务设置为整个系统中的最高优先级,甚至高于所有设备驱动程序!
为了帮助事情顺利进行,还要考虑每个线程在做什么。VxWorks 允许您指示任务正在/未使用 FPU。如果您使用的是本机 VxWorks TaskSpawn 例程而不是pthread_create则有机会指定此例程。这意味着,如果你的线程/任务没有做任何浮点数学运算,并且你在调用 TaskSpawn 时已经这样说过,那么上下文切换时间会更快,因为调度程序不会费心去保留/恢复 FPU 状态。
这很有可能成为您正在开发的平台上的最佳解决方案。它发挥了操作系统的优势(快速信号量、快速上下文切换时间),而无需引入大量额外代码来重新创建其他平台上常见的替代(可能更优雅)解决方案。
第三,坚持使用旧的 GCC 和旧的 Boost。基本上,除了关于打电话给 WindRiver 和讨论购买升级的低价值建议外,我无能为力。就我个人而言,当我为 VxWorks 编程时,我使用的是 VxWork 的原生 API,而不是 POSIX。好的,所以代码不是很可移植,但它最终变得很快;无论如何,POSIX 只是原生 API 之上的一层,这总是会减慢速度。
也就是说,POSIX 计数和互斥信号量与 VxWork 的原生计数和互斥信号量非常相似。这可能意味着 POSIX 层不是很厚。
有关 VxWorks 编程的一般说明
调试我一直试图使用可用于 Solaris 的开发工具 (Tornado)。这是迄今为止我遇到过的最好的多线程调试环境。为什么?它允许您启动多个调试会话,系统中的每个线程/任务一个。您最终会为每个线程提供一个调试窗口,并且您正在单独和独立地调试每个线程。单步执行阻塞操作,该调试窗口将被阻塞。将鼠标焦点移动到另一个调试窗口,单步执行将释放块的操作,并观察第一个窗口完成其步骤。
你最终会得到很多调试窗口,但这是迄今为止调试多线程内容的最佳方式。它使写出非常复杂的东西和看到问题变得非常容易。您可以轻松地探索应用程序中的不同动态交互,因为您可以随时简单而强大地控制每个线程正在执行的操作。
具有讽刺意味的是,Windows 版本的 Tornado 不允许您这样做;每个系统一个可怜的单个调试窗口,就像任何其他无聊的旧 IDE(如 Visual Studio 等)一样。我从未见过现代 IDE 在多线程调试方面能像 Solaris 上的 Tornado 一样出色。
硬盘如果您的读取器和写入器使用的是磁盘上的文件,请考虑 VxWorks 5.5 已经很旧了。像 NCQ 这样的东西将不受支持。在这种情况下,我提出的解决方案(如上所述)可能最好使用单个互斥信号量来完成,以防止多个读取器在读取磁盘的不同部分时相互绊倒。这取决于您的读取器到底在做什么,但如果他们正在从文件中读取连续数据,这将避免在磁盘表面上来回晃动读/写头(非常慢)。
就我而言,我使用这个技巧来调整网络接口上的流量;每个任务都发送不同类型的数据,任务优先级反映了数据在网络上的优先级。它非常优雅,没有消息是碎片化的,但重要的消息获得了可用带宽的最大份额。
评论
现在 Microsoft 已经开放了 .NET 源代码,您可以查看他们的 ReaderWRiterLockSlim 实现。
我不确定他们使用的更基本的基元是否可供您使用,其中一些也是 .NET 库的一部分,它们的代码也可用。
Microsoft 花了很多时间来提高其锁定机制的性能,因此这可能是一个很好的起点。
评论
下一个:为什么迭代器需要默认可构造
评论