提问人:cknelle 提问时间:11/16/2023 最后编辑:cknelle 更新时间:11/17/2023 访问量:29
使用 WebClient 时出现对外部服务器的意外自动多次调用和/或异步问题
Unexpected automatic multiple calls to external server and/or asynchronous issue using WebClient
问:
我正在开发一个基于 Webflux(Spring boot 3.1.5 和 java 17)的 Spring boot 应用程序。它位于前端应用程序和另一个外部应用程序(下面称为外部服务器)之间。前端调用我的控制器 .java,它调用我的服务,.java,它调用我的客户端API,.java,它调用外部服务器。 为了执行我的测试,我用 postman 替换了 frontEnd 应用程序。
我的问题如下: 在外部服务器端,我可以在其日志中看到我的springboot应用程序调用了4次而不是一次。 在我这边,日志表明(请参阅下面的日志):
- 我只调用了一次外部服务
- 我的客户端 API.java 的响应状态为成功。我发送到外部服务器的输入被正确地保存到外部服务器数据库中。服务器在他这边检查数据是否已经插入到其数据库中,在这种情况下,它拒绝请求,告诉数据已经注册。这是我再次运行请求时正确得到的。
- 但是我的服务 .java 和控制器 .java 上的响应失败,有 400 错误代码错误请求
为了检查我是否调用了 4 次外部应用程序,我使用 postman 来直接调用外部服务器服务。它被正确地称为邮递员一次。
恐怕我没有以正确的方式实现 Webflux,因为 webflux 是异步的。我有一种感觉,我试图将同步治疗与异步行为混合在一起 查看日志时。似乎在 Service.java 和 Controller.java 尝试发送响应后,我从我的 ClientAPI.java 获得了响应(请参阅日志中的时间戳)。
我的控制器.java :
@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
private static final String PATH_SERVICE = "/pathservice";
@Autowired
Service service;
@PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
@RequestBody @Valid InputData inputData) {
Mono<TheResponseClass> response = service.save(requestId, inputData);
response.subscribe(
status -> log
.debug("Controller : response status = " + status.getResponse().getStatus()),
error -> log.error(
"Controller : The following error happened!"
+ error));
return response;
}
}
我的服务.java :
@Slf4j
@Service
public class Service {
@Autowired
ClientAPI clientAPI;
@Autowired
BuilRequestBody builRequestBody;
@Autowired
BuildHeaderRequest buildHeaderRequest;
@Autowired
Config config;
public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {
Mono<TheResponseClass> response = null;
if (inputData.getClassInfo() != null) {
response = clientAPI.sendRequest(
builRequestBody.buildRequest(inputData),
buildHeaderRequest.computeHeader(requestId, config.getHeaderID()));
response.subscribe(responseResult -> {
Status resultStatus = responseResult.getResponse().getStatus();
log.debug("Service : response status = " + resultStatus);
if (resultStatus == Status.SUCCESS){
//Call another service
}
}, error -> {
log.error("Service : error happened !"+ error);
});
} else {
TheResponseClass errorResponse = new TheResponseClass();
errorResponse.setName("name");
errorResponse.setOperation("operation");
response = Mono.just(errorResponse);
}
return response;
}
}
我的客户端API.java调用外部服务器:
@Slf4j
@Service
public class ClientAPI {
private WebClient webClient;
private String serverUrl;
// Retrieve CustomWebClient built with htts and token management
@Autowired
public ClientAPI(@Qualifier("CustomWebClient") WebClient webClient) {
this.serverUrl = "https://localhost:8456/myServer";
this.webClient = webClient;
}
public Mono<TheResponseClass> sendRequest(Request bodyRequest, HeaderParameters headerParameters) {
//log.debug("ClientAPI Header : REQUEST_ID = "+headerParameters.getRequestID().toString());
// other log.debug for multiple headers data...
Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
.body(Mono.just(bodyRequest), Request.class)
.retrieve()
.bodyToMono(TheResponseClass.class);
result.subscribe(responseResult -> {
ObjectMapper mapper = new ObjectMapper();
try {
String jsonOutput = mapper.writeValueAsString(responseResult);
log.info("ClientAPI : response result = "+jsonOutput);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}, error -> {
log.error("ClientAPI : The following error happened on response getStatus!", error);
});
return result;
}
}
我的一些简化数据模型:
来自前端的输入数据:
public class InputData {
private String Data;
private ClassInfo ClassInfo;
public void setData(String Data) {
this.Data = Data;
}
public void setClassInfo(ClassInfo ClassInfo) {
this.ClassInfo = ClassInfo;
}
public String getData() {
return Data;
}
public ClassInfo getClassInfo() {
return ClassInfo;
}
}
向外部服务器请求数据:
public class Request {
private String name;
private String operation;
public Request() {
}
public Request name(String name) {
this.name = name;
return this;
}
public String getName() {
return name;
}
public void setName(String string) {
this.name = string;
}
public Request operation(String operation) {
this.operation = operation;
return this;
}
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
}
我的日志 :
12:05:38.266 ERROR i.z.b.t.l.b.s.Service - Service : error happened !org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
12:05:38.401 ERROR i.z.b.t.l.w.controller.Controller - Controller : The following error happened on getStatus response!org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8486/myServer
12:05:38.676 ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [506b1d63-1] 500 Server Error for HTTP POST "/v1/biometricprofileEnrol"
org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST https://externalServerIP:8456/myServer
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ 400 BAD_REQUEST from POST https://externalServerIP:8456/myServer [DefaultWebClient]
*__checkpoint ⇢ Handler package.webservice.controller.Controller#mySpringbootService(String, InputData) [DispatcherHandler]
*__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP POST "myServer" [ExceptionHandlingWebHandler]
Original Stack Trace:
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:307)
at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:214)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:485)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:712)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1466)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1329)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1378)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
12:05:39.041 INFO p.ClientAPI - ClientAPI : response result = {"name":"yourname","operation":"yourOperation","response":{"status":"SUCCESS","error_Code":null,"system_message":null}}
我还尝试使用 block() 将我的代码设置为同步模式,如下所示: 在我的客户端API.java中:
...
Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
.body(Mono.just(bodyRequest), Request.class)
.retrieve()
.bodyToMono(TheResponseClass.class).block();
...
在我的服务.java中:
public TheResponseClass save(String requestId, @Valid InputData inputData) {
....
TheResponseClass response = null;
return response;
}
在我的控制器.java中:
...
@PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
@RequestBody @Valid InputData inputData) {
TheResponseClass response = service.save(requestId, inputData);
....
return Mono.just(response);
}
但是我得到了以下痕迹:
13:34:12.673 ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [5c5ac707-1] 500 Server Error for HTTP POST "/myServer"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP POST "/v1/biometricprofileEnrol" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at idemia.zz.bc.touchpoint.lys2022.biometricenrolclient.service.BiometricEnrolClientAPI.sendRequest(BiometricEnrolClientAPI.java:54)
at idemia.zz.bc.touchpoint.lys2022.biometricenrolclient.service.BiometricEnrolService.saveBiometricProfile(BiometricEnrolService.java:54)
at idemia.zz.bc.touchpoint.lys2022.webservice.controller.Controller.biometricEnrol(Controller.java:103)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:145)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:293)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:474)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:682)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
为了解决上面的 block() 问题,我尝试添加 share().block() 如下,但在这种情况下,我的应用程序在运行时卡住了。它真的被封锁了。 在我的客户端API.java中:
...
Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
.body(Mono.just(bodyRequest), Request.class)
.retrieve()
.bodyToMono(TheResponseClass.class).share().block();
...
我还尝试以异步方式在WebClient上将重试次数设置为零。但它不会改变任何总是调用外部服务器 4 次的内容。 如下:
...
Mono<TheResponseClass> result = this.webClient.post().uri(this.serverUrl)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HeaderParameters.REQUEST_ID, headerParameters.getRequestID())
.body(Mono.just(bodyRequest), Request.class)
.retrieve()
.bodyToMono(TheResponseClass.class).retry(0);
...
答:
感谢 Toerktumlare 和其他输入,我删除了将调用复制到外部服务的订阅调用。 因为我已经有一个隐式订阅,我的控制器调用外部 API。 我改用 flatMap 来操作接收到的数据。 我使用 subscribe 来调用 DoneService,因为我需要直接在我的方法之一中调用它(而不是通过控制器直接从 FrontEnd 发出的请求)
因此,对于控制器.java,例如:
@Slf4j
@RestController
@RequestMapping("/v1")
@CrossOrigin(origins = "http://localhost:4200")
public class Controller {
private static final String PATH_SERVICE = "/pathservice";
@Autowired
Service service;
@PostMapping(path = PATH_SERVICE, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<TheResponseClass> mySpringbootService(@RequestHeader("requestId") String requestId,
@RequestBody @Valid InputData inputData) {
return service.save(requestId, inputData);
}
}
并且从Service.java调用我的DoneService:
@Slf4j
@Service
public class Service {
@Autowired
ClientAPI clientAPI;
@Autowired
BuilRequestBody builRequestBody;
@Autowired
BuildHeaderRequest buildHeaderRequest;
@Autowired
DoneService doneService;
public Mono<TheResponseClass> save(String requestId, @Valid InputData inputData) {
Mono<TheResponseClass> response = null;
// builRequestBody and buildHeaderRequest are used to convert the inputData into the
// dedicated format for the external server API
return clientAPI.sendRequest(
builRequestBody.buildRequest(inputData),
buildHeaderRequest.computeHeader(requestId, config.getHeaderID())).flatMap(responseResult -> {
Status resultStatus = responseResult.getResponse().getStatus();
if (resultStatus == Status.SUCCESS) {
....
//Call another service that uses WebClient implementation
doneService.notify().subscribe();
}
return Mono.just(responseResult);
});
}
}
评论
subscribe
producers
subscribes
remove all subscribe statements
Mono
Flux