提问人:Shekar Tippur 提问时间:2/17/2016 最后编辑:Matthias J. SaxShekar Tippur 更新时间:2/17/2016 访问量:3027
Apache Flink - 序列化 json 并执行 join 操作
Apache Flink - Serialize json and perform join operation
问:
我正在尝试使用 Jackson 库从 Kafka 主题中读取字符串并从另一个流执行连接。
下面是一个包含两个数据流的示例代码。我想对这些消息流执行联接操作。
例如,传入的流是:
messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}
联接条件为 。如何在 Flink 中实现这一点?messageStream1."A" = messageStream2."B"
数据流 1:
DataStream<String> messageStream1 = env.addSource(
new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));
messageStream1.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String value) throws Exception {
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
try {
JsonNode rootNode = mapper.readTree(value);
Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String,JsonNode> field = fieldsIterator.next();
System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
}
return rootNode;
}catch (java.io.IOException ex){
ex.printStackTrace();
return null;
}
}
});
数据流 2:
DataStream<String> messageStream2 = env.addSource(
new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));
messageStream2.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String value) throws Exception {
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
try {
JsonNode rootNode = mapper.readTree(value);
Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String,JsonNode> field = fieldsIterator.next();
System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
}
return rootNode;
}catch (java.io.IOException ex){
ex.printStackTrace();
return null;
}
}
});
答:
2赞
Matthias J. Sax
2/17/2016
#1
您需要将键字段提取到一个额外的属性中,以便 Flink 可以访问它(另一种方法是提供自定义键选择器:https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys)。
因此,返回的类型可能是 (if 是 join 属性的正确类型)。map(...)
Tuple2<String,JsonNode>
String
然后,您可以按照文档 (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html) 中的说明指定您的联接:
messageStream1.join(messageStream2)
.where(0).equalTo(0) // both zeros indicate that the join happens on the zero's attribute, ie, the String attribute of Tuple2
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
.apply(new JoinFunction() {...});
为了使用 API 执行联接,您还需要指定一个联接窗口。只能联接属于同一窗口的元组。DataStream
评论
0赞
Shekar Tippur
2/17/2016
感谢您的回复。我没有关于输入数据的固定结构。我只知道它是一个 JSON。如果是这种情况,我是否需要求助于反射来自动即时创建对象?我知道可能会因此而产生开销。
0赞
Matthias J. Sax
2/17/2016
如果我理解正确,你有两个JSON对象流,并希望同时加入两个。因此,您期望每个对象至少有一个固定字段“X”,其已知类型为“T”,您要用于联接。因此,您可以提取“X”并将其放入 Tuple2 中,该 Tuple2 在其两个属性中保存键和整个 JSON。或者有没有可能,有些对象不包含“X”?如果是,您处理这些问题的策略是什么?
0赞
Shekar Tippur
2/18/2016
你是对的。让我稍微改写一下。Stream1 = {“A”:“a”, “B”:“b”} Stream2 = {“B”:“a”,“X”:“x”} 鉴于这是 2 个流,如果我必须在 Stream1.A=Stream2.B 上加入 如果我没看错,map 函数应该返回带有 <String,String 的元组。>?
0赞
Matthias J. Sax
2/18/2016
不确定我是否理解正确。在您的示例中,您似乎只显示每个流的一个元组。如果我们假设 A 和 B 是您要加入的字段,则 map 会将 JSON 转换为 (ie, a )。与 stream2 类似,则转换为 .因此,您只需提取键字段值,并将键与完整的 JSON 对象一起发出。{"A":"a", "B":"b"}
<"a", {"A":"a", "B":"b"}>
Tuple2<String, JsonNode>
{"B":"a","X":"x"}
<"a", {"B":"a","X":"x"}>
0赞
Shekar Tippur
2/19/2016
不好意思。。花了一点时间才做出回应。谢谢你的解释。我的理解还差得很远。再次感谢。
评论