Azure 服务总线 JMS Spring 长时间运行作业

Azure Service Bus JMS Spring Long running Job

提问人:Yash 提问时间:8/28/2023 更新时间:8/28/2023 访问量:62

问:

我知道这个问题被问了很多次,但我找不到任何好的答案。

因此,我有一个 AzureServiceBus,每当将新的 xml 文件上传到 Blob 时,它都会从 AzureBlob 获取新消息。

在我的 Spring 微服务中,我有一个 JMSListner,它监听此 ServiceBus,并在触发文件上传事件时处理消息。 对于小文件,没有问题,而且工作得很好。但是对于大文件(30GB+),微服务进程需要更长的时间(超过 10 分钟),并且消息侦听器在 5 分钟后超时,消息被放回队列中,不久之后侦听器再次拾取相同的消息,而前一个进程仍在运行。

由于业务限制,我只能按顺序处理文件,因此在处理第一个文件之前无法处理下一个文件。

所以我想知道,是

  1. 可以增加超时时间,以便侦听器应完成第一个作业。
  2. 或者,如果我将长时间运行的进程放在 Async 方法中,那么侦听器不应接受新消息。
  3. 或者,即使侦听器接受新消息,我也应该能够优雅地确认或将消息放回队列中。

这是我的代码片段。

这是我目前的实现

  @JmsListener(
    destination = "file-name-queue",
    containerFactory = "jmsListenerContainerFactory"
  )
  public void processMessages(JmsMessage message) throws JMSException{
    
      String fileName = message.getBody(String.class);
      log.info("Process The file {}",fileName);
      // This is the long running Process
      processFullFile(fileName);

      log.info("Process finished for The file {}",fileName);

  }

如果没有办法增加超时,我打算把它改成这个。

  @JmsListener(
    destination = "file-name-queue",
    containerFactory = "jmsListenerContainerFactory"
  )
  public void processMessages(JmsMessage message) throws JMSException{

      String fileName = message.getBody(String.class);
      log.info("Received a new file {}",fileName);
      // Check if there 
      if(fileInProgress()) {
        log.info("Existing file already in progress");
        
        //TODO: do something here, so that message gets back in Queue
        // I tried throwing exception here after waiting for sometime, which put the message
        // back in the queue but in the last, and also it was not that graceful
        
      } else {
        log.info("Process The file {}",fileName);
        // This is the long running Process
        aysncProcessFullFile(fileName);
      }
  }

当前 Spring 属性

spring: jms: servicebus: enabled: true connection-string: Endpoint=sb://XXXXXX.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SHARED_ACCESS_KEY idle-timeout: 2000000 pricing-tier: Standard

在正常模式下,对于长时间运行的进程,我收到以下异常。 10:37:56.246 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO c.h.s.c.config.MessageQueueConfig - 正在处理中 10:37:56.286 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO o.a.q.jms.JmsLocalTransactionContext - 事务提交失败:TX:ID:2897b0c2-8f4e-41d3-9122-f1f89c9f8b83:1:115 10:37:56.287 [org.springframework.jms.JmsListenerEndpointContainer#0-1] INFO o.s.j.c.CachingConnectionFactory - 遇到 JMSException - 重置底层 JMS 连接 jakarta.jms.TransactionRolledBackException:事务未声明 Reference:1cbe9d94-65c4-4a0b-a58d-29bbdf2d616d, TrackingId:6b39f9d4-c914-4ec8-b764-53cbc484b136_G1, SystemTracker:gtm, 时间戳:2023-08-28T08:37:56 [condition = amqp:transaction:unknown-id] 在 org.apach

spring-boot xml 解析 azureservicebus spring-jms azure-servicebus-queues

评论


答:

0赞 Sean Feldman 8/28/2023 #1

使用 时,您应该能够续订锁定租约。ServiceBusClientBuilder

ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .receiver()
            .queueName(queueName)
            .maxAutoLockRenewDuration(Duration.ofMinutes(15))
            .buildAsyncClient();

评论

0赞 Yash 8/30/2023
你好肖恩,非常感谢你的回答。早些时候,我使用了来自 <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId 的 JMSListner> 我只是想用它找到一些解决方案。现在,我已使用<artifactId>azure-messaging-servicebus</artifactId>转到您的建议,并且工作正常。在后台,它会不断更新锁,直到定义最大值。@PostConstruct public void onMessage(){ serviceBusReceiverAsyncClient.receiveMessages().subscribe( message -> { //做事 } ); }
0赞 Sean Feldman 8/31/2023
azure-messaging-servicebus确实是要使用的包。