如何在VxWorks中避免条件变量中的争用条件

How to avoid race conditions in a condition variable in VxWorks

提问人:sbi 提问时间:9/30/2013 最后编辑:sbi 更新时间:10/9/2013 访问量:2308

问:

我们在 VxWorks 5.5 之上的专有嵌入式平台上进行编程。在我们的工具箱中,我们有一个条件变量,它是使用 VxWorks 二进制信号量实现的。

现在,POSIX 提供了一个 wait 函数,该函数也采用互斥锁。这将解锁互斥锁(以便其他任务可以写入数据)并等待其他任务发出信号(写入数据已完成)。我相信这实现了所谓的监视器,ICBWT。

我们需要这样一个等待函数,但实现它很棘手。一个简单的方法可以做到这一点:

bool condition::wait_for(mutex& mutex) const {
    unlocker ul(mutex);    // relinquish mutex
    return wait(event);
}                          // ul's dtor grabs mutex again

但是,这具有竞争条件,因为它允许另一个任务在解锁后和等待之前抢占此任务。另一个任务可以写入解锁后的日期,并在此任务开始等待信号量之前发出条件信号。(我们已经对此进行了测试,这确实发生了,并永远阻止了等待任务。

鉴于 VxWorks 5.5 似乎没有提供在等待信号时暂时放弃信号量的 API,有没有办法在提供的同步例程之上实现这一点?

注意:这是一个非常旧的VxWorks版本,在没有POSIX支持的情况下编译(据我所知,由专有硬件的供应商编译)。

C++ 多线程 vxworks

评论

1赞 Martin James 9/30/2013
嗯。。。从未尝试在没有本机支持的情况下实现“Condvar”。我总是能够独自使用信号量和互斥锁。
0赞 sehe 9/30/2013
Unlocker有什么作用?因为当我看到这个模式时,我完全期望它使用 RAII 风格。它可能会在析构函数中解锁。它可能会在生命周期内解锁。这对我来说并不明显。
0赞 Martin James 9/30/2013
@sehe - 我也有点困惑。为什么不只用二进制信号量来保护它呢?好吧,语义不同,但是 WTH...
0赞 sehe 9/30/2013
@MartinJames可能是因为需要发出信号吗?无论如何,事件具有不同的语义。
0赞 sbi 9/30/2013
@sehe,@Martin:在其 CTOR 中解锁资源,然后再次将其锁定在其 DTOR 中。(对不起,我虽然这很明显。unlocker

答:

-1赞 Ivan Voras 10/5/2013 #1

从描述中可以看出,您可能想要实现(或使用)信号量 - 它是一种标准的 CS 算法,其语义类似于 condvars,并且有大量关于如何实现它们的教科书(https://www.google.com/search?q=semaphore+algorithm)。

解释信号量的随机谷歌结果位于:http://www.cs.cornell.edu/courses/cs414/2007sp/lectures/08-bakery.ppt(见幻灯片 32)。

评论

0赞 sbi 10/6/2013
我建议你加倍努力,真正阅读这个问题提示:这不是关于如何实现条件变量,而是关于如何实现某种类型的等待函数。
0赞 Ivan Voras 10/6/2013
嘿,如果提出的每个答案都是错误的,您可能需要重新考虑这个问题。正如其他人所注意到的,您正在尝试实现 condvar,如果您需要这种特定行为而没有平台支持它,您可能需要自己实现它,这意味着不要使用您的平台支持的互斥类型。干杯。
0赞 sbi 10/7/2013
这个问题清楚地说:“但是,这具有竞争条件,因为它允许另一个任务在解锁后和等待之前抢占这个任务。如果你甚至懒得阅读问题,你也无法获得 cookie。手。
5赞 Chris Desjardins 10/5/2013 #2

使用本机 vxworks 应该很容易,这里需要消息队列。您的wait_for方法可以按原样使用。

bool condition::wait_for(mutex& mutex) const 
{
    unlocker ul(mutex);    // relinquish mutex
    return wait(event);
}                          // ul's dtor grabs mutex again

但是 wait(event) 代码如下所示:

wait(event)
{
    if (msgQRecv(event->q, sigMsgBuf, sigMsgSize, timeoutTime) == OK)
    {
        // got it...
    }
    else
    {
        // timeout, report error or something like that....
    }
}

你的信号代码是这样的:

signal(event)
{
    msgQSend(event->q, sigMsg, sigMsgSize, NO_WAIT, MSG_PRI_NORMAL);
}

因此,如果信号在您开始等待之前被触发,那么 msgQRecv 将在最终调用信号时立即返回信号,然后您可以如上所述在 ul dtor 中再次使用互斥锁。

event->q 是在事件创建时通过调用 msgQCreate 创建的MSG_Q_ID,sigMsg 中的数据由您定义......但可以只是一个随机的数据字节,或者你可以想出一个更智能的结构,其中包含有关谁发出信号的信息或其他可能很高兴知道的东西。

更新多个服务员,这有点棘手: 因此,我将做出一些假设来简化事情

  1. 待处理的任务数在事件创建时是已知的,并且是恒定的。
  2. 将有一个任务始终负责指示何时可以解锁互斥锁,所有其他任务只需要在事件发出信号/完成时发出通知。

这种方法使用计数信号量,类似于上面的信号量,只是稍微增加了一点逻辑:

wait(event)
{
    if (semTake(event->csm, timeoutTime) == OK)
    {
        // got it...
    }
    else
    {
        // timeout, report error or something like that....
    }
}

你的信号代码是这样的:

signal(event)
{
    for (int x = 0; x < event->numberOfWaiters; x++)
    {
        semGive(event->csm);
    }
}

事件的创建是这样的,请记住,在此示例中,服务员的数量是恒定的,并且在事件创建时是已知的。您可以将其设置为动态的,但关键是每次事件发生时,在解锁器解锁互斥锁之前,numberOfWaiters 必须正确。

createEvent(numberOfWaiters)
{
    event->numberOfWaiters = numberOfWaiters;
    event->csv = semCCreate(SEM_Q_FIFO, 0);
    return event;
}

你不能对 numberOfWaiters :D一厢情愿我再说一遍:在解锁器解锁互斥锁之前,numberOfWaiters 必须正确。要使其动态化(如果这是必需的),您可以添加一个 setNumWaiters(numOfWaiters) 函数,并在解锁器解锁互斥锁之前在 wait_for 函数中调用该函数,只要它始终正确设置数字即可。

现在对于最后一个技巧,如上所述,假设一个任务负责解锁互斥锁,其余任务只是等待信号,这意味着只有一个任务将调用上面的 wait_for() 函数,其余任务只需调用 wait(event) 函数。

考虑到这一点,numberOfWaiters 的计算方式如下:

  • 将调用 wait() 的任务数
  • 加 1 表示调用 wait_for() 的任务

当然,如果你真的需要,你也可以让它更复杂,但这很可能会起作用,因为通常 1 个任务会触发一个事件,但许多任务想知道它是否完成,这就是它所提供的。

但您的基本流程如下:

init()
{
    event->createEvent(3);
}

eventHandler()
{
    locker l(mutex);
    doEventProcessing();
    signal(event);
}

taskA()
{
    doOperationThatTriggersAnEvent();
    wait_for(mutex);
    eventComplete();
}

taskB()
{
    doWhateverIWant();
    // now I need to know if the event has occurred...
    wait(event);
    coolNowIKnowThatIsDone();
}

taskC()
{
    taskCIsFun();
    wait(event);
    printf("event done!\n");
}

当我写上面的内容时,我觉得所有的 OO 概念都死了,但希望你能明白,实际上等待,wait_for应该采用相同的参数,或者没有参数,而是成为同一类的成员,该类也拥有他们需要知道的所有数据......但最重要的是它如何工作的概述。

评论

0赞 Chris Desjardins 10/9/2013
代码原样不适用于多个服务员,但更新它以使其适用于多个服务员是微不足道的,可能最简单的方法是将 msgQ 切换为计数 sem。我会更新答案。
0赞 sbi 10/9/2013
我想知道你是否找到一种方法来让计数信号量做到这一点。我们试过了,结果很干。但是,我们认为我们通过将消息队列的想法与信号量相结合找到了解决方案。我刚刚检查了消息队列在这个平台上是否可用(它们是另一个必须显式编译的功能),我现在将尝试实现这个想法。
0赞 Chris Desjardins 10/9/2013
事实上,我也考虑过计数 sem 的 msgQ,但我不喜欢依赖链。这消除了中间人 msgQ,并且只直接通知所有等待的任务。
0赞 sbi 10/9/2013
这两种假设都不成立。(想象一个具有动态使用者数量的经典生产者-消费者场景。我们有。可悲的是,我们发现将信号量与消息队列相结合的想法也行不通。
0赞 Chris Desjardins 10/9/2013
事实上,我给出了一些关于如何使其更具动态性的线索,可以使用这个基本算法来完成 IFF 在解锁解锁互斥锁之前就知道消费者的数量。
3赞 didymos_c 10/9/2013 #3

如果每个等待任务都在单独的二进制信号量上等待,则可以避免争用条件。 这些信号量必须注册到一个容器中,信令任务使用该容器来取消阻止所有等待任务。容器必须受互斥锁保护。

该方法获取二进制信号量,等待它,最后删除它。wait_for()

void condition::wait_for(mutex& mutex) {
    SEM_ID sem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY);
    {
        lock l(listeners_mutex);    // assure exclusive access to listeners container
        listeners.push_back(sem);       
    }                               // l's dtor unlocks listeners_mutex again

    unlocker ul(mutex);             // relinquish mutex
    semTake(sem, WAIT_FOREVER);

    {
        lock l(listeners_mutex);
        // remove sem from listeners
        // ...
        semDelete(sem);
    }
}                                   // ul's dtor grabs mutex again

该方法遍历所有已注册的信号量并解锁它们。signal()

void condition::signal() {
    lock l(listeners_mutex);
    for_each (listeners.begin(), listeners.end(), /* call semGive()... */ )
}

这种方法确保永远不会错过任何信号。缺点是需要额外的系统资源。 为了避免为每个调用创建和销毁信号量,可以使用池。wait_for()wait_for()

评论

0赞 Chris Desjardins 10/10/2013
这里的问题是 dtor 被多次调用,每个 wait_for() 调用一次,除了第一个离开 wait_for 的 dtor 之外,其他所有 dtor 都会在互斥锁上阻塞......否则,它与我的解决方案非常接近,如果您认为二进制 sems 列表大致相当于计数 sem,您还需要在解锁器解锁之前知道侦听器的数量。除本例外,它不是硬编码的。
0赞 didymos_c 10/10/2013
@Chris 关于 dtor 中的块,我同意。这是从原始代码示例中复制而来的。显然,不应该获取互斥锁的任务必须传递虚拟互斥锁或调用没有互斥锁参数和解锁器的wait_for。据我了解,侦听器的数量应该是灵活的,这意味着不与信号器共享互斥锁的侦听器会错过旧信号。计算信号量将允许以下场景:任务 A 增加 n 并解锁互斥锁,信号发送者任务 B 抢占 A 调用 n 次 semGive,任务 C 增加 n 并调用 semTake,最后任务 A 恢复并在 semTake 上阻塞。