提问人:kaqqao 提问时间:10/26/2016 最后编辑:kaqqao 更新时间:11/28/2022 访问量:10337
如何使用 ExecutorService 轮询,直到结果到达
How to use ExecutorService to poll until a result arrives
问:
我有一个场景,我必须轮询远程服务器,检查任务是否已完成。一旦它完成,我就会发出不同的调用来检索结果。
我最初认为我应该使用 with 进行轮询:SingleThreadScheduledExecutor
scheduleWithFixedDelay
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);
public void poll(String jobId) {
boolean jobDone = remoteServer.isJobDone(jobId);
if (jobDone) {
retrieveJobResult(jobId);
}
}
但是由于我只能提供一个不能返回任何东西的东西,我不明白什么时候会完成,如果有的话。calling even 是什么意思?我还在等什么结果?Runnable
scheduleWithFixedDelay
future
future.get()
第一次检测到远程任务已完成时,我想执行不同的远程调用并将其结果设置为 的值。我想我可以用它来解决这个问题,我会转发到我的方法,而我的方法又会转发给我的方法,最终完成它:future
CompletableFuture
poll
retrieveTask
CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);
public void poll(String jobId, CompletableFuture<Object> result) {
boolean jobDone = remoteServer.isJobDone(jobId);
if (jobDone) {
retrieveJobResult(jobId, result);
}
}
public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
Object remoteResult = remoteServer.getJobResult(jobId);
result.complete(remoteResult);
}
但这有很多问题。首先,它甚至似乎不是为这种用途而设计的。相反,我认为我应该这样做,但是当我被取消/完成时,我将如何正确关闭并取消它返回的?感觉轮询应该以某种完全不同的方式实现。CompletableFuture
CompletableFuture.supplyAsync(() -> poll(jobId))
executor
future
CompletableFuture
答:
我认为 CompletableFutures 是一个很好的方法:
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private void run() {
final Object jobResult = pollForCompletion("jobId1")
.thenApply(jobId -> remoteServer.getJobResult(jobId))
.get();
}
private CompletableFuture<String> pollForCompletion(final String jobId) {
CompletableFuture<String> completionFuture = new CompletableFuture<>();
final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
if (remoteServer.isJobDone(jobId)) {
completionFuture.complete(jobId);
}
}, 0, 10, TimeUnit.SECONDS);
completionFuture.whenComplete((result, thrown) -> {
checkFuture.cancel(true);
});
return completionFuture;
}
评论
whenComplete
thrown instanceof CancelationException
thrown.getCause() instanceof CancellationException
pollForCompletion
whenComplete
handle
whenComplete
whenComplete
ScheduledFuture
completeExceptionally
在我看来,你比其他任何风格问题都更担心一些风格问题。在 Java 8 中,有 2 个角色:一个是传统的 future,它为任务执行和状态查询提供异步源;另一个是我们通常所说的承诺。一个承诺,如果你还不知道,可以被认为是未来的建设者及其完成的来源。因此,在这种情况下,直观地需要一个承诺,这正是您在这里使用的确切情况。你所担心的例子是向你介绍第一个用法的东西,而不是承诺的方式。CompletableFuture
接受这一点,您应该更容易开始处理您的实际问题。我认为 promise 应该有 2 个角色,一个是通知你的任务完成轮询,另一个是在完成时取消你的预定任务。这里应该是最终的解决方案:
public CompletableFuture<Object> pollTask(int jobId) {
CompletableFuture<Object> fut = new CompletableFuture<>();
ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
fut.thenAccept(ignore -> sfuture.cancel(false));
return fut;
}
private void _poll(int jobId, CompletableFuture<Object> fut) {
// whatever polls
if (isDone) {
fut.complete(yourResult);
}
}
评论
Future
我为此创建了一个通用实用程序,灵感来自这个答案,每个轮询都可以返回,直到值准备就绪。我还实现了 a,以便在超过最大时间时抛出 a。Supplier<Optional<T>>
Optional.empty()
timeout
TimeoutException
用法:
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
.supplier(supplier)
.executorService(scheduledExecutor)
.timeUnit(TimeUnit.SECONDS)
.initialDelay(5)
.period(5)
.timeout(60 * 5)
.build();
ScheduledCompletableFuture.java
public class ScheduledCompletableFuture {
public static class ScheduledCompletableFutureBuilder<T> {
private Supplier<Optional<T>> supplier;
private ScheduledExecutorService executorService;
private Long initialDelay;
private Long period;
private Long timeout;
private TimeUnit timeUnit;
public ScheduledCompletableFutureBuilder() {
}
public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
this.supplier = supplier;
return this;
}
public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
this.executorService = executorService;
return this;
}
public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
this.initialDelay = initialDelay;
return this;
}
public ScheduledCompletableFutureBuilder<T> period(long period) {
this.period = period;
return this;
}
public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
this.timeout = timeout;
return this;
}
public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
return this;
}
public CompletableFuture<T> build() {
// take a copy of instance variables so that the Builder can be re-used
Supplier<Optional<T>> supplier = this.supplier;
ScheduledExecutorService executorService = this.executorService;
Long initialDelay = this.initialDelay;
Long period = this.period;
Long timeout = this.timeout;
TimeUnit timeUnit = this.timeUnit;
CompletableFuture<T> completableFuture = new CompletableFuture<>();
long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
Runnable command = () -> {
Optional<T> optional = supplier.get();
if (optional.isPresent()) {
completableFuture.complete(optional.get());
} else if (System.currentTimeMillis() > endMillis) {
String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
completableFuture.completeExceptionally(new TimeoutException(msg));
}
};
ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
}
}
public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
return new ScheduledCompletableFutureBuilder<>();
}
}
评论
scheduleWithFixedDelay
Callable
CompletableFuture
supplyAsync
CompletableFuture
CompletableFuture
complete
completeExceptionally
cancel
ScheduledFuture