提问人:JayC667 提问时间:7/30/2023 最后编辑:JayC667 更新时间:7/30/2023 访问量:96
这种实现是否正确?
Is this implementation correct?
问:
我想知道这个实现是否正确。
方案是:
- 一名监督
- 未知数量的工人 (0-n)
- 监督者只需要向工人传达或发出信号
resume work
production has stopped
- 工作人员自己处理对数据的检查,所以即使是一些没有任何新数据的冗余信号也是可以接受的(所以这基本上只是为了将繁忙的等待减少到最低限度,不能保证 100% 避免它)
resume work
- 工人也不会在数据上竞争。数据不会被消耗,而只是存在更多可以处理的数据。(如果有消费,我会使用信号量或 BlockingQueues)
- 任何工人都没有必要接收到每个信号(甚至一个信号):如果他们错过了一个信号,下一个信号就会出现。即使他们错过了所有,唯一必须发生的事情就是所有人都接收到来自的信号。
resume work
production has stopped
.signalEnd()
使用 ,这也应该防止错过 : ,以防 ing 线程在 ing 线程之前运行。signalEnd()
Thread.notify()
notify
wait
法典:
package jc.lib.lang.thread;
/**
* Worker Threads wait for signals from Overseer Threads.<br>
* <ul>
* <li>Workers wait for data ({@link #awaitProgress(long, int)}), work, wait again for more data. Can re-enlist multiple times.</li>
* <li>Overseers send {@link #signalProgress()} if they made more data available.</li>
* <li>Overseers signal end ({@link #signalEnd()}) to indicate that no more data will be available.</li>
* </ul>
*
* @author jc
* @since 2023-07-20
*/
public final class JcCyclicLatch {
private final Object mSyncObject = new Object();
private volatile boolean mWorkEnded = false;
public JcCyclicLatch() {}
/**
* Worker Threads wait for Overseer Thread to signal new data.
* @return <b>true</b> if new data is present (continue working).<br> <b>false</b> if work has been completed (stop working).
* @see {@linkplain Object#wait(long, int)}
*/
public boolean awaitProgress(final long pTimeoutMs, final int pTimeoutNs) {
if (mWorkEnded) return false;
synchronized (mSyncObject) {
try {
if (mWorkEnded) return false;
mSyncObject.wait(pTimeoutMs, pTimeoutNs);
} catch (final InterruptedException e) { /* cannot happen, we ourselves control the sync object */ }
}
return !mWorkEnded;
}
/**
* @see #awaitProgress(long, int)
*/
public boolean awaitProgress(final long pTimeoutMs) {
return awaitProgress(pTimeoutMs, 0);
}
/**
* @see #awaitProgress(long, int)
*/
public boolean awaitProgress() {
return awaitProgress(0, 0);
}
/**
* Overseer Thread signals availability of new data.
* @throws InterruptedException
*/
public void signalProgress() {
synchronized (mSyncObject) {
mSyncObject.notifyAll();
}
}
/**
* Break open lock so that all registered Workers will pass. Future Workers will also pass rigth through.
*/
public void signalEnd() {
mWorkEnded = true;
signalProgress();
}
/**
* Resets to inital status.
*/
public void reset() {
mWorkEnded = false;
}
/*
* Example method
*/
public static void main(final String[] args) {
final JcCyclicLatch l = new JcCyclicLatch();
// start workerss
for (int i = 0; i < 3; i++) {
final int index = i;
final Thread workerThread = new Thread(() -> {
System.out.println("C " + index + " started.");
while (true) {
System.out.println("C " + index + " waiting...");
final boolean continueWorking = l.awaitProgress();
// we could put an early if(!continueWork) break here, if we do not need to process all data
// ... work stuff
System.out.println("C " + index + " working.");
// ... work stuff
if (!continueWorking) { // here, we have processed all (available) data
System.out.println("C " + index + " is done working.");
break;
}
}
System.out.println("C " + index + " ending.");
});
workerThread.start();
}
// this is the overseer
for (int i = 0; i < 3; i++) {
System.out.println("P working...");
try {
Thread.sleep(1000); // simulates work
} catch (final InterruptedException e) { /* */ }
System.out.println("P has new data.");
l.signalProgress();
}
System.out.println("P is ending...");
l.signalEnd();
System.out.println("P finished.");
}
}
如果只有 1 个监督者和 1 个工作人员参与,我会求助于一个包含数据包的包,最后是一个特殊的包来发出信号。LinkedBlockingQueue
production has stopped
因此,我的实际问题是,该实现是否正确和/或应该/可以在以下方面进一步改进:
- 就而言,它是否防止锁定(尽可能使用 After )?
obj.wait()
obj.notify()
.signalEnd()
- 它能减少繁忙的等待吗?
其他问题,在这里收集意见,在答案中考虑或评论:
- 由于这是基本场景,是否有任何默认的 java 类可以在同等的小代码复杂度下完成此操作?
- 我不确定这个名字是否合适。在所有的 es 和 s 和 s 和 s 和 s 之间,我感到非常困惑。你会给它起什么名字?
CyclicLatch
Latch
Lock
Barrier
Sempahore
Phaser
答:
1赞
Matt Timmermans
7/30/2023
#1
在这种情况下,此代码看起来没有执行您想要的操作:
- 工人开始工作......
- 监督者召唤
signalProgress
- 工作人员完成和调用
awaitProgress
在这一点上,我相信你希望工人在工作,但事实并非如此,因为它错过了信号。不应尝试用于传达状态信息。仅用于在状态更改时通知服务员。notify...
你真的需要有一个或类似的东西来保护.boolean workersEnabled;
mSyncObject
评论
0赞
JayC667
7/30/2023
感谢输入,我更新了问题:没有必要让所有工人都收到每个恢复工作信号:如果他们错过了一个,下一个就会出现。唯一必须发生的事情是生产已停止
,来自 .signalEnd() 的信号被所有人接收。比如说,如果我必须发出信号,我会遇到另一个“软”问题:在所有工人开始之后,但在任何一个工人完成工作之前,在生产者中重置。boolean workersEnabled;
workersEnabled=false
0赞
Matt Timmermans
7/30/2023
如果所有工人都错过了信号怎么办?
0赞
Matt Timmermans
7/30/2023
由于您提供了所有代码,因此我将添加一个修复程序。当你编写并发代码时,你必须努力使它很容易证明它有效,因为任何阅读该代码的人都需要在他们的脑海中做这个证明。
0赞
JayC667
7/30/2023
任何监督者的最终信号将是 ,所有工人都必须接收。预计会经常出现,例如从套接字读取或等待另一个进程的输出。如果这对工人来说太快了,他们将(必须)收到..signalEnd()
.signalProcess()
.signalEnd()
1赞
Matt Timmermans
7/30/2023
重新发出信号...如果所有工人都错过了最后一个进度信号怎么办?这难道不意味着有些工作无法完成吗?
评论
wait
mSyncObject
.notify()
.awaitProgress()
Overseer
Worker
main()
wait
wait
wait
notifyAll