KStream 跨多个流进行重复数据删除,同时保留同一流中的重复项

KStream deduplicate across multiple streams while keeping duplicates from within the same stream

提问人:simonalexander2005 提问时间:6/7/2023 最后编辑:simonalexander2005 更新时间:6/15/2023 访问量:119

问:

我有两个 Kafka 流。

例如,第一个 - 没有键,但我们有一个(非唯一,考虑到消息速率)时间戳。1,3,5,7,9

第二个,例如: - 即某物为第一个主题中的值分配了一个键,并且(可逆地)更改了该值。时间戳不一定与第一个主题的时间戳匹配。a:1augmented, c:3augmented, e:5augmented, g:7augmented, i:9augmented

但是,第二个流可能具有“重复项”(当值更改回以匹配第一个主题时),但具有不同的键 - 例如a:1aug, b:1augalt, c:3aug, ...

第二个流也可能完全缺少第一个流的值,并且与第一个流相比,第二个流中的消息也可能不按顺序出现。

这给我们留下了以下示例:

分流 1: 分流 2:1,3,5,7,9a:1aug1, b:1aug2, i:9aug, g:7aug

我想将其简化为第三个主题,如下所示:

  1. 如果消息仅存在于第一个流中,请保留它
  2. 如果消息存在于两个流中,请保留第二个流中的 key:value
  3. 如果消息仅存在于第二个流中,请保留它
  4. 如果消息在第二个流中多次存在,请保留第二个流中的消息的所有副本,而第一个流中的任何消息副本。

此外,我希望排序顺序与第一个流的排序顺序匹配。

使用代码对前三个条件效果很好,但是我怎样才能让第四个条件起作用,因为它不再是成对比较?.reduce

我目前的代码:

// pull in the two input streams (raw and augmented)
KStream<String, String> rawInputStream = builder.stream(rawTopic, Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> augmentedInputStream = builder.stream(augTopic, Consumed.with(Serdes.String(), Serdes.String()));

// map to a common key, so we can easily compare the messages. Store the original keys in the value also, so we can reuse them later.
// The raw input won't have any original key, so use a blank string.

KStream<String, CustomMessageDetailsWithKeyAndOrigin> mappedRawInputStream = rawInputStream
        .map((key, value) -> KeyValue.pair(getCommonKeyFromRawInputStream(value)
                , new CustomMessageDetailsWithKeyAndOrigin(getValueFromRawInputStream(value),key == null ? "" : key, OriginStream.RAW)));

KStream<String, CustomMessageDetailsWithKeyAndOrigin> mappedAugmentedInputStream = augmentedInputStream
        .map((key, value) -> KeyValue.pair(getCommonKeyFromAugmentedInputStream(value)
                , new CustomMessageDetailsWithKeyAndOrigin(value, key == null ? "" : key, OriginStream.AUGMENTED)));

// the outer join here will do a pairwise comparison across all records with a matching key, and just keep the records from the aggregated feed unless no agg value exists.
KStream<String, CustomMessageDetailsWithKeyAndOrigin> mergedStream 
            = mappedRawInputStream.outerJoin(mappedAugmentedInputStream, (value1,value2)-> {
                if (value2 == null) { // no augmented message
                    // log
                    return value1; }
                else if(value1 == null) {} // no raw message - log.
                return value2;  
    }
    // Add a time-based join window to allow for time differences and sequence issues
    ,JoinWindows.ofTimeDifferenceAndGrace(window, windowGrace));
    
// We'll potentially have duplicates now - e.g. one from each input stream, or two from one?; so group by key to bring together the records that share a key
KGroupedStream<String, CustomMessageDetailsWithKeyAndOrigin> groupedStream = mergedStream.groupByKey();

// ungroup the records again, reducing to remove duplicates. 
KStream<String, CustomMessageDetailsWithKeyAndOrigin> reducedStream
    = groupedStream.aggregate(LinkedHashSet<CustomMessageDetailsWithKeyAndOrigin>::new, (key, value, aggregate) ->  {
        if (value != null) {
            boolean added = aggregate.add(value); // won't add again if it's a duplicate
            if (!added){}
                // duplicate - log it.
        }
        return aggregate;
    }).toStream().flatMapValues(value->value);

// grab the original key from the key-value pair stored in the value field to use as the final key, and grab the value from the key-value pair to use as the final value
reducedStream.selectKey((key, value)->value.getOriginalKey())
    .mapValues((value)->value.getRawValue())
    .to(outputTopicName, Produced.with(Serdes.String(), Serdes.String()));
Java 复制 序列 apache-kafka-streams

评论


答:

0赞 simonalexander2005 6/15/2023 #1

我使用了一个 .merge() 和一个自定义处理器来做到这一点,它获取合并的流并根据需要处理它们。