Kafka 侦听器 - 当 ACK 模式设置为 MANUAL_IMMEDIATE 时,是否可以在 DefaultErrorHandler 中确认消息

Kafka listener - Is it possible to acknowledge message in DefaultErrorHandler , when ACK mode is set to MANUAL_IMMEDIATE

提问人:Luke 提问时间:9/7/2023 最后编辑:Arun SudhakaranLuke 更新时间:9/7/2023 访问量:49

问:

我创建了 kafka 侦听器,设置为 .ACK modeMANUAL_IMMEDIATE

我正在使用 DefaultErrorHandler 和 DeadLetterPublishingRecoverer 重试消息几次,如果仍然失败,请将其发送到其他主题。

BackOff fixedBackOff = new FixedBackOff(2000, 3);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (kafkaRecord, exception) -> new TopicPartition("OTHER-TOPIC", kafkaRecord.partition()));
DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recoverer, fixedBackOff);

是否可以在将消息发送到其他主题后确认失败的消息。 我不想有重复或丢失的消息。

我试图在异常中传递 Acknowledgeledgment 对象 - 但原始异常被 ListenerExecutionFailedException 取代。

我还发现该方法已弃用setRecoveryCallback

java spring-boot spring-kafka 弃用警告

评论


答:

1赞 Gary Russell 9/7/2023 #1

默认情况下(自 2.5.3 起),容器会自动确认恢复的消息(即使使用手动确认模式也是如此)。

/**
 * Set to false to prevent the container from committing the offset of a recovered
 * record (when the error handler does not itself throw an exception).
 * @param ack false to not commit.
 */
default void setAckAfterHandle(boolean ack) {

评论

0赞 Luke 9/7/2023
这很奇怪。当我使用 DefaultErrorHandler 时,即使我将“setAckAfterHandle”设置为 true。我发布了失败的按摩 - >重试完成,但当前偏移量在 kafka 代理上没有变化 - 滞后且日志结束偏移量正在增加。
0赞 Luke 9/7/2023
看起来 defaultErrorHandler.setCommitRecovered(true) 正在:)完成工作
0赞 Gary Russell 9/7/2023
哦,对了;我忘了。