提问人:Praveen Kumar 提问时间:11/4/2023 更新时间:11/9/2023 访问量:274
Java Flux 中的延迟增加
Increasing delays in Java Flux
问:
我有一个代码,我可以在循环中执行一段逻辑,中间使用 Flux 有一些延迟。像这样的东西,
Flux.defer(() -> service.doSomething())
.repeatWhen(v -> Flux.interval(Duration.ofSeconds(10)))
.map(data -> mapper(data)) //map data
.takeUntil(v -> shouldContinue(v)) //checks if the loop can be terminated
.onErrorStop();
现在,我想要增量延迟。这意味着,在前 5 分钟内,每次执行之间的延迟可能为 10 秒。然后,在接下来的 10 分钟内,延迟可以是 30 秒。此后,每次执行之间的延迟可能为一分钟。
如何使用 Flux 实现这一点?
提前致谢。
答:
您可以尝试使用 Flux.generate
创建一个自定义序列,该序列以所需的延迟发出项目。
另请参阅“Flux.create
和 Flux.generate
之间的区别”。
Flux<Object> flux = Flux.generate(
() -> 0L,
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.delayUntil(state -> {
long delay = 10; // default delay
if (state < 30) { // first 5 minutes with 10 seconds delay
delay = 10;
} else if (state < 50) { // next 10 minutes with 30 seconds delay
delay = 30;
} else { // thereafter, 1 minute delay
delay = 60;
}
return Mono.delay(Duration.ofSeconds(delay));
})
.flatMap(state -> service.doSomething())
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();
这将初始化状态,并在每次迭代时递增。
根据状态值(表示已用期间数),它会应用不同的延迟。0
┌────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Flux │ │ delayUntil with │ │ doSomething │
│ generate ├───────► │ custom delays ├────────► │ Service │
└────────────┘ └─────────────────┘ └──────────────┘
│ │
│ │
▼ ▼
┌────────────┐ ┌────────────────┐ ┌───────────────┐
│ flatMap │ │ takeUntil │ │ onErrorStop │
│ function ├───────►│ shouldContinue ├─────────► │ method │
└────────────┘ └────────────────┘ └───────────────┘
请注意,在实际应用中,您可能需要考虑 的执行时间。如果需要大量时间,您可能需要相应地调整延迟。service.doSomething()
如果执行时间大于延迟时间,会发生什么情况?这可能会并行触发多次。
doSomething()
doSomething()
当执行时间长于延迟时,后续执行不会等待当前执行完成,这可能会导致并发执行。doSomething()
为了防止这种情况,您可以使用确保按顺序执行的组合,以及有状态映射,以根据上次执行的开始或完成时间计算延迟。
参见 “Project Reactor 中的 flatMap、flatMapSequential
和 concatMap
有什么区别? .concatMap()
doSomething()
"
另外,如果我想以 0 秒的延迟开始,然后慢慢增加延迟,我该如何实现?
然后,您可以实现您的功能,延迟为 0 秒,然后慢慢增加延迟:
AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());
Flux<Object> flux = Flux.generate(
() -> 0L,
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.concatMap(state -> {
long delay;
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastExecutionStart.get();
long executionTime = 0; // replace with actual execution time if known
// Calculate the delay based on elapsed time since the start of the last execution
if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
} else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
} else { // thereafter
delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
}
return Mono.delay(Duration.ofMillis(delay))
.then(Mono.fromCallable(() -> {
lastExecutionStart.set(System.currentTimeMillis());
return "doSomething";
}))
.flatMap(o -> service.doSomething());
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();
concatMap
确保按顺序进行呼叫。该变量跟踪上次执行的开始时间。该变量用于计算下一次执行的延迟。doSomething()
lastExecutionStart
Mono.delay
() 与 then()
一起使用,以在开始下一次执行之前等待计算出的延迟。
延迟后,被调用并更新时间戳。doSomething()
lastExecutionStart
这种模式可确保:
doSomething()
不是并发执行的。- 下次执行前的延迟考虑了实际执行时间。
doSomething()
- 初始延迟可以是 0,然后根据您提供的逻辑递增。
┌──────────────────────────────────────────────┐
│ Flux │
└──────────────────────────────────────────────┘
│
┌─────────────┴─────────────┐
│ Generate (state) │
└─────────────┬─────────────┘
│
┌─────────────▼─────────────┐
│ Calculate Next Delay │ <────┐
└─────────────┬─────────────┘ │
│ │
┌────────────────▼─────────────────┐ │
│ Mono.delay │ │
└────────────────┬─────────────────┘ │
│ │
┌────────────────▼──────────────────┐ │
│ Set lastExecutionStart Timestamp │ │
└────────────────┬──────────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ service.doSomething │ │
└─────────────┬─────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ map │ │
└─────────────┬─────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ takeUntil │ │
└─────────────┬─────────────┘ │
│ │
┌─────────────▼─────────────┐ │
│ onErrorStop │ │
└─────────────┬─────────────┘ │
│ │
└────────────────────┘
我注意到如果返回一个,我得到一个错误。
但如果被退回,它就可以正常工作。所以我不得不用一个代替.dataProvider.fetchData()
Flux
Cannot infer type argument(s) for <V> concatMap(Function<? super T,? extends Publisher<? extends V>>)
Mono
flatMapMany
flatMap
但是有没有办法处理是否可以返回 a 或 a ?
service.doSomething()
Flux
Mono
我正在考虑将其编写为通用服务。因此,想看看这是否可以处理 类型的数据,甚至是 类型的数据。
Flux<T>
Mono<T>
T
若要处理可以返回 、 或 的服务方法,需要在响应式管道中调整方法调用,以迎合这些不同的响应式类型。一种方法是使用 indeed, 来处理两者 和 因为可以转换为 ,但反之则不然。Flux<T>
Mono<T>
T
flatMapMany
Mono
Flux
Mono
Flux
AtomicLong lastExecutionStart = new AtomicLong(System.currentTimeMillis());
Flux<Object> flux = Flux.generate(
() -> 0L,
(state, sink) -> {
sink.next(state);
return state + 1;
}
)
.concatMap(state -> {
long delay;
long currentTime = System.currentTimeMillis();
long elapsedTime = currentTime - lastExecutionStart.get();
long executionTime = 0; // replace with actual execution time if known
// Calculate the delay based on elapsed time since the start of the last execution
if (elapsedTime <= 5 * 60 * 1000) { // first 5 minutes
delay = Math.max(0, (10 - executionTime) * 1000); // 10 seconds delay, adjust for execution time
} else if (elapsedTime <= 15 * 60 * 1000) { // next 10 minutes
delay = Math.max(0, (30 - executionTime) * 1000); // 30 seconds delay, adjust for execution time
} else { // thereafter
delay = Math.max(0, (60 - executionTime) * 1000); // 60 seconds delay, adjust for execution time
}
return Mono.delay(Duration.ofMillis(delay))
.then(Mono.fromCallable(() -> {
lastExecutionStart.set(System.currentTimeMillis());
return "doSomething";
}))
// Use flatMapMany to handle both Mono and Flux from doSomething()
.flatMapMany(o -> Flux.from(service.doSomething()));
})
.map(data -> mapper(data))
.takeUntil(v -> shouldContinue(v))
.onErrorStop();
通过使用 ,可以确保服务调用可以返回 a 或 be a,这将转换为 .为了处理同步类型,您通常会将其包装起来,具体取决于您是想要单个结果还是结果流。flatMapMany
Flux
Mono
Flux
T
Mono.just()
Flux.just()
要真正使您的服务泛型,您必须确保方法签名被适当地设计为返回一个 ,这两者都是 和 实现的。这将允许您在服务方法本身中抽象出这些类型之间的差异。doSomething()
Publisher<T>
Mono
Flux
为了处理同步值 (),您需要在 service 方法中创建一个反应式包装器,如下所示:T
public Publisher<T> doSomething() {
T result = ...; // Your synchronous logic here
return Mono.just(result); // or Flux.just(result) if it makes sense to return a stream
}
这样,就可以在反应式管道中使用相同的方法来处理 中的所有类型的返回值。flatMapMany
doSomething()
评论
doSomething()
doSomething()
dataProvider.fetchData()
Cannot infer type argument(s) for <V> concatMap(Function<? super T,? extends Publisher<? extends V>>)
flatMapMany
flatMap
service.doSomething()
虽然 Project Reactor 是一个很棒的工具,但重要的是要记住,并非所有任务都需要通过 Project Reactor 解决。在我看来,有时使用纯 Java 可能不那么复杂,更适合,即使 Project Reactor 是你的应用程序堆栈中的主要工具。但由于我不知道你的真实情况,我将分享两个版本。
选项 1:更少的 Reactor,更纯的 Java。
public class ScheduledTask {
private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
private final Instant startTime = Instant.now();
{
scheduler.schedule(this::task); // if you don't want delay on start
//scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
}
private void task() {
System.out.println("do something"); // you can inject your service to the class and execute needed method here
if (shouldContinue()) {
scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
}
}
private boolean shouldContinue() {
return true; // put your custom logic here
}
private long calculateDelaySec() {
var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();
if (minutesSpentSinceStart < 5) return 10;
if (minutesSpentSinceStart < 15) return 30;
return 60;
}
}
选项 2:反应堆路
这种方法非常接近@VonC建议,我根据要求修改了该方法以使其更合适。generate
public class ScheduledTask2 {
private record FluxArgs(Instant startTime, Boolean calculateDelay) {}
private static Flux<String> task() {
return Flux.<Duration, FluxArgs>generate(
// use start time as initial value and boolean marker to skip start delay
() -> new FluxArgs(Instant.now(), false),
(args, sink) -> {
if (args.calculateDelay) {
sink.next(Duration.ofSeconds(calculateDelaySec(args.startTime)));
} else {
sink.next(Duration.ZERO);
}
// pass start time as argument for next iteration and calculateDelay:true to calculate correct delay
return new FluxArgs(args.startTime, true);
}
)
.delayUntil(Mono::delay)
.concatMap(ignore -> doSomething()) // concatMap is used to make sure that doSomething is not executed in parallel
.takeUntil(ScheduledTask2::shouldStop);
}
private static long calculateDelaySec(Instant startTime) {
var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();
if (minutesSpentSinceStart < 5) return 10;
if (minutesSpentSinceStart < 15) return 30;
return 60;
}
private static Mono<String> doSomething() {
return Mono.just("");
}
private static boolean shouldStop(String val) {
return true; // put your custom logic here
}
}
选项 3:混合。
如果您需要将任务执行的结果作为 Flux 来进一步传播它们,则可以使用此选项。对我来说,这个选项看起来没有选项 2 那么复杂,但允许您将结果用作 Flux 流。
请记住,根据您的需要,您可以使用不同的 API,例如 或 此处。Sinks
multicast()
unicast()
public class ScheduledTask3 {
private final Scheduler scheduler = Schedulers.newSingle("custom-scheduled-task");
private final Instant startTime = Instant.now();
private final Sinks.Many<String> sink = Sinks.many().replay().all();
{
scheduler.schedule(this::task); // if you don't want delay on start
//scheduler.schedule(this::task, 10, TimeUnit.SECONDS); // if you want delay on start
}
public Flux<String> taskResults() {
return sink.asFlux();
}
private void task() {
var result = doSomething();
sink.tryEmitNext(result);
if (shouldContinue()) {
scheduler.schedule(this::task, calculateDelaySec(), TimeUnit.SECONDS);
}
}
private String doSomething() {
return "";
}
private boolean shouldContinue() {
return true; // put your custom logic here
}
private long calculateDelaySec() {
var minutesSpentSinceStart = Duration.between(startTime, Instant.now()).toMinutes();
if (minutesSpentSinceStart < 5) return 10;
if (minutesSpentSinceStart < 15) return 30;
return 60;
}
}
评论
doSomething()
doSomething()
schedule
doSomething()
flatMap
concatMap
generate
评论
Meaning, for the first 5 minutes, the delay can be 10 seconds between each execution
.如果我理解正确的话,前 5 分钟内有多次执行,它们都延迟了 10 秒。是吗?