Apache Flink - 序列化 json 并执行 join 操作

Apache Flink - Serialize json and perform join operation

提问人:Shekar Tippur 提问时间:2/17/2016 最后编辑:Matthias J. SaxShekar Tippur 更新时间:2/17/2016 访问量:3027

问:

我正在尝试使用 Jackson 库从 Kafka 主题中读取字符串并从另一个流执行连接。

下面是一个包含两个数据流的示例代码。我想对这些消息流执行联接操作。

例如,传入的流是:

messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}

联接条件为 。如何在 Flink 中实现这一点?messageStream1."A" = messageStream2."B"

数据流 1:

DataStream<String> messageStream1 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        }catch (java.io.IOException ex){
            ex.printStackTrace();
            return null;
        }
    }
});

数据流 2:

DataStream<String> messageStream2 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        }catch (java.io.IOException ex){
            ex.printStackTrace();
            return null;
        }
    }
});
加入 apache-kafka apache-flink

评论


答:

2赞 Matthias J. Sax 2/17/2016 #1

您需要将键字段提取到一个额外的属性中,以便 Flink 可以访问它(另一种方法是提供自定义键选择器:https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys)。

因此,返回的类型可能是 (if 是 join 属性的正确类型)。map(...)Tuple2<String,JsonNode>String

然后,您可以按照文档 (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html) 中的说明指定您的联接:

messageStream1.join(messageStream2)
    .where(0).equalTo(0) // both zeros indicate that the join happens on the zero's attribute, ie, the String attribute of Tuple2
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply(new JoinFunction() {...});

为了使用 API 执行联接,您还需要指定一个联接窗口。只能联接属于同一窗口的元组。DataStream

评论

0赞 Shekar Tippur 2/17/2016
感谢您的回复。我没有关于输入数据的固定结构。我只知道它是一个 JSON。如果是这种情况,我是否需要求助于反射来自动即时创建对象?我知道可能会因此而产生开销。
0赞 Matthias J. Sax 2/17/2016
如果我理解正确,你有两个JSON对象流,并希望同时加入两个。因此,您期望每个对象至少有一个固定字段“X”,其已知类型为“T”,您要用于联接。因此,您可以提取“X”并将其放入 Tuple2 中,该 Tuple2 在其两个属性中保存键和整个 JSON。或者有没有可能,有些对象不包含“X”?如果是,您处理这些问题的策略是什么?
0赞 Shekar Tippur 2/18/2016
你是对的。让我稍微改写一下。Stream1 = {“A”:“a”, “B”:“b”} Stream2 = {“B”:“a”,“X”:“x”} 鉴于这是 2 个流,如果我必须在 Stream1.A=Stream2.B 上加入 如果我没看错,map 函数应该返回带有 <String,String 的元组。>?
0赞 Matthias J. Sax 2/18/2016
不确定我是否理解正确。在您的示例中,您似乎只显示每个流的一个元组。如果我们假设 A 和 B 是您要加入的字段,则 map 会将 JSON 转换为 (ie, a )。与 stream2 类似,则转换为 .因此,您只需提取键字段值,并将键与完整的 JSON 对象一起发出。{"A":"a", "B":"b"}<"a", {"A":"a", "B":"b"}>Tuple2<String, JsonNode>{"B":"a","X":"x"}<"a", {"B":"a","X":"x"}>
0赞 Shekar Tippur 2/19/2016
不好意思。。花了一点时间才做出回应。谢谢你的解释。我的理解还差得很远。再次感谢。