Python Flink 有状态函数入口处的 Kafka Key 访问

Kafka Key access on Ingress of a Python Flink Stateful function

提问人:user14507096 提问时间:11/18/2021 更新时间:11/19/2021 访问量:218

问:

我一直在研究 Flink 有状态函数。它看起来非常有前途 - 除了一件事 - 我希望我只是错过了它。

就我而言,看不到从 Python 中的 kafka 入口访问 kafka 密钥的方法。在 Java 中,我看到我可以使用反串化器并有效地将其打包到解码的对象中。但我找不到其他选择。
在我们的例子中,密钥具有有价值的信息,而这些信息在值中不存在。
message

有人遇到过这个 - 或者我只是错过了它?

python apache-flink 远程执行 flink-statefun

评论


答:

1赞 Igal 11/19/2021 #1

首先我要提到的是,远程函数的内置 Kafka 入口要求密钥可以解释为 UTF-8 字符串。 如果这确实是您的情况,那么您只需通过以下方式获取它:

def example(context, message):
  key = context.address.id 
  ... 
  print(key) # utf8 string

如果不是这种情况,那么很遗憾,此时您必须使用嵌入式 SDK 来提取密钥和值,然后再将消息转发到远程函数。

评论

0赞 user14507096 11/22/2021
完美 - 谢谢。Yip - 我只是错过了这个