提问人:ste 提问时间:11/2/2023 最后编辑:ste 更新时间:11/2/2023 访问量:52
spring-Integration - http 入站网关响应和异步处理 - 任务执行器线程只处理一条消息而不是所有消息
spring-Integration - http inbound-gateway response and asynchronous processing - task executor threads just processing one message instead of all
问:
描述:
我们有一个 Spring Integration 流程,我们希望在其中配置一个端点、拆分有效负载、返回给消费者,即使用 http 代码 202 和其他项目必须继续异步处理。
配置说明:
我们有一个.我们拆分传入的有效负载。创建始终是一个列表,列表的第一条消息是,然后是以下消息。<int-http:inbound-gateway>
<int:splitter>
splitter
MAIN
ITEM
在 with 输入通道中:消息将应答接收方线程,以便 API 的使用者获得 202。消息需要异步处理,即使使用者已经收到了 202。<int:header-value-router
mainItemRouter-channel
MAIN
ITEM
问题:
如果我有 即拆分器之后的 25 条(1 条和 24 条)消息的列表,线程只处理 10 条。这意味着每个线程只处理一条消息,然后停止。MAIN
ITEM
taskExecutor
为什么线程不拾取其他 14 条消息进行处理?taskExecutor
所有 24 条消息是否都到达了执行器通道?ITEM
这是配置:
<beans>
<int:channel id="postInboundReply-channel"/>
<int:header-enricher input-channel="postFinish-channel" output-channel="postInboundReply-channel">
<int:header name="myHeader" overwrite="true"
expression="setMyHeader"/>
</int:header-enricher>
<!-- Endpoint -->
<int-http:inbound-gateway id="postInbound-gateway"
request-channel="postInboundRequest-channel"
reply-channel="postInboundReply-channel"
supported-methods="POST"
path="/api/tests"
request-payload-type="java.lang.String"
mapped-request-headers="content-type,Authorization"
mapped-response-headers="Content-Type,Location,myHeader"
>
<int-http:request-mapping consumes="application/json" produces="application/json"/>
</int-http:inbound-gateway>
<!-- split payload -->
<int:splitter id='splitter' ref='splitterObject' method='split'
input-channel='postInboundRequest-channel' output-channel='mainRouter-channel'/>
<!-- MAIN gives the response to the http request, ITEM must continue asychronously -->
<int:header-value-router input-channel="mainRouter-channel" default-output-channel="executor-channel" header-name="myType">
<int:mapping value="MAIN" channel="httpResponse-channel"/>
<int:mapping value="ITEM" channel="executor-channel"/>
<int:mapping value="ERROR" channel="postFinish-channel"/>
</int:header-value-router>
<int:transformer input-channel="httpResponse-channel"
output-channel="postFinish-channel"
ref="responderObject"/>
<!-- execute items in parallel -->
<task:executor id="taskExecutor" pool-size="10" rejection-policy="DISCARD"/>
<int:channel id="executor-channel">
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:transformer input-channel="executor-channel"
output-channel="afterTransformation1-channel"
ref="aTransformationBean"/>
<int:transformer input-channel="afterTransformation1-channel"
output-channel="afterTransformation2-channel"
ref="aTransformationBean2"/>
<!-- ITEM continues, ERROR has finished -->
<int:header-value-router input-channel="afterTransformation2-channel" default-output-channel="nullChannel" header-name="myType">
<int:mapping value="ITEM" channel="ItemAsyncPrcGatewayChain-channel"/>
<int:mapping value="ERROR" channel="nullChannel"/>
</int:header-value-router>
<!-- wrap the whole async process in a chain in order to catch exceptions in the error-channel -->
<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
<int:gateway id="itemAsyncPrcGateway" request-channel="itemAsyncProcess-channel" error-channel="asyncProcessError-channel"/>
</int:chain>
<int:transformer input-channel="asyncProcessError-channel"
ref="asyncExceptionHandler"/>
<int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">
<int:transformer ref="asyncTransformerBean"/>
<int:transformer ref="asyncSendJmsMessageBean"/>
<int:transformer ref="asyncStoreStateBean"/>
</int:chain>
</beans>
使用不同的配置运行多个测试,但没有成功
答:
还行。我明白你的问题了。
你在 .这是关于请求-回复的。其合同基于:<int:gateway id="itemAsyncPrcGateway">
<chain>
@FunctionalInterface
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
但是,在该网关的子流结束时,您可以执行以下操作:
<int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">
因此,您不会向该网关返回任何回复。这可能是您在每个项目流程结束时所期望的,但您在中间是错误的。<gateway>
不如摆脱它并将项目直接路由到 ?<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
itemAsyncProcess-channel
关键是网关正在等待回复,阻止了它工作的线程。
评论
<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
asyncSendJmsMessageBean
asyncProcessError-channel
reply-timeout="0"
<gateway>
ExpressionEvaluatingRequestHandlerAdvice
splitter
asyncSendJmsMessageBean
asyncProcessError-channel
asyncExceptionHandler
<chain>
reply-timeout="0"
评论
rejection-policy="DISCARD"
queue-capacity
unbounded queue
rejection-policy
rejection-policy
pool-size="0-10"
max. 10 threads will process in parallel
executor-channel queue