提问人:samsamsamsmasma 提问时间:9/17/2023 更新时间:9/17/2023 访问量:24
为什么Spring Kafka Consumer发生异常时滞后不增加
Why does not lag increase when exception occurs in Spring Kafka Consumer
问:
我正在使用弹簧卡夫卡,我有一个问题。 Spring Kafka 版本为 2.6.X。
我将自动提交设置设置为 false。 此时,使用消息的使用者强制犯了错误。 正如我所料,提交不是手动完成的,因此应该增加延迟。 但是,滞后并没有增加。 如何处理现有的默认错误处理程序?
class KafkaConsumerConfig {
@Bean
fun multiTypeConsumerFactory(): ConsumerFactory<String, Any> {
val props = HashMap<String, Any>()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
return DefaultKafkaConsumerFactory(props)
}
@Bean
fun multiTypeKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.setConsumerFactory(multiTypeConsumerFactory())
return factory
}
}
@Component
@KafkaListener(topics = ["testTopic"], groupId = "group_sdas")
class Consumer {
@KafkaHandler
fun handleFoo(message: Foo) {
println("Received Message in group foo: ${message}")
}
@KafkaHandler
fun handleBar(message: Bar) {
throw RuntimeException("ssstest sss")
}
}
即使发生错误,滞后也不会增加,因为标头中不包含类型 id。
答: 暂无答案
评论
fun handleBar(message: Bar, ack: Acknowledgment) { ... ack.acknowledge(); }
ErrorHandlingDeserializer