提问人:Marc Tarin 提问时间:10/30/2023 最后编辑:Marc Tarin 更新时间:10/31/2023 访问量:14
事务提交后访问消息有效负载
Access message payload after transaction commit
问:
我试图更好地了解 Spring Integration 和 JMS 入站的事务管理。
我阅读了文档并发现了其他与文件轮询的提交后操作相关的帖子,但到目前为止,我还没有能够对 JMS 做类似的事情。
如果我定义基本的“读入 - 写出”以程:
@Bean
public IntegrationFlow inOutFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(new ActiveMQQueue("inQueue")))
.handle(Jms.outboundAdapter(connectionFactory)
.destination("outQueue"))
.get();
}
在将 SI 消息的内容发送到出站队列后,如何对其进行操作?
我在流程中添加了一个 transactionManager:
@Bean
public IntegrationFlow inOutFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(new ActiveMQQueue("inQueue"))
.configureListenerContainer(c -> c.sessionTransacted(true)
.transactionManager(jmsTransactionManager)))
.handle(Jms.outboundAdapter(connectionFactory).destination("outQueue"))
.get();
@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
var manager = new JmsTransactionManager(connectionFactory);
manager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return manager;
}
文件轮询示例在引擎盖下使用 a 和 a,但我没有看到该方法的类似选项。TransactionSynchronizationFactory
ExpressionEvaluatingTransactionSynchronizationProcessor
configureListenerContainer
编辑
我在两者之间引入了一个可轮询的频道,如这篇文章中所建议的那样:
@Bean
public IntegrationFlow inFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(new ActiveMQQueue("inQueue")))
.channel("queueChannel")
.get();
}
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public IntegrationFlow outFlow() {
return IntegrationFlows.from("queueChannel")
.handle(Jms.outboundAdapter(connectionFactory)
.destination("outQueue"))
}
当消息发送到中间通道时,将启动一个新线程,ActiveMQ 消息将被完全使用并从其队列中删除,但其内容仍可在集成消息中用于进一步处理。
答:
终结点上不支持事务同步。该文件示例基于轮询通道适配器,其逻辑略有不同。MessageProducerSupport
方法是要有一个习惯来做:AbstractRequestHandlerAdvice
Jms.outboundAdapter()
if (this.transactionSynchronizationFactory != null && resource != null &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(resource);
if (synchronization != null) {
TransactionSynchronizationManager.registerSynchronization(synchronization);
在调用之前。因此,这样您就可以将同步附加到现有的 JMS 事务。doInvoke()
另一种方法是尝试使用: https://docs.spring.io/spring-framework/reference/data-access/transaction/event.html@TransactionalEventListener
评论
resource
resource
QueueChannel
评论