Flink flatMap() - NullPointerException

Asked by: overexchange  Asked: 9/2/2023  Last edited by: overexchange  Updated: 9/3/2023  Views: 52

Q:

The code is as follows:

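import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class VerifyDuplicate {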
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> dataStream = env.fromElements(
            Tuple3.of("ID_1", "subid_1", 1),
            Tuple3.of("ID_2", "subid_2", 2),
            Tuple3.of("ID_3", "subid_3", 3),
            Tuple3.of("ID_4", "subid_4", 4),
            Tuple3.of("ID_4", "subid_4", 4),
            Tuple3.of("ID_6", "subid_6", 6),
            Tuple3.of("ID_4", "subid_7", 7),
            Tuple3.of("ID_8", "subid_8", 8),
            Tuple3.of("ID_9", "subid_9", 9),
            Tuple3.of("ID_10", "subid_10", 10)
            );

        KeyedStream<Tuple3<String, String, Integer>, String> partitionedStream = dataStream.keyBy(new KeySelector<Tuple3<String, String,Integer>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                    return value.f0; // partition on f0
                }
            });


        partitionedStream.keyBy(new KeySelector<Tuple3<String, String,Integer>, String>() {
                @Override
                public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                    return value.f1; // subid
                }
            }).flatMap(new FilterDuplicate()).print();

        env.execute("Test");
    }
}

public class FilterDuplicate extends RichFlatMapFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> {

    private ValueState<Boolean> seen;

    @Override
    public void open(Configuration configuration) {
        StateTtlConfig ttlConfig = StateTtlConfig
          .newBuilder(Time.seconds(15))
          .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
          .cleanupFullSnapshot()
          .build();
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        desc.enableTimeToLive(ttlConfig);
        seen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Tuple3<String, String, Integer> value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        
        if (!seen.value()) { // nullpointerexception
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            seen.update(true);
        }
    }
}

  <flink.version>1.17.1</flink.version>
  <target.java.version>11</target.java.version>

The NullPointerException is thrown at !seen.value() in the flatMap() method.

Why does seen.value() give a NullPointerException? I checked the condition seen == null before the problem line, but it still fails...

java nullpointerexception apache-flink flink-streaming

Comments

Answers:

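1 upvote  Pierre CHARLES  9/2/2023  #1

Careful, the negation (!) can also cause problems. You need to check both, seen != null && seen.value() != null, before attempting the negation.

It looks like your seen property is not initialized properly. I don't understand all of your code, but I would probably set it to false by default.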
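
To illustrate the point about negation: in Java, applying ! to a Boolean that is null throws a NullPointerException during auto-unboxing, even when the object that returned it is not null. The sketch below uses plain Java only. The Holder class is a hypothetical stand-in for a state handle whose value() returns null until something is written; it is not part of Flink's API.

```java
public class NullUnboxingDemo {
    /** Hypothetical stand-in for a state handle; value() is null until update() is called. */
    public static class Holder {
        private Boolean value;

        public Boolean value() { return value; }

        public void update(Boolean v) { value = v; }
    }

    /** Null-safe check: "no value yet" is treated the same as false. */
    public static boolean alreadySeen(Holder seen) {
        return Boolean.TRUE.equals(seen.value());
    }

    /** Returns true if negating seen.value() throws a NullPointerException. */
    public static boolean negationThrows(Holder seen) {
        try {
            // The ! operator unboxes the Boolean, which fails when it is null.
            boolean ignored = !seen.value();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Holder seen = new Holder();                 // seen itself is not null
        System.out.println(negationThrows(seen));   // prints true
        System.out.println(alreadySeen(seen));      // prints false
        seen.update(true);
        System.out.println(negationThrows(seen));   // prints false
        System.out.println(alreadySeen(seen));      // prints true
    }
}
```

Comparing with Boolean.TRUE.equals(...) is one way to avoid the unboxing step; an explicit null check on the returned value works just as well.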

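Comments

0 upvotes  overexchange  9/2/2023
It is mentioned here: if there was no previous state, this will be null. Make sure to handle the null case. I verified it by checking desc.getDefaultValue() after initialization, and it shows null. But I'm not sure how to set a default value?
0 upvotes  overexchange  9/2/2023
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN, false); is deprecated
0 upvotes  overexchange  9/2/2023
After debugging, I see that seen.value() is null and seen != null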
0 upvotes  Pierre CHARLES  9/2/2023
@overexchange You can try the following snippet, but I have not tested it and don't know whether this is the behavior you expect:
    @Override
    public void flatMap(Tuple3<String, String, Integer> value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        Boolean seenValue = seen.value(); // get the value of seen
        if (seenValue == null || !seenValue) { // check if it is null or false
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            seen.update(true);
        }
    }

1 upvote  kkrugler  9/3/2023  #2

  1. For the NullPointerException, just do the following:
    if (seen.value() == null) {
        out.collect(value);
        seen.update(true);
    }
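  2. You have two keyBy() operations in a row. When you do the second one, the partitioning from the first is lost. If you want to partition by both id and sub-id, you should have a single keyBy() that returns a key made up of those two fields from the tuple.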
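
The two points above can be sketched together in plain Java, without any Flink dependency. This is only an illustration under stated assumptions: the HashMap is a hypothetical stand-in for per-key state (a missing entry plays the role of a null state value), and joining the two fields with a | separator is one arbitrary way to build a composite key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeyDedup {
    /** Builds a single key from both fields, as a single key selector would. */
    public static String compositeKey(String id, String subId) {
        return String.format("%s|%s", id, subId);
    }

    /** Emits each (id, subId) combination once, in order of first appearance. */
    public static List<String> dedupe(List<String[]> records) {
        // Hypothetical stand-in for keyed state: one Boolean per composite key.
        Map<String, Boolean> seenByKey = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] r : records) {
            String key = compositeKey(r[0], r[1]);
            if (seenByKey.get(key) == null) { // null means this key has no state yet
                out.add(key);
                seenByKey.put(key, true);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[] {"ID_4", "subid_4"},
            new String[] {"ID_4", "subid_4"},
            new String[] {"ID_4", "subid_7"});
        System.out.println(dedupe(records)); // prints [ID_4|subid_4, ID_4|subid_7]
    }
}
```

In an actual Flink job the map would be replaced by the ValueState obtained in open(), and compositeKey would be the body of the single key selector passed to keyBy().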

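Comments

0 upvotes  overexchange  9/4/2023
For point 2 in your answer, do you mean keyBy() should use Tuple2.of(value.f0, value.f1) in the first place?....
0 upvotes  overexchange  9/4/2023
Related question - stackoverflow.com/q/77037956/3317808
0 upvotes  kkrugler  9/5/2023
I usually format a string when generating a composite key, e.g. .keyBy(t -> String.format("%s|%s", t.f0, t.f1))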