提问人:Yash 提问时间:8/28/2023 更新时间:8/28/2023 访问量:62
Azure 服务总线 JMS Spring 长时间运行作业
Azure Service Bus JMS Spring Long running Job
问:
我知道这个问题被问了很多次,但我找不到任何好的答案。
因此,我有一个 AzureServiceBus,每当将新的 xml 文件上传到 Blob 时,它都会从 AzureBlob 获取新消息。
在我的 Spring 微服务中,我有一个 JMSListner,它监听此 ServiceBus,并在触发文件上传事件时处理消息。 对于小文件,没有问题,而且工作得很好。但是对于大文件(30GB+),微服务进程需要更长的时间(超过 10 分钟),并且消息侦听器在 5 分钟后超时,消息被放回队列中,不久之后侦听器再次拾取相同的消息,而前一个进程仍在运行。
由于业务限制,我只能按顺序处理文件,因此在处理第一个文件之前无法处理下一个文件。
所以我想知道,是
- 可以增加超时时间,以便侦听器应完成第一个作业。
- 或者,如果我将长时间运行的进程放在 Async 方法中,那么侦听器不应接受新消息。
- 或者,即使侦听器接受新消息,我也应该能够优雅地确认或将消息放回队列中。
这是我的代码片段。
这是我目前的实现
@JmsListener(
destination = "file-name-queue",
containerFactory = "jmsListenerContainerFactory"
)
public void processMessages(JmsMessage message) throws JMSException{
String fileName = message.getBody(String.class);
log.info("Process The file {}",fileName);
// This is the long running Process
processFullFile(fileName);
log.info("Process finished for The file {}",fileName);
}
如果没有办法增加超时,我打算把它改成这个。
@JmsListener(
destination = "file-name-queue",
containerFactory = "jmsListenerContainerFactory"
)
public void processMessages(JmsMessage message) throws JMSException{
String fileName = message.getBody(String.class);
log.info("Received a new file {}",fileName);
// Check if there
if(fileInProgress()) {
log.info("Existing file already in progress");
//TODO: do something here, so that message gets back in Queue
// I tried throwing exception here after waiting for sometime, which put the message
// back in the queue but in the last, and also it was not that graceful
} else {
log.info("Process The file {}",fileName);
// This is the long running Process
aysncProcessFullFile(fileName);
}
}
当前 Spring 属性
spring: jms: servicebus: enabled: true connection-string: Endpoint=sb://XXXXXX.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SHARED_ACCESS_KEY idle-timeout: 2000000 pricing-tier: Standard
在正常模式下,对于长时间运行的进程,我收到以下异常。 10:37:56.246 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO c.h.s.c.config.MessageQueueConfig - 正在处理中 10:37:56.286 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO o.a.q.jms.JmsLocalTransactionContext - 事务提交失败:TX:ID:2897b0c2-8f4e-41d3-9122-f1f89c9f8b83:1:115 10:37:56.287 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO o.s.j.c.CachingConnectionFactory - 遇到 JMSException - 重置底层 JMS 连接 jakarta.jms.TransactionRolledBackException:事务未声明 Reference:1cbe9d94-65c4-4a0b-a58d-29bbdf2d616d, TrackingId:6b39f9d4-c914-4ec8-b764-53cbc484b136_G1, SystemTracker:gtm, 时间戳:2023-08-28T08:37:56 [condition = amqp:transaction:unknown-id] 在 org.apach
答:
使用 时,您应该能够续订锁定租约。ServiceBusClientBuilder
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.queueName(queueName)
.maxAutoLockRenewDuration(Duration.ofMinutes(15))
.buildAsyncClient();
评论
azure-messaging-servicebus
确实是要使用的包。
评论