防止 KafkaManager 将事务置于错误状态

Prevent KafkaManager put transaction in error state

提问人:user_1234567_java 提问时间:9/29/2023 最后编辑:user_1234567_java 更新时间:9/29/2023 访问量:28

问:

我在 thransaction 中按限制从 DB 项目列表中进行选择(一次选择 10 个项目),然后必须将它们全部发送到 kafka 并在同一事务中从数据库中删除所有项目。select for update

条件

如果发送到 Kafka 的操作失败并出现 TopicAuthorizationException,我需要在同一事务中继续发送下一个项目并从数据库中删除下一个项目

代码示例

Kafka 配置

 @Bean
    fun kafkaTemplate(producerFactory: ProducerFactory<String, String>): KafkaTemplate<String, String> {
        val kafkaTemplate = KafkaTemplate(producerFactory)
        kafkaTemplate.setProducerListener(object : ProducerListener<String, String> {
            override fun onError(
                producerRecord: ProducerRecord<String, String>,
                recordMetadata: RecordMetadata?,
                exception: Exception
            ) {
                if (exception is TopicAuthorizationException) {
                    logger.error("TopicAuthorizationException exception occured [${exception.message}]")
                }
            }
        })
        return kafkaTemplate
}

业务逻辑

@Service
class ItemService(
    private val kafkaMessageSender: KafkaMessageSender,
    private val itemRepository: ItemRepository
) {

 
    @Transactional(
        transactionManager = "transactionManager",
        isolation = Isolation.REPEATABLE_READ,
        propagation = Propagation.REQUIRED
    )
    fun findItemsToProcess() {

        val items = itemRepository.findItemsWithSelectForUpdateAndLimit(limit = 10)

        items.forEach { item ->
            val producerRecord = ProducerRecord<String, String>(
                item.topic,
                item.payload)

            try {
                kafkaMessageSender.send(producerRecord)
                itemRepository.delete(item)
            } catch (ex: Exception) {
                logger.error("Item error occurred [${ex.message}]")
            }
        }
    }

}

问题如果我尝试发送到不存在的主题并获得 TopicAuthorizationException 异常,那么 KafkaTransactionManager 将事务置于错误状态,并在我得到的任何操作中。这个问题应该由我解决,但它没有解决问题。Cannot execute transactional method because we are in an error statesetProducerListener

问题:如何配置 kafkaTemplate 以防止 KafkaTransactionManager 在发生 TopicAuthorizationException 时将事务置于错误状态?

java kotlin spring-kafka spring- 事务

评论


答:

0赞 Gary Russell 9/29/2023 #1

您需要显示您的事务管理器配置。但是,发生错误后,无法继续使用事务生成器。如果要将所有记录发送到同一主题,则在第一次发送时应发生此错误;您可能需要引发异常来中止事务,并将重试逻辑放在调用代码中。

评论

0赞 user_1234567_java 9/29/2023
你好加里。谢谢你的回答。> 你需要显示你的事务管理器配置 答:我没有事务管理器配置,这是 Spring 的默认配置。---- > 如果您要将所有记录发送到同一主题 答:不同的主题
0赞 Gary Russell 9/29/2023
好的,所以你需要抛出一个异常,以便以前的发送将与数据库 Tx 一起回滚。 修复身份验证问题,然后重试。