这种实现是否正确?

Is this implementation correct?

提问人:JayC667 提问时间:7/30/2023 最后编辑:JayC667 更新时间:7/30/2023 访问量:96

问:

我想知道这个实现是否正确。

方案是:

  • 一名监督
  • 未知数量的工人 (0-n)
  • 监督者只需要向工人传达或发出信号resume workproduction has stopped
  • 工作人员自己处理对数据的检查,所以即使是一些没有任何新数据的冗余信号也是可以接受的(所以这基本上只是为了将繁忙的等待减少到最低限度,不能保证 100% 避免它)resume work
  • 工人也不会在数据上竞争。数据不会被消耗,而只是存在更多可以处理的数据。(如果有消费,我会使用信号量或 BlockingQueues)
  • 任何工人都没有必要接收到每个信号(甚至一个信号):如果他们错过了一个信号,下一个信号就会出现。即使他们错过了所有,唯一必须发生的事情就是所有人都接收到来自的信号。resume workproduction has stopped.signalEnd()

使用 ,这也应该防止错过 : ,以防 ing 线程在 ing 线程之前运行。signalEnd()Thread.notify()notifywait

法典:

package jc.lib.lang.thread;

/**
 * Worker Threads wait for signals from Overseer Threads.<br>
 * <ul>
 * <li>Workers wait for data ({@link #awaitProgress(long, int)}), work, wait again for more data. Can re-enlist multiple times.</li>
 * <li>Overseers send {@link #signalProgress()} if they made more data available.</li>
 * <li>Overseers signal end ({@link #signalEnd()}) to indicate that no more data will be available.</li>
 * </ul>
 *
 * @author jc
 * @since 2023-07-20
 */
public final class JcCyclicLatch {

    private final Object mSyncObject = new Object();

    private volatile boolean mWorkEnded = false;

    public JcCyclicLatch() {}



    /**
     * Worker Threads wait for Overseer Thread to signal new data.
     * @return <b>true</b> if new data is present (continue working).<br> <b>false</b> if work has been completed (stop working).
     * @see {@linkplain Object#wait(long, int)}
     */
    public boolean awaitProgress(final long pTimeoutMs, final int pTimeoutNs) {
        if (mWorkEnded) return false;

        synchronized (mSyncObject) {
            try {
                if (mWorkEnded) return false;
                mSyncObject.wait(pTimeoutMs, pTimeoutNs);
            } catch (final InterruptedException e) { /* cannot happen, we ourselves control the sync object */ }
        }
        return !mWorkEnded;
    }
    /**
     * @see #awaitProgress(long, int)
     */
    public boolean awaitProgress(final long pTimeoutMs) {
        return awaitProgress(pTimeoutMs, 0);
    }
    /**
     * @see #awaitProgress(long, int)
     */
    public boolean awaitProgress() {
        return awaitProgress(0, 0);
    }

    /**
     * Overseer Thread signals availability of new data.
     * @throws InterruptedException
     */
    public void signalProgress() {
        synchronized (mSyncObject) {
            mSyncObject.notifyAll();
        }
    }

    /**
     * Break open lock so that all registered Workers will pass. Future Workers will also pass rigth through.
     */
    public void signalEnd() {
        mWorkEnded = true;
        signalProgress();
    }

    /**
     * Resets to inital status.
     */
    public void reset() {
        mWorkEnded = false;
    }



    /*
     * Example method
     */
    public static void main(final String[] args) {
        final JcCyclicLatch l = new JcCyclicLatch();

        // start workerss
        for (int i = 0; i < 3; i++) {
            final int index = i;
            final Thread workerThread = new Thread(() -> {
                System.out.println("C " + index + " started.");
                while (true) {
                    System.out.println("C " + index + " waiting...");
                    final boolean continueWorking = l.awaitProgress();

                    // we could put an early if(!continueWork) break here, if we do not need to process all data

                    // ... work stuff
                    System.out.println("C " + index + " working.");
                    // ... work stuff

                    if (!continueWorking) { // here, we have processed all (available) data
                        System.out.println("C " + index + " is done working.");
                        break;
                    }
                }
                System.out.println("C " + index + " ending.");
            });
            workerThread.start();
        }

        // this is the overseer
        for (int i = 0; i < 3; i++) {
            System.out.println("P working...");
            try {
                Thread.sleep(1000); // simulates work
            } catch (final InterruptedException e) { /* */ }

            System.out.println("P has new data.");
            l.signalProgress();
        }
        System.out.println("P is ending...");
        l.signalEnd();
        System.out.println("P finished.");
    }

}

