How to Split a Stream into Multiple Streams

Last modified: June 30, 2022

1. Overview

Java’s Streams API is a powerful and versatile tool for processing data. By definition, a streaming operation is a single iteration through a set of data.

However, sometimes we want to process parts of the stream differently and get more than one set of results.

In this tutorial, we’ll learn how to split a stream into multiple groups and process them independently.

2. Using Collectors

A stream should be operated on only once and has exactly one terminal operation. It can have multiple intermediate operations, but its data can be collected only once. After the terminal operation runs, the stream is consumed and can't be reused.

This means that the Stream documentation explicitly rules out "forked" streams, where the same source feeds two or more pipelines with different intermediate operations, because each fork would need its own terminal operation. However, we can split the stream inside the terminal operation. This produces a single result divided into two or more groups.
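
The sketch below illustrates the single-use rule. It uses only the JDK and makes one assumption: a few target-site strings stand in for the articles. It runs one terminal operation and then tries to build a second pipeline on the same stream object. The JDK rejects the second attempt with an IllegalStateException, typically with the message "stream has already been operated upon or closed". The class and method names are hypothetical.

```java
import java.util.stream.Stream;

public class StreamReuseDemo {

    // Returns true if the JDK rejects a second pipeline built on an already-consumed stream.
    static boolean reuseIsRejected() {
        Stream<String> targets = Stream.of("Baeldung", "Baeldung", "Programming Daily", "The Code");

        // First terminal operation: this consumes the stream.
        long total = targets.count();
        System.out.println("Total: " + total);

        try {
            // Second attempt on the same stream object: linking a new stage fails.
            targets.filter(t -> t.equals("Baeldung")).count();
            return false;
        } catch (IllegalStateException e) {
            // Typically: "stream has already been operated upon or closed"
            System.out.println("Rejected: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(reuseIsRejected()); // prints true
    }
}
```

If we really do need two separate passes, the usual workaround is to ask the source for a fresh stream, for example by calling articles.stream() again, instead of reusing the old one.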

2.1. Binary Split with partitioningBy

If we want to split a stream in two, we can use partitioningBy from the Collectors class. It takes a Predicate and returns a collector that produces a Map: elements that satisfy the predicate are grouped under the Boolean key true, and the rest under false.

Let's say we have a list of articles. Each article records the target site it should be posted on and whether it should be featured:

List<Article> articles = Lists.newArrayList(
  new Article("Baeldung", true),
  new Article("Baeldung", false),
  new Article("Programming Daily", false),
  new Article("The Code", false));
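
The snippets in this tutorial use an Article class without showing it. Below is a minimal sketch of what it might look like, based on two assumptions. First, it has public final fields target and featured, which matches the a.target and article.featured field access in the snippets. Second, it has value-based equals and hashCode, which assertions such as containsExactly(new Article(...)) need in order to match. The list above is built with Guava's Lists.newArrayList. For these read-only examples, the JDK's List.of(...) (Java 9+) would work just as well.

```java
import java.util.Objects;

// Hypothetical Article class: the tutorial's snippets use it but do not show it.
// Public final fields match the a.target / a.featured access in the snippets.
public class Article {

    public final String target;    // site the article should be posted on
    public final boolean featured; // whether the article should be featured

    public Article(String target, boolean featured) {
        this.target = target;
        this.featured = featured;
    }

    // Value-based equality, so assertions like containsExactly(new Article(...)) can match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Article)) return false;
        Article other = (Article) o;
        return featured == other.featured && target.equals(other.target);
    }

    @Override
    public int hashCode() {
        return Objects.hash(target, featured);
    }

    @Override
    public String toString() {
        return "Article(" + target + ", " + featured + ")";
    }
}
```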

We’ll divide it into two groups, one containing only Baeldung articles and the second one containing the rest:

Map<Boolean, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));

Let’s see which articles are filed under the true and false keys in the map:

assertThat(groupedArticles.get(true)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
  new Article("Programming Daily", false),
  new Article("The Code", false));
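
partitioningBy also has an overload that takes a downstream collector. This helps when we want something other than a list in each partition. For brevity, the sketch below uses plain site-name strings in place of Article objects, which is an assumption of the example, and counts the Baeldung and non-Baeldung entries. It also shows a property worth knowing: the resulting map always contains both the true and false keys, even when one partition is empty. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitioningDemo {

    // Counts entries per partition instead of collecting them into lists,
    // using the two-argument partitioningBy(predicate, downstream) overload.
    static Map<Boolean, Long> countBaeldung(List<String> targets) {
        return targets.stream()
          .collect(Collectors.partitioningBy(t -> t.equals("Baeldung"), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> targets = List.of("Baeldung", "Baeldung", "Programming Daily", "The Code");
        System.out.println(countBaeldung(targets));   // {false=2, true=2}
        // Both keys are present even when the input is empty.
        System.out.println(countBaeldung(List.of())); // {false=0, true=0}
    }
}
```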

2.2. Splitting with groupingBy

If we need more than two categories, we can use the groupingBy method instead. It takes a classifier function that maps each element to a group key, and it produces a Map that links each key to a list of the elements in that group.

Let's say we want to group articles by target site. The returned Map will use site names as keys, and each value will be the list of articles associated with that site:

Map<String, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));
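
Like partitioningBy, groupingBy accepts a downstream collector, and a three-argument overload also lets us choose the map implementation. The sketch below again uses site-name strings as a hypothetical stand-in for articles. It counts entries per site and collects them into a TreeMap so the keys come out sorted. Unlike partitioningBy, groupingBy creates keys only for groups that actually occur. The one-argument version also makes no guarantee about the type of Map it returns. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupingDemo {

    // groupingBy(classifier, mapFactory, downstream):
    // - classifier: the site name itself becomes the map key
    // - TreeMap::new: keys are kept sorted, so the printed order is predictable
    // - counting(): each group is reduced to its size instead of a List
    static Map<String, Long> countPerSite(List<String> targets) {
        return targets.stream()
          .collect(Collectors.groupingBy(t -> t, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> targets = List.of("Baeldung", "Baeldung", "Programming Daily", "The Code");
        System.out.println(countPerSite(targets)); // {Baeldung=2, Programming Daily=1, The Code=1}
    }
}
```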

3. Using teeing

Since Java 12, we have another option for a binary split: the teeing collector. teeing combines two downstream collectors into one composite collector. Both collectors process every element, and once the stream is exhausted, the provided merger function combines their two results into a single return value.
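
The following minimal teeing sketch works on a hypothetical list of integers unrelated to the articles. One downstream collector sums the elements, the other counts them, and the merger function combines the two results into an average. Both collectors see every element in a single pass over the stream, and the merger runs once, at the end. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

public class TeeingDemo {

    // Computes an average in a single pass: every element goes to BOTH
    // downstream collectors, and the merger combines their two results.
    static double average(List<Integer> numbers) {
        return numbers.stream().collect(Collectors.teeing(
          Collectors.summingInt(n -> n),  // first collector: sum of the elements
          Collectors.counting(),          // second collector: number of elements
          (sum, count) -> count == 0 ? 0.0 : (double) sum / count)); // merger, applied once
    }

    public static void main(String[] args) {
        System.out.println(average(List.of(2, 4, 6, 8))); // 5.0
    }
}
```

Collectors.averagingInt already covers this particular case. The example only shows the shape of a teeing call: two collectors and a merger.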

3.1. teeing with a Predicate

The teeing collector pairs nicely with another collector from the Collectors class called filtering (available since Java 9). It takes a predicate and a downstream collector, and passes only the elements that satisfy the predicate on to that downstream collector.

Let's divide articles into groups of Baeldung and non-Baeldung ones and count them. We'll use the List.of factory method as the merger function, so the two counts end up in a two-element list:

List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
  Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
  List::of));
assertThat(countedArticles.get(0)).isEqualTo(2);
assertThat(countedArticles.get(1)).isEqualTo(2);
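
Why use Collectors.filtering instead of Stream.filter? The difference shows up when the filtering happens inside another collector. In the sketch below, a record named Article stands in for the tutorial's class. This is a hypothetical simplification: records expose target() and featured() accessor methods instead of public fields, and they need Java 16+. Filtering the stream before grouping drops any site that has no featured articles. Passing filtering as the downstream collector of groupingBy keeps every site as a key, with a count of zero where nothing matched.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class FilteringDemo {

    // Hypothetical stand-in for the tutorial's Article class (record accessors instead of fields).
    record Article(String target, boolean featured) {}

    static final List<Article> ARTICLES = List.of(
      new Article("Baeldung", true),
      new Article("Baeldung", false),
      new Article("Programming Daily", false),
      new Article("The Code", false));

    // Filtering BEFORE collecting: sites without featured articles disappear entirely.
    static Map<String, Long> featuredPerSiteWithFilter() {
        return ARTICLES.stream()
          .filter(Article::featured)
          .collect(Collectors.groupingBy(Article::target, TreeMap::new, Collectors.counting()));
    }

    // Filtering INSIDE the collector: every site keeps its key, possibly with a count of 0.
    static Map<String, Long> featuredPerSiteWithFiltering() {
        return ARTICLES.stream()
          .collect(Collectors.groupingBy(Article::target, TreeMap::new,
            Collectors.filtering(Article::featured, Collectors.counting())));
    }

    public static void main(String[] args) {
        System.out.println(featuredPerSiteWithFilter());    // {Baeldung=1}
        System.out.println(featuredPerSiteWithFiltering()); // {Baeldung=1, Programming Daily=0, The Code=0}
    }
}
```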

3.2. teeing with Overlapping Results

There is one important difference between this solution and the previous ones. The groups we created earlier had no overlap: each element from the source stream belonged to exactly one group. With teeing, we are no longer bound by this limitation, because each of the two collectors receives every element of the stream. Let's look at how we can take advantage of it.

We may want to process articles into two groups, one with Baeldung articles only and the second one with featured articles only. The resulting groups may overlap, since an article can be both featured and targeted at Baeldung.

This time, instead of counting, we'll collect them into lists:

List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
  Collectors.filtering(article -> article.featured, Collectors.toList()),
  List::of));

assertThat(groupedArticles.get(0)).hasSize(2);
assertThat(groupedArticles.get(1)).hasSize(1);

assertThat(groupedArticles.get(0)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));
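
Because List::of returns the results by position, anyone reading the code has to remember that index 0 holds the Baeldung articles and index 1 holds the featured ones. One alternative, sketched below, is to pass the constructor of a small result record as the merger. SplitResult and the record version of Article are hypothetical names introduced for this example (records need Java 16+). The overlap is still visible: the featured Baeldung article appears in both lists.

```java
import java.util.List;
import java.util.stream.Collectors;

public class NamedTeeingDemo {

    // Hypothetical stand-in for the tutorial's Article class (record accessors instead of fields).
    record Article(String target, boolean featured) {}

    // Hypothetical result type: names the two (possibly overlapping) groups
    // instead of relying on positions 0 and 1 of a List.
    record SplitResult(List<Article> baeldung, List<Article> featured) {}

    static SplitResult split(List<Article> articles) {
        return articles.stream().collect(Collectors.teeing(
          Collectors.filtering(a -> a.target().equals("Baeldung"), Collectors.toList()),
          Collectors.filtering(Article::featured, Collectors.toList()),
          SplitResult::new)); // merger: the record's canonical constructor
    }

    public static void main(String[] args) {
        SplitResult result = split(List.of(
          new Article("Baeldung", true),
          new Article("Baeldung", false),
          new Article("Programming Daily", false),
          new Article("The Code", false)));
        System.out.println(result.baeldung().size()); // 2
        System.out.println(result.featured().size()); // 1
    }
}
```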

4. Using RxJava

While Java’s Streams API is a useful tool, sometimes it’s not enough. Other solutions, like reactive streams provided by RxJava, may be able to help us. Let’s look at a short example of how we can use an Observable and multiple Subscribers to achieve the same results as our Stream examples.

4.1. Creating an Observable

First, we need to create an Observable instance from our list of articles. We can use the Observable class's from factory method (this is the RxJava 1.x API; in RxJava 2 and 3 the equivalent factory method is fromIterable):

Observable<Article> observableArticles = Observable.from(articles);

4.2. Filtering Observables

Next, we need to create Observables that will filter articles. To do that, we’ll use the filter method from the Observable class:

Observable<Article> baeldungObservable = observableArticles.filter(
  article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
  article -> article.featured);

4.3. Creating Multiple Subscribers

Finally, we need to subscribe to the Observables and provide an Action that describes what we want to do with each article. In a real-world application, that might mean saving the articles to a database or sending them to a client, but we'll settle for adding them to lists:

List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);

5. Conclusion

In this tutorial, we learned how to split streams into groups and process them separately. First, we looked at the older Collectors methods, groupingBy and partitioningBy, which have been available since Java 8. Next, we used a newer approach based on the teeing collector introduced in Java 12. Finally, we looked at how we can use RxJava to achieve similar results with greater flexibility.

As always, the source code is available over on GitHub.
