Java Flux 中的延迟增加

Increasing delays in Java Flux

提问人:Praveen Kumar 提问时间:11/4/2023 更新时间:11/9/2023 访问量:274

问:

我有一个代码,我可以在循环中执行一段逻辑,中间使用 Flux 有一些延迟。像这样的东西,

Flux.defer(() -> service.doSomething())
            .repeatWhen(v -> Flux.interval(Duration.ofSeconds(10)))
            .map(data -> mapper(data)) //map data
            .takeUntil(v -> shouldContinue(v)) //checks if the loop can be terminated
            .onErrorStop();

现在,我想要增量延迟。这意味着,在前 5 分钟内,每次执行之间的延迟可能为 10 秒。然后,在接下来的 10 分钟内,延迟可以是 30 秒。此后,每次执行之间的延迟可能为一分钟。

如何使用 Flux 实现这一点?

提前致谢。

Java Spring-WebFlux 反应式编程 延迟 通量

评论

0赞 phuongnq1995 11/6/2023
Meaning, for the first 5 minutes, the delay can be 10 seconds between each execution.如果我理解正确的话,前 5 分钟内有多次执行,它们都延迟了 10 秒。是吗?
0赞 Anish B. 11/6/2023
您想考虑 5 分钟的增量时间吗?还是只是延迟时间?
0赞 Anish B. 11/6/2023
助焊剂的完成时间是 5 分钟、10 分钟吗?

答:

3赞 VonC 11/6/2023 #1

您可以尝试使用 Flux.generate 创建一个自定义序列,该序列以所需的延迟发出项目。
另请参阅“Flux.createFlux.generate 之间的区别”。

Flux<Object> flux = Flux.generate(
    () -> 0L,
    (state, sink) -> {
        sink.next(state);
        return state + 1;
    }
)
.delayUntil(state -> {
    long delay = 10; // default delay
    if (state < 30) { // first 5 minutes with 10 seconds delay
        delay = 10;
    } else if (state < 50) { // next 10 minutes with 30 seconds delay
        delay = 30;
    } else { // thereafter, 1 minute delay
        delay = 60;
    }
    return Mono.delay(Duration.ofSeconds(delay));
})
.flatMap(state -> service.doSomething())
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();

这将初始化状态,并在每次迭代时递增。
根据状态值(表示已用期间数),它会应用不同的延迟。
0

      ┌────────────┐         ┌─────────────────┐          ┌──────────────┐
      │   Flux     │         │ delayUntil with │          │  doSomething │
      │ generate   ├───────► │  custom delays  ├────────► │   Service    │
      └────────────┘         └─────────────────┘          └──────────────┘
            │                                                   │
            │                                                   │
            ▼                                                   ▼
      ┌────────────┐        ┌────────────────┐           ┌───────────────┐
      │   flatMap  │        │  takeUntil     │           │   onErrorStop │
      │  function  ├───────►│ shouldContinue ├─────────► │    method     │
      └────────────┘        └────────────────┘           └───────────────┘

请注意,在实际应用中,您可能需要考虑 的执行时间。如果需要大量时间,您可能需要相应地调整延迟。service.doSomething()


如果执行时间大于延迟时间,会发生什么情况?这可能会并行触发多次。doSomething()doSomething()

当执行时间长于延迟时,后续执行不会等待当前执行完成,这可能会导致并发执行。doSomething()

为了防止这种情况,您可以使用确保按顺序执行的组合,以及有状态映射,以根据上次执行的开始或完成时间计算延迟。
参见 “Project Reactor 中的 flatMap、flatMapSequentialconcatMap 有什么区别?
.concatMap()doSomething()"

另外,如果我想以 0 秒的延迟开始,然后慢慢增加延迟,我该如何实现?

然后,您可以实现您的功能,延迟为 0 秒,然后慢慢增加延迟:

AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());

Flux<Object> flux = Flux.generate(
    () -> 0L,
    (state, sink) -> {
        sink.next(state);
        return state + 1;
    }
)
.concatMap(state -> {
    long delay;
    long currentTime = System.currentTimeMillis();
    long elapsedTime = currentTime - lastExecutionStart.get();
    long executionTime = 0; // replace with actual execution time if known

    // Calculate the delay based on elapsed time since the start of the last execution
    if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
        delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
    } else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
        delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
    } else { // thereafter
        delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
    }

    return Mono.delay(Duration.ofMillis(delay))
        .then(Mono.fromCallable(() -> {
            lastExecutionStart.set(System.currentTimeMillis());
            return "doSomething";
        }))
        .flatMap(o -> service.doSomething());
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();

concatMap确保按顺序进行呼叫。该变量跟踪上次执行的开始时间。该变量用于计算下一次执行的延迟。doSomething()lastExecutionStart

