Java 的 Stream.flatMap() 的(某种)逆运算是什么?

What is the (kind of) inverse operation to Java's Stream.flatMap()?

提问人:Harald 提问时间:6/15/2018 更新时间:8/12/2022 访问量:7866

问:

该操作将转换Stream.flatMap()

a, b, c

添加到每个输入元素包含零个或多个元素的流中,例如

a1, a2, c1, c2, c3

是否有相反的操作将几个元素批量化为一个新元素?

  • 它不是 .reduce(),因为这只产生一个结果
  • 它不是collect(),因为它只填充一个容器(afaiu)
  • 它不是forEach(),因为它的返回结果只是并且有副作用void

它存在吗?我可以以任何方式模拟它吗?

java-stream

评论

0赞 SLaks 6/15/2018
您在寻找什么退货类型?
0赞 Harald 6/15/2018
Stream<X> 进入,Stream<Y> 出来,其中 Y 是 X 的某种组合。原则上,整个事情与 collect() 非常相似,只是它真的会保持“流式处理”,而不是先收集然后流式传输结果:正如 @Lino 的答案。
1赞 SLaks 6/15/2018
你想要。Collectors.groupingBy

答:

1赞 Lino 6/15/2018 #1

你可以破解你的方式。请参阅以下示例:

Stream<List<String>> stream = Stream.of("Cat", "Dog", "Whale", "Mouse")
   .collect(Collectors.collectingAndThen(
       Collectors.partitioningBy(a -> a.length() > 3),
       map -> Stream.of(map.get(true), map.get(false))
    ));

评论

0赞 Harald 6/15/2018
:-)确实是黑客攻击。
0赞 Tomasz Linkowski 6/15/2018 #2

这是我想出的:

interface OptionalBinaryOperator<T> extends BiFunction<T, T, Optional<T>> {
  static <T> OptionalBinaryOperator<T> of(BinaryOperator<T> binaryOperator,
          BiPredicate<T, T> biPredicate) {
    return (t1, t2) -> biPredicate.test(t1, t2)
            ? Optional.of(binaryOperator.apply(t1, t2))
            : Optional.empty();
  }
}

class StreamUtils {
  public static <T> Stream<T> reducePartially(Stream<T> stream,
          OptionalBinaryOperator<T> conditionalAccumulator) {
    Stream.Builder<T> builder = Stream.builder();
    stream.reduce((t1, t2) -> conditionalAccumulator.apply(t1, t2).orElseGet(() -> {
      builder.add(t1);
      return t2;
    })).ifPresent(builder::add);
    return builder.build();
  }
}

不幸的是,我没有时间让它变得懒惰,但可以通过编写一个自定义委托来完成,该委托将遵循上述逻辑(而不是利用 ,这是一个终端操作)。Spliteratorstream.spliterator()stream.reduce()


我刚刚意识到你想要皈依,我写了关于皈依的文章。如果可以先映射 from to ,然后使用上面的函数,那么就是这样(即使它不是最优的)。<T,U><T,T>TU

如果它更复杂,则需要在提出 API 之前定义减少/合并的条件类型(例如 , , , 甚至 )。Predicate<T>BiPredicate<T,T>BiPredicate<U,T>Predicate<List<T>>

1赞 Joop Eggen 6/15/2018 #3
    IntStream.range(0, 10)
            .mapToObj(n -> IntStream.of(n, n / 2, n / 3))
            .reduce(IntStream.empty(), IntStream::concat)
            .forEach(System.out::println);

如您所见,元素也被映射到 Streams,然后连接成一个大流。

评论

0赞 Harald 6/19/2018
我可能误解了这一点或无法进行转移,但这似乎假设当我看到 a2 时,我可以生成 a3、a1 和 a1。但事实并非如此。一些任意的元素出现了,我想把它们批处理起来。
0赞 Joop Eggen 6/19/2018
@Harald我理解:编辑多个流元素的子范围,更改流。就像管道 I/O 中的数据过滤器一样 / ... .我认为 Stream 不是好东西。
2赞 user_3380739 6/20/2018 #4

StreamEx 中查看collapse

StreamEx.of("a1", "a2", "c1", "c2", "c3").collapse((a, b) -> a.charAt(0) == b.charAt(0))
    .map(e -> e.substring(0, 1)).forEach(System.out::println);

或者我的叉子有更多功能:、、......groupBysplitsliding

StreamEx.of("a1", "a2", "c1", "c2", "c3").collapse((a, b) -> a.charAt(0) == b.charAt(0))
.map(e -> e.substring(0, 1)).forEach(System.out::println);
// a
// c

StreamEx.of("a1", "a2", "c1", "c2", "c3").splitToList(2).forEach(System.out::println);
// [a1, a2]
// [c1, c2]
// [c3]

