如何使用 ExecutorService 轮询,直到结果到达

How to use ExecutorService to poll until a result arrives

提问人:kaqqao 提问时间:10/26/2016 最后编辑:kaqqao 更新时间:11/28/2022 访问量:10337

问:

我有一个场景,我必须轮询远程服务器,检查任务是否已完成。一旦它完成,我就会发出不同的调用来检索结果。

我最初认为我应该使用 with 进行轮询:SingleThreadScheduledExecutorscheduleWithFixedDelay

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId);
   }
}

但是由于我只能提供一个不能返回任何东西的东西,我不明白什么时候会完成,如果有的话。calling even 是什么意思?我还在等什么结果?RunnablescheduleWithFixedDelayfuturefuture.get()

第一次检测到远程任务已完成时,我想执行不同的远程调用并将其结果设置为 的值。我想我可以用它来解决这个问题,我会转发到我的方法,而我的方法又会转发给我的方法,最终完成它:futureCompletableFuturepollretrieveTask

CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId, CompletableFuture<Object> result) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId, result);
   }
}

public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
    Object remoteResult = remoteServer.getJobResult(jobId);
    result.complete(remoteResult);
}

但这有很多问题。首先,它甚至似乎不是为这种用途而设计的。相反,我认为我应该这样做,但是当我被取消/完成时,我将如何正确关闭并取消它返回的?感觉轮询应该以某种完全不同的方式实现。CompletableFutureCompletableFuture.supplyAsync(() -> poll(jobId))executorfutureCompletableFuture

java 并发 executorservice completable-future

评论

1赞 Thilo 10/26/2016
您还可以提交可调用对象(返回结果):docs.oracle.com/javase/7/docs/api/java/util/concurrent/...
2赞 kaqqao 10/26/2016
@Thilo 仅适用于一次性任务,不适用于 scheduleWithFixedDelay 或 scheduleAtFixedRate,因此轮询已结束
0赞 Jason Hu 10/26/2016
@Thilo我认为从来没有收到过.scheduleWithFixedDelayCallable
0赞 Jason Hu 10/26/2016
Op,我认为你在做正确的事情。 实际上是异步编程框架中的一个承诺。然而,你应该暴露一个不完整的正常未来。并且您的所有后续代码都应该订阅该未来。我看不出有什么问题。是什么让你感到困惑?CompletableFuture
0赞 kaqqao 10/26/2016
@HuStmpHrrr 所有示例似乎都在做,而不是显式创建一个 .但更重要的是,就我而言,我需要在未来完成时关闭执行器。我应该子类化和覆盖,并这样做吗?我应该担心取消我从遗嘱执行人那里得到的吗?supplyAsyncCompletableFutureCompletableFuturecompletecompleteExceptionallycancelScheduledFuture

答:

17赞 Andrew Rueckert 10/26/2016 #1

我认为 CompletableFutures 是一个很好的方法:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private void run() {
    final Object jobResult = pollForCompletion("jobId1")
            .thenApply(jobId -> remoteServer.getJobResult(jobId))
            .get();

}

private CompletableFuture<String> pollForCompletion(final String jobId) {
    CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
        if (remoteServer.isJobDone(jobId)) {
            completionFuture.complete(jobId);
        }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture.whenComplete((result, thrown) -> {
        checkFuture.cancel(true);
    });
    return completionFuture;
}

评论