Mono.delay() 与 then() 一起使用,以在开始下一次执行之前等待计算出的延迟。
延迟后,被调用并更新时间戳。
doSomething()lastExecutionStart

这种模式可确保:

  • doSomething()不是并发执行的。
  • 下次执行前的延迟考虑了实际执行时间。doSomething()
  • 初始延迟可以是 0,然后根据您提供的逻辑递增。
        ┌──────────────────────────────────────────────┐
        │                    Flux                      │
        └──────────────────────────────────────────────┘
                          │
            ┌─────────────┴─────────────┐
            │     Generate (state)      │
            └─────────────┬─────────────┘
                          │
            ┌─────────────▼─────────────┐
            │   Calculate Next Delay    │ <────┐
            └─────────────┬─────────────┘      │
                          │                    │
         ┌────────────────▼─────────────────┐  │
         │            Mono.delay            │  │
         └────────────────┬─────────────────┘  │
                          │                    │
         ┌────────────────▼──────────────────┐ │
         │  Set lastExecutionStart Timestamp │ │
         └────────────────┬──────────────────┘ │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │       service.doSomething │      │
            └─────────────┬─────────────┘      │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │            map            │      │
            └─────────────┬─────────────┘      │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │         takeUntil         │      │
            └─────────────┬─────────────┘      │
                          │                    │
            ┌─────────────▼─────────────┐      │
            │         onErrorStop       │      │
            └─────────────┬─────────────┘      │
                          │                    │
                          └────────────────────┘

我注意到如果返回一个,我得到一个错误。
但如果被退回,它就可以正常工作。所以我不得不用一个代替.
dataProvider.fetchData()FluxCannot infer type argument(s) for <V> concatMap(Function<? super T,? extends Publisher<? extends V>>)MonoflatMapManyflatMap

但是有没有办法处理是否可以返回 a 或 a ?service.doSomething()FluxMono

我正在考虑将其编写为通用服务。因此,想看看这是否可以处理 类型的数据,甚至是 类型的数据。Flux<T> Mono<T>T

若要处理可以返回 、 或 的服务方法,需要在响应式管道中调整方法调用,以迎合这些不同的响应式类型。一种方法是使用 indeed, 来处理两者 和 因为可以转换为 ,但反之则不然。Flux<T>Mono<T>TflatMapManyMonoFluxMonoFlux

AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());

Flux<Object> flux = Flux.generate(
    () -> 0L,
    (state, sink) -> {
        sink.next(state);
        return state + 1;
    }
)
.concatMap(state -> {
    long delay;
    long currentTime = System.currentTimeMillis();
    long elapsedTime = currentTime - lastExecutionStart.get();
    long executionTime = 0; // replace with actual execution time if known

    // Calculate the delay based on elapsed time since the start of the last execution
    if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
        delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
    } else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
        delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
    } else { // thereafter
        delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
    }

    return Mono.delay(Duration.ofMillis(delay))
        .then(Mono.fromCallable(() -> {
            lastExecutionStart.set(System.currentTimeMillis());
            return "doSomething";
        }))
        // Use flatMapMany to handle both Mono and Flux from doSomething()
        .flatMapMany(o -> Flux.from(service.doSomething()));
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();

通过使用 ,可以确保服务调用可以返回 a 或 be a,这将转换为 .为了处理同步类型,您通常会将其包装起来,具体取决于您是想要单个结果还是结果流。flatMapManyFluxMonoFluxTMono.just()Flux.just()

要真正使您的服务泛型,您必须确保方法签名被适当地设计为返回一个 ,这两者都是 和 实现的。这将允许您在服务方法本身中抽象出这些类型之间的差异。doSomething()Publisher<T>MonoFlux

为了处理同步值 (),您需要在 service 方法中创建一个反应式包装器,如下所示:T

public Publisher<T> doSomething() {
    T result = ...; // Your synchronous logic here
    return Mono.just(result); // or Flux.just(result) if it makes sense to return a stream
}

这样,就可以在反应式管道中使用相同的方法来处理 中的所有类型的返回值。flatMapManydoSomething()

评论

0赞 Praveen Kumar 11/7/2023
感谢您提供此解决方案。很少有后续想法。如果执行时间大于延迟时间,会发生什么情况?这可能会并行触发多次。另外,如果我想以 0 秒的延迟开始,然后慢慢增加延迟,我该如何实现?doSomething()doSomething()
0赞 VonC 11/7/2023
@PraveenKumar 好问题。我已经编辑了答案以解决您的评论。
0赞 Praveen Kumar 11/8/2023
感谢您的更新,它非常适合我的要求。但是,我注意到如果返回 Flux,我会收到错误。但是如果 Mono 被退回,它就可以正常工作。所以不得不用 a 代替 .但是有没有办法处理是否可以返回 Flux 或 Mono ?dataProvider.fetchData()Cannot infer type argument(s) for <V> concatMap(Function<? super T,? extends Publisher<? extends V>>)flatMapManyflatMapservice.doSomething()
0赞 Praveen Kumar 11/8/2023
我正在考虑将其编写为通用服务。因此,想看看这是否可以处理 Flux<T> 、Mono<T> 甚至 T 类型的数据。
1赞 Praveen Kumar 11/9/2023
太好了,按预期工作。谢谢
2赞 Yevhenii Semenov 11/6/2023 #2

