如何配置 2 个侦听同一主题的 Spring Kafka 侦听器,使其具有自己的重试和死信主题

How to configure 2 Spring Kafka listeners which listen to the same topic to have their own retry and dead letter topics

提问人:Nos 提问时间:11/17/2023 更新时间:11/17/2023 访问量:19

问:

在 Spring Kafka 中,我有两个 Kafka 听众在听同一个主题。我希望两个侦听器都有自己的重试和死信主题,因此如果一个侦听器发生故障,它不会重播给另一个侦听器。使用以下配置,它们最终都将使用相同的重试和死信主题

  • payment-topic
  • payment-topic-重试
  • payment-topic-dlt
class PaymentListener {
    
    @KafkaListener(topics = ["payment-topic"],)
    fun sendBill(payment: Payment) {
        // ...some logic
    }

    @KafkaListener(topics = ["payment-topic"],)
    fun adjustBalance(payment: Payment) {
        // ...some logic
    }
}

    @Bean
    fun paymentTopicRetryStrategy(
        template: KafkaTemplate<String, ClearingFinishedEvent>,
    ): RetryTopicConfiguration {
        return RetryTopicConfigurationBuilder
            .newInstance()
            .useSingleTopicForSameIntervals()
            .fixedBackOff(3000)
            .maxAttempts(3)
            .includeTopic("payment-topic")
            .dltProcessingFailureStrategy(DltStrategy.ALWAYS_RETRY_ON_ERROR)
            .autoCreateTopics(true, 1, BROKER_DEFAULT)
            .create(template)
    }

我真正想要的是:

  • payment-topic

[sendBill 侦听器将这些用作重试和死信主题]

  • 发送账单支付主题重试
  • 发送账单支付主题DLT

[adjustBalance 监听器使用这些作为重试和死信主题]

  • 调整余额支付主题重试
  • adjust-balance-payment-topic-dlt

我可以编写一个自定义命名策略 (https://docs.spring.io/spring-kafka/docs/current/reference/html/#custom-naming-strategies),但我无法获得侦听器的详细信息,因此我无法使其足够具体。

有什么想法吗?

apache-kafka spring-kafka

评论


答:

0赞 Gary Russell 11/17/2023 #1

不要使用(按主题名称过滤),而是在侦听器上使用;然后,您可以使用自定义 .RetryTopicConfigurationBuilder@RetryableTopicretryTopicSuffix

然后你可以有类似 和 等的东西。payment-topic-bill-retrypayment-topic-adjust-balance-retry

评论

0赞 Nos 11/17/2023
这仍然不是我真正想要的。例如,我正在寻找“send-bill-payment-topic-retry”<prefix><topicname><suffix>或<what-i-want-retry>,而不是“payment-topic-send-bill-retry”。我也想控制前缀。像这样: @RetryableTopic( // 不存在 dltTopicPrefix = “send-bill-”, retryTopicPrefix = “send-bill-”, //确实存在 dltTopicSuffix = “-dlt”, retryTopicSuffix = “-retry”, )
0赞 Nos 11/17/2023
或者像这样: 像这样的东西: @RetryableTopic( // 不存在 dltTopic = “send-bill-payment-topic-dlt”, retryTopic = “send-bill-payment-topic-dlt-retry”, )
1赞 Gary Russell 11/17/2023
目前这是不可能的;如您所说,没有向自定义名称提供程序提供上下文;我们可以增强以包括属性,但现在不可用;我提供了我能想到的唯一解决方法。在同一主题上有 2 个听众有点不寻常,但我可以看到这是一个有效的用例;请随时在 GitHub 上提出新功能请求问题。Destination.Properties@KafkaListenerid
0赞 Nos 11/17/2023
谢谢加里,感谢您的快速回复和建议。