提问人:frblazquez 提问时间:8/23/2023 更新时间:8/31/2023 访问量:136
Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD 与 SHUTDOWN_CLIENT
Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD vs SHUTDOWN_CLIENT
问:
我有一个包含损坏消息的旧主题,我需要完全重新处理该主题,忽略无法处理的消息。对于此方案,哪种正确的未捕获异常处理策略?
我无法完全理解 KStreams 应用程序上未捕获异常处理的 REPLACE_THREAD 策略和SHUTDOWN_CLIENT策略之间的区别。任何解释或参考都是值得赞赏的。
答:
由于我过去也感到困惑,所以我做了一些研究,我发现最好的是 confluent 在他们网站上提供的错误处理教程中的内容。
REPLACE_THREAD - 替换接收异常的线程和 使用相同数量的已配置线程继续进行处理。 (注意:这可能会导致重复的记录,具体取决于 应用程序的处理模式由 PROCESSING_GUARANTEE_CONFIG值)
SHUTDOWN_CLIENT - 关闭 Kafka 的单个实例 遇到异常的流应用程序。(这是上一个 行为和当前默认行为(如果您未提供 StreamsUncaughtExceptionHandler)
SHUTDOWN_APPLICATION - 关闭 Kafka 流的所有实例 具有相同 application-id 的应用程序。Kafka Streams 使用 重新平衡以指示所有应用程序实例关闭,因此即使 在另一台机器上运行的那些将接收到信号并退出。
在此处 https://developer.confluent.io/tutorials/error-handling/confluent.html 了解更多详细信息。你会在那里找到另一种模式,那就是SHUTDOWN_APPLICATION。但我的建议是以不同的方式解决这个问题。
我过去也遇到过类似的数据损坏问题。我记得我的 JSON 主题中的 XML 消息和手动插入的测试数据不满足任何指定的模式,这导致应用程序每天多次崩溃。如果您希望 kstreams 应用程序在损坏的消费情况下生存,我可以推荐以下两种解决方案,从而为您的 kstreams 应用程序带来更高的稳定性:
创建不良记录筛选器
在代码中初始化 Kafka 流后,你将由 map 函数执行一些转换,类似于此处的示例
KStream<String, String> inputStream = builder.stream("input-topic");
KStream<String, String> firstMappedStream = inputStream
.map((key, value) -> { return <your-transformed-message>;}
);
在映射和转换之前,请尝试使用专用的过滤器或映射器来执行所有安全逻辑。下一张地图将仅获得已清理的记录。所有损坏的记录都将在地图/过滤器中被丢弃。如果需要,也可以实现日志。解决方案如下所示
KStream<String, String> inputStream = builder.stream("input-topic");
KStream<String, String> firstMappedStream = inputStream
.filter((key, value) -> { return <your-validation-result>})
.map((key, value) -> { return <your-transformed-message>;}
);
稳定转型
由于您无法想到所有可能的损坏方式,因此在映射器中额外使用异常处理非常适合构建稳定的应用程序。我的建议是将映射逻辑包围在 try-catch 块中,并让 map 函数为损坏的记录返回 null。这样,它们就不会被传递给下一个映射函数,并且您的应用程序无论如何都会稳定运行。
希望能有所帮助
评论