虽然 Project Reactor 是一个很棒的工具,但重要的是要记住,并非所有任务都需要通过 Project Reactor 解决。在我看来,有时使用纯 Java 可能不那么复杂,更适合,即使 Project Reactor 是你的应用程序堆栈中的主要工具。但由于我不知道你的真实情况,我将分享两个版本。

选项 1:更少的 Reactor,更纯的 Java。

public class ScheduledTask {
    private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
    private final Instant startTime = Instant.now();

    {
        scheduler.schedule(this::task); // if you don't want delay on start 
        //scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
    }

    private void task() {
        System.out.println("do something"); // you can inject your service to the class and execute needed method here

        if (shouldContinue()) {
            scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
        }
    }

    private boolean shouldContinue() {
        return true; // put your custom logic here
    }

    private long calculateDelaySec() {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();

        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;

        return 60;
    }
}

选项 2:反应堆路

这种方法非常接近@VonC建议,我根据要求修改了该方法以使其更合适。generate

public class ScheduledTask2 {

    private record FluxArgs(Instant startTime, Boolean calculateDelay) {}

    private static Flux<String> task() {
        return Flux.<Duration, FluxArgs>generate(
                // use start time as initial value and boolean marker to skip start delay
                () -> new FluxArgs(Instant.now(), false),
                (args, sink) -> {
                    if (args.calculateDelay) {
                        sink.next(Duration.ofSeconds(calculateDelaySec(args.startTime)));
                    } else {
                        sink.next(Duration.ZERO);
                    }
                    // pass start time as argument for next iteration and calculateDelay:true to calculate correct delay
                    return new FluxArgs(args.startTime, true);
                }
            )
            .delayUntil(Mono::delay)
            .concatMap(ignore -> doSomething()) // concatMap is used to make sure that doSomething is not executed in parallel
            .takeUntil(ScheduledTask2::shouldStop);
    }

    private static long calculateDelaySec(Instant startTime) {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();

        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;

        return 60;
    }

    private static Mono<String> doSomething() {
        return Mono.just("");
    }

    private static boolean shouldStop(String val) {
        return true; // put your custom logic here
    }
}

选项 3:混合。

如果您需要将任务执行的结果作为 Flux 来进一步传播它们,则可以使用此选项。对我来说,这个选项看起来没有选项 2 那么复杂,但允许您将结果用作 Flux 流。

请记住,根据您的需要,您可以使用不同的 API,例如 或 此处。Sinksmulticast()unicast()

public class ScheduledTask3 {

    private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
    private final Instant startTime = Instant.now();
    private final Sinks.Many<String> sink = Sinks.many().replay().all();

    {
        scheduler.schedule(this::task); // if you don't want delay on start 
        //scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
    }

    public Flux<String> taskResults() {
        return sink.asFlux();
    }

    private void task() {
        var result = doSomething();
        sink.tryEmitNext(result);

        if (shouldContinue()) {
            scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
        }
    }

    private String doSomething() {
        return "";
    }

    private boolean shouldContinue() {
        return true; // put your custom logic here
    }

    private long calculateDelaySec() {
        var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();

        if (minutesSpentSinceStart < 5) return 10;
        if (minutesSpentSinceStart < 15) return 30;

        return 60;
    }
}

评论

0赞 Praveen Kumar 11/7/2023
感谢您提供此解决方案。与@VonC建议类似,很少有后续想法。如果执行时间大于延迟时间,会发生什么情况?这可能会并行触发多次。另外,如果我想以 0 秒的延迟开始,然后慢慢增加延迟,我该如何实现?doSomething()doSomething()
1赞 Yevhenii Semenov 11/7/2023
@PraveenKumar “选项 1” 和 “选项 3”: doSomething() 不会并行触发。若要删除启动时的初始延迟,可以删除初始值设定项块中方法中的延迟。至于“选项 2”,可以并行运行,要解决此问题,您需要将 替换为 .至于初始延迟 - 要删除它,您需要在方法中添加额外的标记。我已经更新了原始帖子中的所有选项,因此它们现在符合这些要求。如果您有任何疑问或需要帮助,请检查并告诉我。scheduledoSomething()flatMapconcatMapgenerate