提问人:mparada 提问时间:10/13/2023 更新时间:10/14/2023 访问量:30
如何实现 Kafka 消息与其他系统的转换?
How to approach the conversion of Kafka messages from and to other systems?
问:
我希望有一系列小型的独立服务,这些服务要么使用 Kafka 主题并将数据输出到不同的系统中,要么相反:从系统接收数据并生成 Kafka 消息。对于这样的应用,正确的方法是什么?
示例 1:将 SQL 查询结果转换为 Kafka 消息流
让我们以 DB -> Kafka 为例。理想情况下,该服务将使用 Avro 架构和 SQL 查询以及连接配置(URL、凭据、主题、使用者组等)进行配置
schema.avsc
:
{
"type" : "record",
"namespace" : "BookExample",
"name" : "book",
"fields" : [
{ "name" : "title" , "type" : "string" },
{ "name" : "year" , "type" : "int" }
]
}
query.sql
:
SELECT title, year from books;
一旦启动,应用程序将执行查询并将结果通过管道传输到 Kafka。
现在,由于输入和输出都是配置,因此无法在编译时对系统进行类型检查。一旦它尝试运行,它就必须在运行时抛出某种形式的错误(解析错误?
另请注意,应用程序的定义不足。为简单起见,它将直接将列映射到同名字段(因为 Avro 架构不保证顺序)。这很好。也许更复杂的应用程序可以采用地图,但这对于此示例并不重要。{columnName:fieldName}
示例 2:将 Kafka 消息持久化到数据库表中
与示例 1 相同,但相反。现在甚至不需要 SQL 查询。只需要一个表名作为配置(假设像以前一样的约定)。column-field
应用程序将使用具有上述 Avro 架构的 Kafka 主题,并将每条消息作为目标表中的一行写入。
示例 3:HTTP
对于接收 JSON 有效负载并将其发布到配置的 Kafka 主题的 Web 服务,也可以执行相同的操作。如果有效负载不符合 Avro 架构,可以发送 400 状态代码。
我做了什么?
我确实在 Scala 中实现了如上所述的小型应用程序。但问题是我不能使它们完全通用。编译时需要 Avro 架构和表定义来创建对象。这使得应用程序非常不灵活。
我该怎么办?
我的第一个想法是用非类型化或动态类型化语言(Python?)实现它。但这开始看起来很像解析文本输入和生成代码(在 SQL 的情况下)。这就是我写这个问题的原因。我不确定这是正确的方法。在我看来,这个应用程序是一种编译器/解释器?将一种类型的数据(文本?)转换为另一种类型的数据。我
可用工具
我知道 Kafka Connect(尽管我从未使用过它),在我看来,它与 Kafka 代理非常紧密地结合在一起。我想知道是否可以有一个轻量级的应用程序,易于部署并且对 Kafka 代理透明。对于代理来说,应用程序看起来像一个普通的消费者或生产者。
其他 SO 问题
我看到了这个问题,但解决方案意味着更改接收应用程序以反序列化 Kafka 消息。我想要一个中间的应用程序,这样两端(Kafka 和其他系统)就不知道彼此了。
答:
DB -> Kafka。理想情况下,该服务将使用 Avro 架构和 SQL 查询以及连接配置进行配置
当然,Debezium 会这样做。
直接将列映射到同名字段
这正是它的作用
示例 1 但相反
这就是 Kafka Connect Sinks 的作用,是的。(请尝试一下)
接收 JSON 有效负载并将其发布到配置的 Kafka 主题的 Web 服务
Confluent 有一个 Kafka REST 代理,但也有其他解决方案,如 Strimzi Bridge、karapace 等。
与 Kafka 代理非常紧密耦合的东西
您已经编写了一个 Kafka 客户端...这与 Connect 的耦合级别相同。不应在代理服务器上运行 Connect。
对于代理来说,应用程序看起来像一个普通的消费者或生产者。
这就是 Connect(以及在此基础上构建的 Debezium)所做的,是的。
我想要一个中间的应用程序
Kafka 流?或者 Apache Spark/Flink/NiFi 都支持数据库和 Avro...总的来说,这里需要一个流处理框架。
评论