spring-Integration - http 入站网关响应和异步处理 - 任务执行器线程只处理一条消息而不是所有消息

spring-Integration - http inbound-gateway response and asynchronous processing - task executor threads just processing one message instead of all

提问人:ste 提问时间:11/2/2023 最后编辑:ste 更新时间:11/2/2023 访问量:52

问:

描述:

我们有一个 Spring Integration 流程,我们希望在其中配置一个端点、拆分有效负载、返回给消费者,即使用 http 代码 202 和其他项目必须继续异步处理。

配置说明:

我们有一个.我们拆分传入的有效负载。创建始终是一个列表,列表的第一条消息是,然后是以下消息。<int-http:inbound-gateway><int:splitter>splitterMAINITEM

在 with 输入通道中:消息将应答接收方线程,以便 API 的使用者获得 202。消息需要异步处理,即使使用者已经收到了 202。<int:header-value-routermainItemRouter-channelMAINITEM

问题:

如果我有 即拆分器之后的 25 条(1 条和 24 条)消息的列表,线程只处理 10 条。这意味着每个线程只处理一条消息,然后停止。MAINITEMtaskExecutor

为什么线程不拾取其他 14 条消息进行处理?taskExecutor

所有 24 条消息是否都到达了执行器通道?ITEM

这是配置:

<beans>
    <int:channel id="postInboundReply-channel"/>
    <int:header-enricher input-channel="postFinish-channel" output-channel="postInboundReply-channel">
        <int:header name="myHeader" overwrite="true"
                    expression="setMyHeader"/>
    </int:header-enricher>

    <!-- Endpoint -->
    <int-http:inbound-gateway id="postInbound-gateway"
                              request-channel="postInboundRequest-channel"
                              reply-channel="postInboundReply-channel"
                              supported-methods="POST"
                              path="/api/tests"
                              request-payload-type="java.lang.String"
                              mapped-request-headers="content-type,Authorization"
                              mapped-response-headers="Content-Type,Location,myHeader"
                              >
        <int-http:request-mapping consumes="application/json" produces="application/json"/>
    </int-http:inbound-gateway>


    <!-- split payload -->
    <int:splitter id='splitter' ref='splitterObject' method='split'
                  input-channel='postInboundRequest-channel' output-channel='mainRouter-channel'/>

    <!--  MAIN gives the response to the http request, ITEM must continue asychronously -->
    <int:header-value-router input-channel="mainRouter-channel"  default-output-channel="executor-channel" header-name="myType">
        <int:mapping value="MAIN" channel="httpResponse-channel"/>
        <int:mapping value="ITEM" channel="executor-channel"/>
        <int:mapping value="ERROR" channel="postFinish-channel"/>
    </int:header-value-router>

    <int:transformer input-channel="httpResponse-channel"
                     output-channel="postFinish-channel"
                     ref="responderObject"/>

    <!-- execute items in parallel -->
    <task:executor id="taskExecutor" pool-size="10" rejection-policy="DISCARD"/>
    <int:channel id="executor-channel">
        <int:dispatcher task-executor="taskExecutor"/>
    </int:channel>

    <int:transformer input-channel="executor-channel"
                     output-channel="afterTransformation1-channel"
                     ref="aTransformationBean"/>

    <int:transformer input-channel="afterTransformation1-channel"
                     output-channel="afterTransformation2-channel"
                     ref="aTransformationBean2"/>

    <!-- ITEM continues, ERROR has finished -->
    <int:header-value-router input-channel="afterTransformation2-channel" default-output-channel="nullChannel" header-name="myType">
        <int:mapping value="ITEM" channel="ItemAsyncPrcGatewayChain-channel"/>
        <int:mapping value="ERROR" channel="nullChannel"/>
    </int:header-value-router>

    <!-- wrap the whole async  process in a chain in order to catch exceptions in the error-channel -->
    <int:chain input-channel="ItemAsyncPrcGatewayChain-channel">
      <int:gateway id="itemAsyncPrcGateway" request-channel="itemAsyncProcess-channel" error-channel="asyncProcessError-channel"/>
    </int:chain>

    <int:transformer input-channel="asyncProcessError-channel"
                     ref="asyncExceptionHandler"/>

    <int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">
        <int:transformer ref="asyncTransformerBean"/>
        <int:transformer ref="asyncSendJmsMessageBean"/>
        <int:transformer ref="asyncStoreStateBean"/>
    </int:chain>

</beans>

使用不同的配置运行多个测试,但没有成功

Java 多线程异 Spring 集成

评论

0赞 Artem Bilan 11/2/2023
但你自己说的:.因此,这意味着任务执行器将接受 10 个任务并丢弃其余任务,因为它的所有线程都处于繁忙状态。rejection-policy="DISCARD"
0赞 ste 11/2/2023
我没有定义一个,这意味着根据我拥有的文档以及我如何解释文档没有效果。删除并尝试也不起作用,因为之前 10 个线程只拾取一条消息,而在 10 个线程完成后,10 个线程不会从队列中拾取下一条消息。我认为通过这种配置,我可以定义:中的消息queue-capacityunbounded queuerejection-policyrejection-policypool-size="0-10"max. 10 threads will process in parallelexecutor-channel queue

答:

0赞 Artem Bilan 11/2/2023 #1

还行。我明白你的问题了。

你在 .这是关于请求-回复的。其合同基于:<int:gateway id="itemAsyncPrcGateway"><chain>

@FunctionalInterface
public interface RequestReplyExchanger {

    Message<?> exchange(Message<?> request) throws MessagingException;

}

但是,在该网关的子流结束时,您可以执行以下操作:

<int:chain input-channel="itemAsyncProcess-channel" output-channel="nullChannel">

因此,您不会向该网关返回任何回复。这可能是您在每个项目流程结束时所期望的,但您在中间是错误的。<gateway>

不如摆脱它并将项目直接路由到 ?<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">itemAsyncProcess-channel

关键是网关正在等待回复,阻止了它工作的线程。

评论

0赞 ste 11/3/2023
嗨,阿尔乔姆,感谢您的回复。当我删除处理工作时,但我无法再捕获异常(抛出,即由 bean 抛出),这就是我配置网关并将其包装成链的原因。对此有什么想法吗?<int:chain input-channel="ItemAsyncPrcGatewayChain-channel">asyncSendJmsMessageBeanasyncProcessError-channel
0赞 Artem Bilan 11/3/2023
确定!那也是可能的。请参阅要处理错误的特定终结点的 on 或 use:docs.spring.io/spring-integration/reference/handler-advice/...reply-timeout="0"<gateway>ExpressionEvaluatingRequestHandlerAdvice
0赞 ste 11/6/2023
不知道我如何将您的建议用于我的用例。我想要捕获和处理的异常不在端点级别。它们被抛出在异步进程中,主线程已经返回到端点请求,即 202。在异步过程中,将处理几条消息。有些会处理得很好,有些会失败。我想处理来自 .因异常而失败的消息只是在另一个通道上。splitter
0赞 ste 11/6/2023
请参阅原始配置,如果即在 bean 中没有异常,我会继续在流程中,但如果有异常,我想跳转到以处理异常,仅针对这一条具有异常的消息,在 bean 中,当我处理这一条异常时,线程应该选择下一条消息进行处理。asyncSendJmsMessageBeanasyncProcessError-channelasyncExceptionHandler
0赞 Artem Bilan 11/6/2023
您的问题,即当一切正常时,网关正在等待回复。您在发送线程上遇到了异常,因此仅具有异常就足以满足您当前的期望。<chain>reply-timeout="0"