提问人:user14507096 提问时间:11/18/2021 更新时间:11/19/2021 访问量:218
Python Flink 有状态函数入口处的 Kafka Key 访问
Kafka Key access on Ingress of a Python Flink Stateful function
问:
我一直在研究 Flink 有状态函数。它看起来非常有前途 - 除了一件事 - 我希望我只是错过了它。
就我而言,看不到从 Python 中的 kafka 入口访问 kafka 密钥的方法。在 Java 中,我看到我可以使用反串化器并有效地将其打包到解码的对象中。但我找不到其他选择。
在我们的例子中,密钥具有有价值的信息,而这些信息在值中不存在。message
有人遇到过这个 - 或者我只是错过了它?
答:
1赞
Igal
11/19/2021
#1
首先我要提到的是,远程函数的内置 Kafka 入口要求密钥可以解释为 UTF-8 字符串。 如果这确实是您的情况,那么您只需通过以下方式获取它:
def example(context, message):
key = context.address.id
...
print(key) # utf8 string
如果不是这种情况,那么很遗憾,此时您必须使用嵌入式 SDK 来提取密钥和值,然后再将消息转发到远程函数。
评论
0赞
user14507096
11/22/2021
完美 - 谢谢。Yip - 我只是错过了这个
评论