0赞 kaqqao 10/26/2016
谢谢!我终于意识到应该如何组合多个步骤。我一直很想念这个。现在就去试试吧。
0赞 kaqqao 10/26/2016
如果未来被取消,检查内部的正确方法是什么?我只能想出whenCompletethrown instanceof CancelationException
0赞 Andrew Rueckert 10/26/2016
我不记得我的头顶;CancellationException 可以包装在 ExecutionException 中。(所以你需要检查。但是,在我的示例代码中,没有任何内容会导致 CompletableFuture 被取消。你能用你的新代码更新你的问题吗?thrown.getCause() instanceof CancellationExceptionpollForCompletion
1赞 George Pligoropoulos 2/26/2020
的用法似乎不正确,因为可完成的 future 可能会引发异常或超时等。因此,取消调度程序似乎是更合适的方法whenCompletehandle
1赞 danidiaz 12/7/2020
@GeorgePligoropoulos我相信在这种情况下效果很好。无论原始阶段以结果结束还是以异常结束,都会执行该操作,并重新引发异常。我认为问题在于我们没有在作业中捕获异常,以便使用捕获的异常进行调用。whenCompletewhenCompleteScheduledFuturecompleteExceptionally
2赞 Jason Hu 10/26/2016 #2

在我看来,你比其他任何风格问题都更担心一些风格问题。在 Java 8 中,有 2 个角色:一个是传统的 future,它为任务执行和状态查询提供异步源;另一个是我们通常所说的承诺。一个承诺,如果你还不知道,可以被认为是未来的建设者及其完成的来源。因此,在这种情况下,直观地需要一个承诺,这正是您在这里使用的确切情况。你所担心的例子是向你介绍第一个用法的东西,而不是承诺的方式。CompletableFuture

接受这一点,您应该更容易开始处理您的实际问题。我认为 promise 应该有 2 个角色,一个是通知你的任务完成轮询,另一个是在完成时取消你的预定任务。这里应该是最终的解决方案:

public CompletableFuture<Object> pollTask(int jobId) {
    CompletableFuture<Object> fut = new CompletableFuture<>();
    ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
    fut.thenAccept(ignore -> sfuture.cancel(false));
    return fut;
}

private void _poll(int jobId, CompletableFuture<Object> fut) {
    // whatever polls
    if (isDone) {
        fut.complete(yourResult);
    }
}

评论

0赞 kaqqao 10/26/2016
啊,我现在意识到应该如何将步骤与 s 结合起来。到目前为止,我还没有做到这一点。谢谢,这很有用。我现在去试试。Future
0赞 kaqqao 10/26/2016
此代码仍然存在问题,但与此问题不同。如果您仍然有兴趣帮助我,请查看新问题。再次感谢。
3赞 lance-java 3/25/2021 #3

我为此创建了一个通用实用程序,灵感来自这个答案,每个轮询都可以返回,直到值准备就绪。我还实现了 a,以便在超过最大时间时抛出 a。Supplier<Optional<T>>Optional.empty()timeoutTimeoutException

用法:

ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
   .supplier(supplier)
   .executorService(scheduledExecutor)
   .timeUnit(TimeUnit.SECONDS)
   .initialDelay(5)
   .period(5)
   .timeout(60 * 5)
   .build();

ScheduledCompletableFuture.java

public class ScheduledCompletableFuture {
    public static class ScheduledCompletableFutureBuilder<T> {
        private Supplier<Optional<T>> supplier;
        private ScheduledExecutorService executorService;
        private Long initialDelay;
        private Long period;
        private Long timeout;
        private TimeUnit timeUnit;

        public ScheduledCompletableFutureBuilder() {
        }

        public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
            this.supplier = supplier;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
            this.initialDelay = initialDelay;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> period(long period) {
            this.period = period;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
            this.timeout = timeout;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
            return this;
        }

        public CompletableFuture<T> build() {
            // take a copy of instance variables so that the Builder can be re-used
            Supplier<Optional<T>> supplier = this.supplier;
            ScheduledExecutorService executorService = this.executorService;
            Long initialDelay = this.initialDelay;
            Long period = this.period;
            Long timeout = this.timeout;
            TimeUnit timeUnit = this.timeUnit;

            CompletableFuture<T> completableFuture = new CompletableFuture<>();
            long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
            Runnable command = () -> {
                Optional<T> optional = supplier.get();
                if (optional.isPresent()) {
                    completableFuture.complete(optional.get());
                } else if (System.currentTimeMillis() > endMillis) {
                    String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
                    completableFuture.completeExceptionally(new TimeoutException(msg));
                }
            };
            ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
            return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
        }
    }

    public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
        return new ScheduledCompletableFutureBuilder<>();
    }
}