提问人:Samruddhi 提问时间:10/9/2023 最后编辑:Arun SudhakaranSamruddhi 更新时间:10/10/2023 访问量:63
Spring kafka 批处理监听器事务
Spring kafka batch listener transaction
问:
我正在使用 spring kafka 批处理侦听器。为了处理异常,我抛出带有失败消息索引的 BatchListenerFailedException。在 kafka config 类中,我添加了这个逻辑。
DeadLetterPublishingRecoverer dlpRecoverer = new DeadLetterPublish8ngRecoverer(dltTemplate,(record, exception) -> new TopicPartition(topicName, record.partition()));
DefaultErrorHandler errorHandler = new DefaultErrorHandler(dlpRecoverer, new FixedBackOff(0,0));
factory.setCommonErrorHandler(errorHandler);
执行此逻辑后,失败的 msg 被推送到 dlt 主题,但 offset 会间歇性地提交。因此,同一批消息再次被轮询。 我在commit.sync时收到异常 CommitFailedException:无法完成偏移提交,因为使用者不是自动分区分配的活动组的一部分。
尝试过 BatchExceptionHandling 逻辑,但不起作用
答:
0赞
Gary Russell
10/9/2023
#1
您的消费者处理批处理时间过长,而 Kafka 将消费者踢出组;增加或减少,以便您的侦听器始终可以在不超过间隔的情况下处理记录(允许良好的安全性)。max.poll.intervl.ms
max.poll.records
评论
0赞
Samruddhi
10/10/2023
是的,我已经将间隔增加到 90 秒,只有 3 条记录,但我仍然收到那个异常。同一批次再次被轮询
0赞
Samruddhi
10/10/2023
我的要求是..轮询批处理后,如果任何记录无效,则应在错误主题中发布该记录,并提交偏移量,以便下一个轮询批处理从失败的 msg 偏移量之后的偏移量开始。
0赞
Samruddhi
10/10/2023
我观察到的另一种行为是,当提交 toCommit 和 remaining 中的偏移量时,仍然在下一批中,再次轮询相同的消息,这与预期不符
0赞
Gary Russell
10/10/2023
标题提到了交易。不能将错误处理程序用于事务,因为它在事务中运行。请改用回滚后处理器。对于事务,必须重新交付整个批次。
0赞
Samruddhi
10/10/2023
#2
我使用失败消息的索引抛出 batchListenerFailedExpetion,并使用错误处理程序将失败的记录(假设是第 2 条消息)发布到错误主题并提交偏移量。在回滚处理器逻辑中,我覆盖了进程方法,方法中没有代码,因为 defaulterrorhandler 执行了所需的工作。在下一次轮询调用中,我现在可以看到消息批处理从失败的 msg 偏移量后的偏移量开始。但问题是,由于我在批处理中使用事务,即使该批次的第一个消息在错误处理程序中成功提交,但在轮询方法中,在处理和发布与第一个消息对应的下游主题后,它将在该下游主题中不可用,因为该特定的 txn 已中止并且我们在read_committed模式下读取 8ng
评论
0赞
Thisun
10/11/2023
嗨,您是否将 MANUAL 提交与批处理侦听器一起使用?
0赞
Samruddhi
10/12/2023
是手动提交
评论