如果只有 1 个监督者和 1 个工作人员参与,我会求助于一个包含数据包的包,最后是一个特殊的包来发出信号。LinkedBlockingQueueproduction has stopped

因此,我的实际问题是,该实现是否正确和/或应该/可以在以下方面进一步改进

  • 就而言,它是否防止锁定(尽可能使用 After )?obj.wait()obj.notify().signalEnd()
  • 它能减少繁忙的等待吗?

其他问题,在这里收集意见,在答案中考虑或评论:

  1. 由于这是基本场景,是否有任何默认的 java 类可以在同等的小代码复杂度下完成此操作?
  2. 我不确定这个名字是否合适。在所有的 es 和 s 和 s 和 s 和 s 之间,我感到非常困惑。你会给它起什么名字?CyclicLatchLatchLockBarrierSempahorePhaser
Java 多线程 并发 死锁 同步

评论

0赞 Solomon Slow 7/30/2023
没有深入研究,但你的“消费者”无条件地打电话,而不是在循环中,所以可能不行。 docs.oracle.com/javase/tutorial/essential/concurrency/......wait
1赞 Solomon Slow 7/30/2023
回复,“消费者也不会在数据上竞争。数据不会被消耗。听起来“生产者/消费者”可能不是您的算法的正确名称。
0赞 JayC667 7/30/2023
@SolomonSlow Thanx 的答案。对于我的需求来说,这个循环不是必需的,因为a)我控制自己(不是公共的),所以不太可能发出错误的信号。再说一次,c)如果有人使用反射来访问我的同步对象,它只会让“消费者”经常检查状态一次。在这种情况下,他们会简单地检查他们的输入,决定无事可做,然后再次跳入。你是对的 “生产者”/“消费者”的事情。我会将它们重命名为 和 。mSyncObject.notify().awaitProgress()OverseerWorker
3赞 Solomon Slow 7/30/2023
你是对的。我没有深入看。我只看了实例方法,我以为这只是“测试代码”。因此,您的消费者确实会调用一个循环,即使循环和恰好采用不同的方法。此外,似乎(但我仍然没有深入研究)消费者在最后一次发生之后没有办法。在代码审查的情况下,我不会给你的代码一个“通过”有几个原因,但它们都是关于漏洞的,而不是关于正确性的。main()waitwaitwaitnotifyAll

答:

1赞 Matt Timmermans 7/30/2023 #1

在这种情况下,此代码看起来没有执行您想要的操作:

  1. 工人开始工作......
  2. 监督者召唤signalProgress
  3. 工作人员完成和调用awaitProgress

在这一点上,我相信你希望工人在工作,但事实并非如此,因为它错过了信号。不应尝试用于传达状态信息。仅用于在状态更改时通知服务员。notify...

你真的需要有一个或类似的东西来保护.boolean workersEnabled;mSyncObject

评论

0赞 JayC667 7/30/2023
感谢输入,我更新了问题:没有必要让所有工人都收到每个恢复工作信号:如果他们错过了一个,下一个就会出现。唯一必须发生的事情是生产已停止,来自 .signalEnd() 的信号被所有人接收。比如说,如果我必须发出信号,我会遇到另一个“软”问题:在所有工人开始之后,但在任何一个工人完成工作之前,在生产者中重置。boolean workersEnabled;workersEnabled=false
0赞 Matt Timmermans 7/30/2023
如果所有工人都错过了信号怎么办?
0赞 Matt Timmermans 7/30/2023
由于您提供了所有代码,因此我将添加一个修复程序。当你编写并发代码时,你必须努力使它很容易证明它有效,因为任何阅读该代码的人都需要在他们的脑海中做这个证明。
0赞 JayC667 7/30/2023
任何监督者的最终信号将是 ,所有工人都必须接收。预计会经常出现,例如从套接字读取或等待另一个进程的输出。如果这对工人来说太快了,他们将(必须)收到..signalEnd().signalProcess().signalEnd()
1赞 Matt Timmermans 7/30/2023
重新发出信号...如果所有工人都错过了最后一个进度信号怎么办?这难道不意味着有些工作无法完成吗?