Unexpected automatic multiple calls to external server and/or asynchronous issue using WebClient

Asked by: cknelle Asked: 11/16/2023 Last edited by: cknelle Updated: 11/17/2023 Views: 29

Q:

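I am developing a Spring Boot application based on WebFlux (Spring Boot 3.1.5 and Java 17). It sits between a front-end application and another external application (called the external server below). The front end calls my Controller.java, which calls my Service.java, which calls my ClientAPI.java, which calls the external server. To run my tests, I replaced the front-end application with Postman.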

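My problem is the following: on the external server side, I can see in its logs that my Spring Boot application calls it 4 times instead of once. On my side, the logs indicate (see the logs below):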

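  • I call the external service only once.
  • The response in my ClientAPI.java has a success status. The input I send to the external server is correctly saved in the external server's database. The server checks on its side whether the data has already been inserted into its database and, if so, rejects the request, reporting that the data is already registered. That is what I correctly get when I run the request again.
  • But the response in my Service.java and Controller.java fails with a 400 Bad Request error code.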

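To check whether the external application is really called 4 times by my application, I used Postman to call the external server's service directly. With Postman, it is correctly called once.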

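I am afraid I have not implemented WebFlux the right way, since WebFlux is asynchronous. I have the feeling that I am mixing synchronous processing with asynchronous behaviour. Looking at the logs, it seems that I get the response from my ClientAPI.java after Service.java and Controller.java have already tried to send their response (see the timestamps in the logs).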

My Controller.java:

@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
    private static final String PATH_SERVICE = "/pathservice";

    @Autowired
    Service service;

    @PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
            @RequestBody @Valid InputData inputData) {
        Mono<TheResponseClass> response = service.save(requestId, inputData);

        response.subscribe(
                status -> log
                        .debug("Controller :  response status = " + status.getResponse().getStatus()),
                error -> log.error(
                        "Controller : The following error happened!"
                                + error));

        return response;

    }
    
}

My Service.java:

@Slf4j
@Service
public class Service {

    @Autowired
    ClientAPI clientAPI;

    @Autowired
    BuilRequestBody builRequestBody;

    @Autowired
    BuildHeaderRequest buildHeaderRequest;

    @Autowired
    Config config;

    public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {

        Mono<TheResponseClass> response = null;

        if (inputData.getClassInfo() != null) {

            response = clientAPI.sendRequest(
                    builRequestBody.buildRequest(inputData),
                    buildHeaderRequest.computeHeader(requestId, config.getHeaderID()));

            response.subscribe(responseResult -> {
                Status resultStatus = responseResult.getResponse().getStatus();
                log.debug("Service : response status = " + resultStatus);

                if (resultStatus == Status.SUCCESS){
                        //Call another service

                }

            }, error -> {
                log.error("Service : error happened !"+ error);
            });
        } else {
            TheResponseClass errorResponse = new TheResponseClass();
            errorResponse.setName("name");
            errorResponse.setOperation("operation");
            response = Mono.just(errorResponse);
        }
        return response;

    }

}

My ClientAPI.java, which calls the external server:

@Slf4j
@Service
public class ClientAPI {
    private WebClient webClient;
    private String serverUrl;

    // Retrieve CustomWebClient built with https and token management
    @Autowired
    public ClientAPI(@Qualifier("CustomWebClient") WebClient webClient) {
        this.serverUrl = "https://localhost:8456/myServer";
        this.webClient = webClient;
    }

    public Mono<TheResponseClass> sendRequest(Request bodyRequest, HeaderParameters headerParameters) {
        
    //log.debug("ClientAPI Header : REQUEST_ID = "+headerParameters.getRequestID().toString());
   // other log.debug for multiple headers data...
    
        Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
                .body(Mono.just(bodyRequest), Request.class)
                .retrieve()
                .bodyToMono(TheResponseClass.class);

        result.subscribe(responseResult -> {
            ObjectMapper mapper = new ObjectMapper();
            try {
                String jsonOutput = mapper.writeValueAsString(responseResult);
                log.info("ClientAPI : response result = "+jsonOutput);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }

        }, error -> {
            log.error("ClientAPI : The following error happened on response getStatus!", error);
        });
        return result;
    }
}

Some of my simplified data models:

Input data from the front end:

public class InputData {
    private String Data;
    private ClassInfo ClassInfo;
    
    public void setData(String Data) {
        this.Data = Data;
    }

    public void setClassInfo(ClassInfo ClassInfo) {
        this.ClassInfo = ClassInfo;
    }

    public String getData() {
        return Data;
    }

    public ClassInfo getClassInfo() {
        return ClassInfo;
    }
}

Request data sent to the external server:

public class Request {

  private String name;
  private String operation;

   public Request() {
  }

  public Request name(String name) {
    
    this.name = name;
    return this;
  }
  
  public String getName() {
    return name;
  }

  public void setName(String string) {
    this.name = string;
  }


  public Request operation(String operation) {
    
    this.operation = operation;
    return this;
  }

  public String getOperation() {
    return operation;
  }

  public void setOperation(String operation) {
    this.operation = operation;
  }

}

My logs:

12:05:38.266 ERROR i.z.b.t.l.b.s.Service - Service : error happened !org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
12:05:38.401 ERROR i.z.b.t.l.w.controller.Controller - Controller :  The following error happened on getStatus response!org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8486/myServer
12:05:38.676 ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [506b1d63-1]  500 Server Error for HTTP POST "/v1/biometricprofileEnrol"
org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ 400 BAD_REQUEST from POST https://externalServerIP:8456/myServer [DefaultWebClient]
    *__checkpoint ⇢ Handler package.webservice.controller.Controller#mySpringbootService(String, InputData) [DispatcherHandler]
    *__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP POST "myServer" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
        at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:214)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
        at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162)
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
        at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:485)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:712)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1466)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1329)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1378)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
12:05:39.041 INFO  p.ClientAPI - ClientAPI : response result = {"name":"yourname","operation":"yourOperation","response":{"status":"SUCCESS","error_Code":null,"system_message":null}}

I also tried to make my code synchronous by using block(), as follows. In my ClientAPI.java:

...
        Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
                .body(Mono.just(bodyRequest), Request.class)
                .retrieve()
                .bodyToMono(TheResponseClass.class).block();
...             

In my Service.java:

public TheResponseClass save(String requestId, @Valid InputData inputData) {
....
TheResponseClass response = null;
   return response;
}

In my Controller.java:

...
    @PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
            @RequestBody @Valid InputData inputData) {
        TheResponseClass response = service.save(requestId, inputData);
        ....
        return Mono.just(response);
        }

But I got the following trace:

13:34:12.673 ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [5c5ac707-1]  500 Server Error for HTTP POST "/myServer"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP POST "/v1/biometricprofileEnrol" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
        at reactor.core.publisher.Mono.block(Mono.java:1712)
        at idemia.zz.bc.touchpoint.lys2022.biometricenrolclient.service.BiometricEnrolClientAPI.sendRequest(BiometricEnrolClientAPI.java:54)
        at idemia.zz.bc.touchpoint.lys2022.biometricenrolclient.service.BiometricEnrolService.saveBiometricProfile(BiometricEnrolService.java:54)
        at idemia.zz.bc.touchpoint.lys2022.webservice.controller.Controller.biometricEnrol(Controller.java:103)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:145)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:293)
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:474)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
        at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:682)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:284)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)

To work around the block() problem above, I tried adding share().block() as follows, but in that case my application hangs at runtime. It is really blocked. In my ClientAPI.java:

...
        Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
                .body(Mono.just(bodyRequest), Request.class)
                .retrieve()
                .bodyToMono(TheResponseClass.class).share().block();
...

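I also tried setting the retry count to zero on the WebClient in the asynchronous version. It changes nothing: the external server is still called 4 times. As follows: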

...
        Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
                .body(Mono.just(bodyRequest), Request.class)
                .retrieve()
                .bodyToMono(TheResponseClass.class).retry(0);
...
java spring-webflux

Comments

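0 upvotes Toerktumlare 11/17/2023
Your problem is that you have misunderstood what subscribe is. All backend servers are producers: they produce data for a consumer. So who is the consumer? It is the final consumer of the data, which in your case is the front-end web page. The front end subscribes to the backend. So the question is: why are you subscribing? It is the front end that subscribes, so remove all subscribe statements and return Mono and Flux from your controllers.
0 upvotes cknelle 11/17/2023
Thanks. Indeed, I removed all the subscribe statements and now I get only one call. But this solution does not meet my requirements, because I am also a consumer of another external server. What can I use to replace the subscribe in Service.java? I really need to retrieve the result from my WebClient in order to make further calls to other external services. I tried flatMap() instead of subscribe() but got the same unexpected behaviour, i.e. several calls. I also tried doOnSuccess(), but nothing happens, and I cannot use doOnSuccess().block in my case.
0 upvotes Toerktumlare 11/17/2023
I can't teach you the basics of WebFlux. Only the final consumer should subscribe. When you say "I tried this, I tried that", I can't help you, because I can only help with code I can see, and the code you have provided is the code in your question. In that code you are using subscribe incorrectly, and that is all the help I can offer. Stack Overflow is not a forum, it is a Q&A site. You may be a consumer of other services, but you are not the final consumer. Without seeing your code, it is impossible for me to help you.
0 upvotes cknelle 11/21/2023
Sorry for the misunderstanding. Indeed, your response saved my day. I now understand subscribe better, and flatMap too, since I managed to use it in a way that meets my requirements. So I have clarified my request and written an answer that includes the additional input I also received.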

A:

0 upvotes cknelle 11/21/2023 #1

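Thanks to Toerktumlare and other input, I removed the subscribe calls that were duplicating the call to the external service, because I already have an implicit subscription: my controller, which ends up calling the external API. I use flatMap instead to work with the received data. I still use subscribe to call DoneService, because I need to invoke it directly from inside one of my methods (rather than through a request coming from the front end via the controller).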
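To make the duplication concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class ColdMonoDemo and the counter standing in for the external server are hypothetical and not part of the original code; Mono.fromCallable replaces the real WebClient call. The Mono returned by WebClient is cold, so every subscribe() starts the request again. Three explicit subscribe() calls (ClientAPI, Service, Controller) plus the subscription WebFlux itself makes on the Mono returned by the controller would account for the four calls seen by the external server.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical demo class; the counter plays the role of the external server's log.
class ColdMonoDemo {

    // Stand-in for clientAPI.sendRequest(...): a cold Mono that "calls" the server when subscribed.
    static Mono<String> externalCall(AtomicInteger calls) {
        return Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return "SUCCESS";
        });
    }

    // Mirrors the question: each layer subscribes to the same cold Mono.
    static int callsWithExtraSubscribers() {
        AtomicInteger calls = new AtomicInteger();
        Mono<String> response = externalCall(calls);
        response.subscribe(); // ClientAPI, only to log
        response.subscribe(); // Service, only to inspect the status
        response.subscribe(); // Controller, only to log
        response.block();     // stands in for the subscription made by the framework
        return calls.get();
    }

    // Mirrors the fix: one pipeline built with operators, one subscriber at the end.
    static int callsWithSingleChain() {
        AtomicInteger calls = new AtomicInteger();
        Mono<String> response = externalCall(calls)
                .doOnNext(status -> { /* logging would go here */ })
                .flatMap(status -> Mono.just(status)); // follow-up work would go here
        response.block();     // the only subscription
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("extra subscribers: " + callsWithExtraSubscribers());
        System.out.println("single chain:      " + callsWithSingleChain());
    }
}
```

In this sketch the first method reports four calls and the second reports one, because operators such as doOnNext and flatMap only describe work and do not subscribe.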

So, for Controller.java, for example:

@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
    private static final String PATH_SERVICE = "/pathservice";

    @Autowired
    Service service;

    @PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
            @RequestBody @Valid InputData inputData) {
        return service.save(requestId, inputData);

    }
    
}
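ClientAPI.java after the change is not shown here. As a hedged sketch of how its logging could be kept without a subscribe() call, the side-effect operators doOnNext and doOnError can be attached to the pipeline. Everything below is an assumption for illustration: the class LoggingOperatorsDemo, the in-memory logLines list used in place of a logger, and callExternalServer() used in place of the WebClient call. It assumes reactor-core on the classpath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not the author's ClientAPI.
class LoggingOperatorsDemo {

    final AtomicInteger externalCalls = new AtomicInteger();
    final List<String> logLines = new CopyOnWriteArrayList<>(); // stands in for log.info / log.error

    // Stand-in for webClient.post()...retrieve().bodyToMono(...)
    Mono<String> callExternalServer() {
        return Mono.fromCallable(() -> {
            externalCalls.incrementAndGet();
            return "{\"status\":\"SUCCESS\"}";
        });
    }

    // Logging is attached as side effects; nothing is subscribed here.
    Mono<String> sendRequest() {
        return callExternalServer()
                .doOnNext(body -> logLines.add("ClientAPI : response result = " + body))
                .doOnError(error -> logLines.add("ClientAPI : error = " + error));
    }

    public static void main(String[] args) {
        LoggingOperatorsDemo demo = new LoggingOperatorsDemo();
        demo.sendRequest().block(); // single subscriber, as the framework would be
        System.out.println(demo.externalCalls.get() + " call(s), log: " + demo.logLines);
    }
}
```

The log line is written when the single downstream subscriber receives the response, so the request is sent once.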

And calling my DoneService from Service.java:

@Slf4j
@Service
public class Service {

    @Autowired
    ClientAPI clientAPI;

    @Autowired
    BuilRequestBody builRequestBody;

    @Autowired
    BuildHeaderRequest buildHeaderRequest;

    @Autowired
    DoneService doneService;

    // Needed for config.getHeaderID() below, as in the Service class from the question
    @Autowired
    Config config;

    public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {
            // builRequestBody and buildHeaderRequest are used to convert the inputData into the 
            // dedicated format for the external server API 
            return clientAPI.sendRequest(
                    builRequestBody.buildRequest(inputData),
                    buildHeaderRequest.computeHeader(requestId, config.getHeaderID())).flatMap(responseResult -> {
                    Status resultStatus = responseResult.getResponse().getStatus();
                    if (resultStatus == Status.SUCCESS) {
                        ....
                        //Call another service that uses WebClient implementation
                        doneService.notify().subscribe();

                    }
                    return Mono.just(responseResult);
                });
    
    }
}
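A possible alternative to the nested subscribe() is to chain the follow-up call into the same pipeline, so that it runs as part of the single subscription. This is a sketch under stated assumptions, not the code above: the class ChainedFollowUpDemo, the events list, and the String status are hypothetical stand-ins, and reactor-core is assumed on the classpath. The follow-up method is named notifyDone here because a no-argument notify() cannot be declared in a Java class (Object.notify() is final).

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

// Hypothetical demo class; 'events' records what was called and in which order.
class ChainedFollowUpDemo {

    // Stand-in for clientAPI.sendRequest(...)
    static Mono<String> sendRequest(List<String> events, String status) {
        return Mono.fromCallable(() -> {
            events.add("external call");
            return status;
        });
    }

    // Stand-in for the DoneService call, returning an empty Mono when finished.
    static Mono<Void> notifyDone(List<String> events) {
        return Mono.fromRunnable(() -> events.add("follow-up call"));
    }

    // On SUCCESS, run the follow-up call and then emit the original status.
    static Mono<String> save(List<String> events, String status) {
        return sendRequest(events, status).flatMap(result -> {
            if ("SUCCESS".equals(result)) {
                return notifyDone(events).thenReturn(result);
            }
            return Mono.just(result);
        });
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        String result = save(events, "SUCCESS").block();
        System.out.println(result + " " + events);
    }
}
```

The trade-off is that the response to the front end then waits for the follow-up call, and an error in that call propagates to the caller. If fire-and-forget behaviour is what is wanted, the nested subscribe() shown in the answer keeps the two calls independent.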