使用 JDK8 和 lambda 压缩流 (java.util.stream.Streams.zip)

Zipping streams using JDK8 with lambda (java.util.stream.Streams.zip)

提问人:artella 提问时间:7/14/2013 最后编辑:Per Lundbergartella 更新时间:1/13/2023 访问量:103568

问:

在带有 lambda b93 的 JDK 8 中,b93 中有一个类 java.util.stream.Streams.zip,可用于压缩流(这在教程探索 Java8 Lambda 中进行了说明。第 1 部分,作者:Dhananjay Nene)。这个函数:

创建一个惰性和顺序组合的 Stream,其元素是 组合两个流的元素的结果。

然而,在 b98 中,这种情况已经消失了。事实上,在 b98 的 java.util.stream 中甚至无法访问该类。Streams

此功能是否已移动,如果是,如何使用 b98 简洁地压缩流?

我想到的应用程序是在 Shen 的这个 java 实现中,我替换了

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

函数具有相当冗长的代码(不使用 b98 中的功能)。

lambda 函数式编程 java-8 惰性评估 java-stream

评论

6赞 artella 7/15/2013
啊刚刚发现它似乎已经完全删除了:mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/......
0赞 Aleksei Egorov 12/20/2017
“探索 Java8 Lambda。第 1 部分“ - 本文的新链接已 blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1
0赞 Per Lundberg 11/11/2020
谢谢@AlekseiEgorov,现在也修复了帖子中的链接
0赞 Ti Strga 3/31/2023
在 2023 年阅读 @artella 链接中的帖子让我希望,随着现在的新支持,以及更远的值类型,也许我们会在不久的将来看到一些好的功能重新引入!Recordzip

答:

7赞 Nick Siderakis 10/23/2013 #1

Lazy-Seq 库提供 zip 功能。

https://github.com/nurkiewicz/LazySeq

该库深受其启发,旨在提供不可变的、线程安全的和易于使用的惰性序列实现,可能是无限的。scala.collection.immutable.Stream

13赞 Holger 11/15/2013 #2

您提到的类的方法已移至接口本身,转而使用默认方法。但似乎该方法已被删除。可能是因为不清楚不同大小的流的默认行为应该是什么。但是实现所需的行为是直接的:Streamzip

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}

评论

0赞 Andreas 8/17/2015
传递给过滤器的 u 不是有状态的吗?这违反了方法协定,尤其是在并行处理流时不起作用。predicate
2赞 Holger 8/17/2015
@Andreas:此处的解决方案均不支持并行处理。由于我的方法不返回流,因此它们确保流不会并行运行。同样,接受的答案的代码返回一个流,该流可以转换为并行,但实际上不会并行执行任何操作。也就是说,不鼓励使用有状态谓词,但不违反契约。如果确保状态更新是线程安全的,则甚至可以在并行上下文中使用它们。在某些情况下,它们是不可避免的,例如,将流变成不同的流本身就是一个状态完整的谓词。
2赞 Holger 8/17/2015
@Andreas:你可能会猜到为什么这些操作已经从 Java API 中删除了......
85赞 siki 5/8/2014 #3

我也需要这个,所以我只是从 b93 中获取源代码并将其放在“util”类中。我不得不稍微修改它才能使用当前的 API。

作为参考,这里是工作代码(风险自负......

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping looses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}

评论

1赞 Didier L 6/28/2015
如果其中一个流是 ,则生成的流不应该是 ,而不是两个?SIZEDSIZED
6赞 siki 6/29/2015
我不这么认为。必须同时使用这两个流才能使此实现正常工作。这实际上取决于您如何定义压缩。例如,您是否能够压缩两个不同大小的流?那么生成的流会是什么样子呢?我相信这就是为什么这个函数实际上被省略在 API 中的原因。有很多方法可以做到这一点,由用户决定什么行为应该是“正确”的。你会丢弃较长流中的元素还是填充较短的列表?如果是这样,有什么价值?SIZED
0赞 jub0bs 8/11/2016
除非我遗漏了什么,否则不需要任何投射(例如)。Spliterator<A>
0赞 starwarswii 7/8/2017
是否有托管 Java 8 b93 源代码的网站?我很难找到它。
47赞 Dominic Fox 9/4/2014 #4

zip 是 Protonpack 库提供的函数之一。

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));

