提问人:Luke 提问时间:9/7/2023 最后编辑:Arun SudhakaranLuke 更新时间:9/7/2023 访问量:49
Kafka 侦听器 - 当 ACK 模式设置为 MANUAL_IMMEDIATE 时,是否可以在 DefaultErrorHandler 中确认消息
Kafka listener - Is it possible to acknowledge message in DefaultErrorHandler , when ACK mode is set to MANUAL_IMMEDIATE
问:
我创建了 kafka 侦听器,设置为 .ACK mode
MANUAL_IMMEDIATE
我正在使用 DefaultErrorHandler 和 DeadLetterPublishingRecoverer 重试消息几次,如果仍然失败,请将其发送到其他主题。
BackOff fixedBackOff = new FixedBackOff(2000, 3);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(kafkaRecord, exception) -> new TopicPartition("OTHER-TOPIC", kafkaRecord.partition()));
DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recoverer, fixedBackOff);
是否可以在将消息发送到其他主题后确认失败的消息。 我不想有重复或丢失的消息。
我试图在异常中传递 Acknowledgeledgment 对象 - 但原始异常被 ListenerExecutionFailedException 取代。
我还发现该方法已弃用。setRecoveryCallback
答:
1赞
Gary Russell
9/7/2023
#1
默认情况下(自 2.5.3 起),容器会自动确认恢复的消息(即使使用手动确认模式也是如此)。
/**
* Set to false to prevent the container from committing the offset of a recovered
* record (when the error handler does not itself throw an exception).
* @param ack false to not commit.
*/
default void setAckAfterHandle(boolean ack) {
评论
0赞
Luke
9/7/2023
这很奇怪。当我使用 DefaultErrorHandler 时,即使我将“setAckAfterHandle”设置为 true。我发布了失败的按摩 - >重试完成,但当前偏移量在 kafka 代理上没有变化 - 滞后且日志结束偏移量正在增加。
0赞
Luke
9/7/2023
看起来 defaultErrorHandler.setCommitRecovered(true) 正在:)完成工作
0赞
Gary Russell
9/7/2023
哦,对了;我忘了。
评论