Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD 与 SHUTDOWN_CLIENT

Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD vs SHUTDOWN_CLIENT

提问人:frblazquez 提问时间:8/23/2023 更新时间:8/31/2023 访问量:136

问:

我有一个包含损坏消息的旧主题,我需要完全重新处理该主题,忽略无法处理的消息。对于此方案,哪种正确的未捕获异常处理策略?

我无法完全理解 KStreams 应用程序上未捕获异常处理的 REPLACE_THREAD 策略和SHUTDOWN_CLIENT策略之间的区别。任何解释或参考都是值得赞赏的。

异常 apache-kafka apache-kafka-streams exceptionhandler uncaughtexceptionhandler

评论

1赞 OneCricketeer 8/23/2023
我觉得你应该使用一个映射(如果损坏,处理并返回 null)/过滤器(删除 null)。或者只是在处理之前直接筛选,而不是让内置的异常处理运行。这样也应该更快。换句话说,您知道存在损坏的数据,因此当您确切知道异常应该是什么时,您不应该使用“未捕获的异常”,因此使用 if 语句检查数据,或者在处理时使用您自己的 try-catch

答:

1赞 maxgruber19 8/31/2023 #1

由于我过去也感到困惑,所以我做了一些研究,我发现最好的是 confluent 在他们网站上提供的错误处理教程中的内容。

REPLACE_THREAD - 替换接收异常的线程和 使用相同数量的已配置线程继续进行处理。 (注意:这可能会导致重复的记录,具体取决于 应用程序的处理模式由 PROCESSING_GUARANTEE_CONFIG值)

SHUTDOWN_CLIENT - 关闭 Kafka 的单个实例 遇到异常的流应用程序。(这是上一个 行为和当前默认行为(如果您未提供 StreamsUncaughtExceptionHandler)

SHUTDOWN_APPLICATION - 关闭 Kafka 流的所有实例 具有相同 application-id 的应用程序。Kafka Streams 使用 重新平衡以指示所有应用程序实例关闭,因此即使 在另一台机器上运行的那些将接收到信号并退出。

在此处 https://developer.confluent.io/tutorials/error-handling/confluent.html 了解更多详细信息。你会在那里找到另一种模式,那就是SHUTDOWN_APPLICATION。但我的建议是以不同的方式解决这个问题。

我过去也遇到过类似的数据损坏问题。我记得我的 JSON 主题中的 XML 消息和手动插入的测试数据不满足任何指定的模式,这导致应用程序每天多次崩溃。如果您希望 kstreams 应用程序在损坏的消费情况下生存,我可以推荐以下两种解决方案,从而为您的 kstreams 应用程序带来更高的稳定性:

创建不良记录筛选器

在代码中初始化 Kafka 流后,你将由 map 函数执行一些转换,类似于此处的示例

KStream<String, String> inputStream = builder.stream("input-topic");
KStream<String, String> firstMappedStream = inputStream
  .map((key, value) -> { return <your-transformed-message>;}
);

在映射和转换之前,请尝试使用专用的过滤器或映射器来执行所有安全逻辑。下一张地图将仅获得已清理的记录。所有损坏的记录都将在地图/过滤器中被丢弃。如果需要,也可以实现日志。解决方案如下所示

KStream<String, String> inputStream = builder.stream("input-topic");
KStream<String, String> firstMappedStream = inputStream
  .filter((key, value) -> { return <your-validation-result>})
  .map((key, value) -> { return <your-transformed-message>;}
);
稳定转型

由于您无法想到所有可能的损坏方式,因此在映射器中额外使用异常处理非常适合构建稳定的应用程序。我的建议是将映射逻辑包围在 try-catch 块中,并让 map 函数为损坏的记录返回 null。这样,它们就不会被传递给下一个映射函数,并且您的应用程序无论如何都会稳定运行。

希望能有所帮助