StreamEx.of("a1", "a2", "c1", "c2", "c3").groupBy(e -> e.charAt(0))
.forEach(System.out::println);
// a=[a1, a2]
// c=[c1, c2, c3]

评论

0赞 TheJeff 8/12/2022
StreamEx groupBy 方法是否避免在对内存中的所有元素进行分组之前收集它们?
1赞 user_3380739 8/12/2022
确实如此。可以通过在折叠前添加 peek 来验证它:stream.peek(System.out::p rintln).collapse(...)
0赞 TheJeff 8/12/2022
这有很大帮助,基于 SteamEx 为此添加了另一个答案。感谢花时间添加这个,这里的最佳答案是 imo - 内存高效、维护和可重用。
7赞 Harald 6/26/2018 #5

最后,我发现这是它自己的“逆向”,可以这么说。我监督说,这并不一定会增加元素的数量。它还可以通过为某些元素发出空流来减少元素的数量。为了实现 group-by 操作,调用 by 的函数需要最少的内部状态,即最新的元素。它要么返回一个空流,要么在组结束时返回简化为组的代表。flatMapflatMapflatMap

这是一个快速实现,如果传入的两个元素不属于同一个组,即它们之间是组边界,则必须返回。是将 (1,a)、(1,a)、(1,a) 组合成 (3,a) 的组函数,假设您的组元素是元组 (int, string)。groupBordertruecombiner

public class GroupBy<X> implements Function<X, Stream<X>>{

  private final BiPredicate<X, X> groupBorder;
  private final BinaryOperator<X> combiner;
  private X latest = null;

  public GroupBy(BiPredicate <X, X> groupBorder,
                 BinaryOperator<X> combiner) {
    this.groupBorder = groupBorder;
    this.combiner = combiner;
  }

  @Override
  public Stream<X> apply(X elem) {
    // TODO: add test on end marker as additonal parameter for constructor
    if (elem==null) {
      return latest==null ? Stream.empty() : Stream.of(latest);
    }
    if (latest==null) {
      latest = elem;
      return Stream.empty();
    }
    if (groupBorder.test(latest, elem)) {
      Stream<X> result = Stream.of(latest);
      latest = elem;
      return result;
    }
    latest = combiner.apply(latest,  elem);
    return Stream.empty();
  }
}

但有一点需要注意:要传送整个流的最后一组,必须将结束标记作为最后一个元素卡在流中。上面的代码假设它是 ,但可以添加一个额外的 end-marker-tester。null

我无法想出一个不依赖于结束标记的解决方案。

此外,我也没有在传入和传出元素之间进行转换。对于唯一操作,这将起作用。对于计数操作,上一步必须将单个元素映射到计数对象。

评论

0赞 armani 4/4/2020
此功能的实际用途是什么?
0赞 Joshua Goldberg 3/2/2022 #6

有点像 StreamEx,您可以手动实现 Spliterator。例如

collectByTwos(Stream.of(1, 2, 3, 4), (x, y) -> String.format("%d%d", x, y))

...使用以下代码返回“12”、“34”流:

public static <X,Y> Stream<Y> collectByTwos(Stream<X> inStream, BiFunction<X,X,Y> mapping) {
    Spliterator<X> origSpliterator = inStream.spliterator();
    Iterator<X> origIterator = Spliterators.iterator(origSpliterator);

    boolean isParallel = inStream.isParallel();
    long newSizeEst = (origSpliterator.estimateSize() + 1) / 2;

    Spliterators.AbstractSpliterator<Y> lCombinedSpliterator =
            new Spliterators.AbstractSpliterator<>(newSizeEst, origSpliterator.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super Y> action) {
            if (! origIterator.hasNext()) {
                return false;
            }
            X lNext1 = origIterator.next();
            if (! origIterator.hasNext()) {
                throw new IllegalArgumentException("Trailing elements of the stream would be ignored.");
            }
            X lNext2 = origIterator.next();
            action.accept(mapping.apply(lNext1, lNext2));
            return true;
        }
    };
    return StreamSupport.stream(lCombinedSpliterator, isParallel)
            .onClose(inStream::close);
}

(我认为这对于并行流可能是不正确的。

1赞 TheJeff 8/12/2022 #7

主要得益于 user_3380739 上面的 StreamEx 答案,您可以在此处使用文档groupRuns

StreamEx.of("a1", "a2", "c1", "c2", "c3").groupRuns( t, u -> t.charAt(0) == u.charAt(0) )
.forEach(System.out::println);

// a=[a1, a2]
// c=[c1, c2, c3]

评论

0赞 basin 6/17/2023
groupRuns在第一个 null 元素上停止。如果您需要 null,可能是更好的选择。google.github.io/mugcom.google.mu.util.stream.MoreStreams.groupConsecutive