sorted() 和 concat() 的奇怪流行为

Weird streams behavior with sorted() and concat()

提问人:aviad cohen 提问时间:1/24/2023 最后编辑:Didier Laviad cohen 更新时间:1/25/2023 访问量:84

问:

流评估通常是延迟的(默认情况下),除非有状态操作作为管道的一部分存在。我遇到过一个由于有状态操作而违反懒惰的情况,我不明白为什么会这样。

请考虑以下代码:

List<Integer> l1 = List.of(4, 5, 3, 1, 2);
List<Integer> l2 = List.of(6, 7, 8, 9, 10);

Stream
    .concat(
        l1.stream()
            .map(i -> {
                System.out.println("in the map for " + i);
                if (i % 3 != 0) {
                    return null;
                }
                return i;
            }),
        l2.stream())
    .filter(i -> {
        System.out.println("in the filter " + i);
        return i != null;
    })
    .findAny();

具体内容:

我有两个由整数列表(&)构造的蒸汽。两个流连接起来形成一个新流。l1l2

流经过一些映射,将每个不能被 3 整除的项目转换为 ;流按原样进行。在串联的流上,我添加了一个过滤器(仅过滤非空值 -->因此,从第一个流开始,只有除以 3 的项目才会通过管道),最后是一个终端操作,该操作触发流的管道(并将有效地传递回第一个可被 3 整除的项目并停止流处理)。l1nulll2findAny

此代码按预期工作:首先使用所有项目,然后到达项目。输出显示了如何调用映射函数,然后对前两项调用 concatenated-stream-filter 函数,当第 3 项未转换为 null 时,整个流完成,因此在筛选器中仍然存在:l1l2l1l1l1

in the map for 4
in the filter null
in the map for 5
in the filter null
in the map for 3
in the filter 3

问题(或我不明白的事情)在使用操作修改流时开始:l1.sorted

Stream
    .concat(
        l1.stream()
            .sorted()
            .map(i -> {
                System.out.println("in the map for " + i);
                if (i % 3 != 0) {
                    return null;
                }
                return i;
            }),
        l2.stream())
    .filter(i -> {
        System.out.println("in the filter " + i);
        return i != null;
    })
    .findAny();

...现在情况看起来不同了:

in the map for 1
in the map for 2
in the map for 3
in the map for 4
in the map for 5
in the filter null
in the filter null
in the filter 3

由于sorted是有状态操作,我知道它首先需要消耗整个流来对其值进行排序。令我惊讶的是,它似乎也影响了 l1 管道的其余部分,因为该函数在任何连接流过滤器方法调用之前都会像以前一样急切地调用。l1map

我读了 Java Streams - 缓冲巨大的流 和 为什么 flatMap() 之后的 filter() 在 Java 流中“不完全”懒惰?,我已经在 java 17 上运行并使用并且我没有使用(至少不是显式的)。Stream.concat()flatMap()

你能解释一下为什么吗?我在这里错过了什么?

java-stream 惰性评估 短路

评论


答:

3赞 Didier L 1/25/2023 #1

这是由 JDK-8277306 引起的:带有 sorted() 和 concat() 的流会导致某些操作不延迟,该操作因“不会修复”而关闭,并带有以下注释:

Stream.concat 从每个输入流中获取拆分器,并将它们组合在一起以创建一个新的拆分器,从中构造一个新流。因此,它绑定到每个流的源以连接。

目前无法将串联后的流管道的短路特性传播到串联前的管道。归根结底,就是要解决分路器边界上的推拉差异。这是一个棘手的问题,我可能需要付出巨大的努力,考虑到问题的范围,我发现很难证明这一点。

评论

1赞 aviad cohen 1/26/2023
感谢您引用未解决的错误。这当然是我遇到的问题。(感谢您编辑我原来的问题,现在看起来好多了......