事务提交后访问消息有效负载

Access message payload after transaction commit

提问人:Marc Tarin 提问时间:10/30/2023 最后编辑:Marc Tarin 更新时间:10/31/2023 访问量:14

问:

我试图更好地了解 Spring Integration 和 JMS 入站的事务管理。

我阅读了文档并发现了其他与文件轮询的提交后操作相关的帖子,但到目前为止,我还没有能够对 JMS 做类似的事情。

如果我定义基本的“读入 - 写出”以程:

@Bean
public IntegrationFlow inOutFlow() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(new ActiveMQQueue("inQueue")))
            .handle(Jms.outboundAdapter(connectionFactory)
                    .destination("outQueue"))
            .get();
}

在将 SI 消息的内容发送到出站队列后,如何对其进行操作?

我在流程中添加了一个 transactionManager:

@Bean
public IntegrationFlow inOutFlow() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(new ActiveMQQueue("inQueue"))
                    .configureListenerContainer(c -> c.sessionTransacted(true)
                            .transactionManager(jmsTransactionManager)))
            .handle(Jms.outboundAdapter(connectionFactory).destination("outQueue"))
            .get();

@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
    var manager = new JmsTransactionManager(connectionFactory);
    manager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return manager;
}

文件轮询示例在引擎盖下使用 a 和 a,但我没有看到该方法的类似选项。TransactionSynchronizationFactoryExpressionEvaluatingTransactionSynchronizationProcessorconfigureListenerContainer

编辑

我在两者之间引入了一个可轮询的频道,如这篇文章中所建议的那样:

@Bean
public IntegrationFlow inFlow() {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(new ActiveMQQueue("inQueue")))
            .channel("queueChannel")
            .get();
}

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public IntegrationFlow outFlow() {
    return IntegrationFlows.from("queueChannel")
                    .handle(Jms.outboundAdapter(connectionFactory)
                               .destination("outQueue"))
}

当消息发送到中间通道时,将启动一个新线程,ActiveMQ 消息将被完全使用并从其队列中删除,但其内容仍可在集成消息中用于进一步处理。

spring-integration spring-integration-jms

评论


答:

1赞 Artem Bilan 10/30/2023 #1

终结点上不支持事务同步。该文件示例基于轮询通道适配器,其逻辑略有不同。MessageProducerSupport

方法是要有一个习惯来做:AbstractRequestHandlerAdviceJms.outboundAdapter()

if (this.transactionSynchronizationFactory != null && resource != null &&
            TransactionSynchronizationManager.isActualTransactionActive()) {

        TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(resource);
        if (synchronization != null) {
            TransactionSynchronizationManager.registerSynchronization(synchronization);

在调用之前。因此,这样您就可以将同步附加到现有的 JMS 事务。doInvoke()

另一种方法是尝试使用: https://docs.spring.io/spring-framework/reference/data-access/transaction/event.html@TransactionalEventListener

评论

0赞 Marc Tarin 10/31/2023
谢谢你的解释,我会研究这个。此代码示例中的参数是什么?同时,我按照此处的建议使用了可轮询的 queueChannel,并获得了预期的行为。resource
0赞 Artem Bilan 10/31/2023
你不需要那个.我只是从框架中复制了那段代码。该方法打破了您的事务边界:它将在将消息放入该队列后立即提交。好吧,实际上它甚至可以回滚,但消息仍然会出现在该队列中......resourceQueueChannel