评论

1赞 tokland 4/11/2016
也可以在 StreamEx 中找到:amaembo.github.io/streamex/javadoc/one/util/streamex/...
3赞 robby_pelssers 8/7/2015 #5
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
28赞 Karol Król 9/2/2015 #6

使用带有 lambda 的 JDK8 压缩两个流 (gist)。

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}

评论

2赞 sffc 9/13/2015
不错的解决方案和(相对)紧凑!要求您将 和 放在文件的顶部。import java.util.function.*;import java.util.stream.*;
0赞 smac89 10/27/2017
请注意,这是对流的终端操作。这意味着对于无限流,此方法会崩溃
2赞 Miguel Gamboa 2/4/2019
这么多无用的包装器:这里和这里又是:.为什么不直接实现 a 而不是 an ?检查@Doradus答案 stackoverflow.com/a/46230233/1140754() -> iteratoriterable.spliterator()SpliteratorIterator
2赞 John McClean 2/25/2016 #7

AOL 的 cyclops-react(我对此做出了贡献)也提供了压缩功能,既通过扩展的 Stream 实现,也实现了 reactive-streams 接口 ReactiveSeq,以及通过 StreamUtils,它通过静态方法提供与标准 Java Streams 相同的功能。

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

它还提供了更通用的基于应用程序的压缩。例如

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

甚至能够将一个流中的每个项目与另一个流中的每个项目配对

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")
-1赞 Gnana 2/26/2016 #8

这太棒了。我必须将两个流压缩到一个 Map 中,其中一个流是键,另一个流是值

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

输出: {A=苹果,B=香蕉,C=胡萝卜}

29赞 Rafael 3/14/2017 #9

由于我无法想象在索引集合(列表)以外的集合上使用任何压缩,并且我是简单的忠实粉丝,这将是我的解决方案:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObj( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}

评论

2赞 seanf 3/3/2019
我认为应该是.mapToObjectmapToObj
2赞 avmohan 2/6/2020
如果列表不是(例如在链表上),这将非常慢RandomAccess
0赞 Rafael 2/12/2020
绝对。但是大多数 Java 开发人员都很清楚 LinkedList 在索引访问操作方面的性能很差。
47赞 ZhekaKozlov 3/28/2017 #10

如果您的项目中有 Guava,则可以使用 Streams.zip 方法(已在 Guava 21 中添加):

返回一个流,其中每个元素都是将 streamA 和 streamB 中每个元素的相应元素传递给函数的结果。生成的流将仅与两个输入流中较短的流一样长;如果一个流较长,则其额外元素将被忽略。生成的流无法有效地拆分。这可能会损害并行性能。

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }
12赞 Doradus 9/15/2017 #11

我谦虚地建议这种实现。生成的流被截断为两个输入流中较短的流。

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}

评论

1赞 Miguel Gamboa 2/4/2019
我喜欢你的提议。但我不完全同意最后.我认为它没有效果,因为默认情况下提供有限的并行性。所以我认为最终的结果会和通过一样。.., leftStream.isParallel() || rightStream.isParallel()AbstractSpliteratorfalse
0赞 Doradus 2/8/2019
@MiguelGamboa - 感谢您的评论。我不确定你说的“默认有限并行性”是什么意思——你有指向一些文档的链接吗?
0赞 caduceus 9/19/2023
是否可以提供 BiConsumer?
2赞 const.grigoryev 7/26/2018 #12

如果有人需要这个,streamex 库中有功能:StreamEx.zipWith

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
11赞 Dan Borza 9/11/2018 #13

使用最新的番石榴库(用于课程),您应该能够做到Streams

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));
4赞 dominic 9/7/2019 #14

这对你有用吗?这是一个简短的函数,它懒惰地评估它正在压缩的流,因此你可以为它提供无限的流(它不需要采用被压缩的流的大小)。

如果流是有限的,则一旦其中一个流的元素用完,它就会停止。

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}

这是一些单元测试代码(比代码本身长得多!

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    static private <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}

评论

0赞 simbo1905 10/30/2019
我不得不在最后删除它,这似乎不在 java8 中,但这不是问题,因为被调用者可以过滤掉压缩流大小不同时发生的任何空值。我认为这个答案应该是第一答案,因为它是可以理解的。干得好,再次感谢。takeWhile