Partition a Stream in Java

Last modified: October 8, 2023

1. Overview

In this tutorial, we’ll explore various techniques for partitioning a Java 8 Stream based on a fixed maximum size.

We’ll start by revisiting how to accomplish this with Lists. Subsequently, we’ll enhance our approach by incorporating Stream-specific functionalities, such as lazy evaluation and thread safety.

2. Partitioning a List

There are various ways of partitioning a List in Java. One easy way of doing it would be to start by determining the desired number of batches based on the desired batch size and the size of the source list:

static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
    int nrOfFullBatches = (source.size() - 1) / batchSize;
    // ...
}

To partition the source list into smaller sub-lists, our initial step involves computing the indices that demarcate the starting and ending points of each batch. While performing this calculation, we should keep in mind that the last batch may have a smaller size compared to the others:

int startIndex = batch * batchSize;
int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
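
Tracing these formulas for an eight-element list and a batch size of 3 shows why the last batch is the short one. The following standalone sketch is just the arithmetic above lifted into a loop:

```java
public class BatchIndexDemo {
    public static void main(String[] args) {
        int sourceSize = 8;
        int batchSize = 3;
        // (size - 1) / batchSize is the index of the last (possibly partial) batch,
        // so iterating from 0 to nrOfFullBatches inclusive yields nrOfFullBatches + 1 batches
        int nrOfFullBatches = (sourceSize - 1) / batchSize; // 2
        for (int batch = 0; batch <= nrOfFullBatches; batch++) {
            int startIndex = batch * batchSize;
            int endIndex = (batch == nrOfFullBatches) ? sourceSize : (batch + 1) * batchSize;
            System.out.println("batch " + batch + " -> [" + startIndex + ", " + endIndex + ")");
        }
        // prints [0, 3), [3, 6) and [6, 8): two full batches and a final batch of two
    }
}
```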

Finally, we can add some validations to cover the corner cases. For example, when the source list is empty or when batchSize is zero or negative:

static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
    if (batchSize <= 0) {
        throw new IllegalArgumentException(String.format("The batchSize must be greater than 0. Actual value: %s", batchSize));
    }
    if (source.isEmpty()) {
        return Stream.empty();
    }
    int nrOfFullBatches = (source.size() - 1) / batchSize;
    return IntStream.rangeClosed(0, nrOfFullBatches)
      .mapToObj(batch -> {
          int startIndex = batch * batchSize;
          int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
          return source.subList(startIndex, endIndex);
      });
}

Finally, let’s test the solution. For an input list of numbers from 1 to 8 and a batch size of 3, we’ll expect three sub-lists:

@Test
void whenPartitionList_thenReturnThreeSubLists() {
    List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8);

    Stream<List<Integer>> result = partitionList(source, 3);

    assertThat(result).containsExactlyInAnyOrder(
      List.of(1, 2, 3),
      List.of(4, 5, 6),
      List.of(7, 8)
    );
}

3. Partitioning a Parallel Stream

Streams come with distinctive characteristics, such as lazy evaluation and the capacity for parallel processing. Embracing these features can be achieved by creating a custom Collector.

Moreover, given that the desired return type is a list of sub-lists, we’ll also make use of certain functions already defined by Collectors.toList(), which we’ll refer to as the downstream collector:

static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
    return source.collect(partitionBySize(batchSize, Collectors.toList()));
}

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    return Collector.of( ... );
}

We can create a Collector using the static factory method Collector.of(). Let’s consult the documentation and see what each of the parameters represents:

  • supplier – The supplier function for the new collector
  • accumulator – The accumulator function for the new collector
  • combiner – The combiner function for the new collector
  • finisher – The finisher function for the new collector
  • characteristics – The collector characteristics for the new collector
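
Before assembling our batching collector, it can help to see these five parameters on a toy example. The following collector is purely illustrative and not part of the final solution; it joins strings with a comma, using a StringJoiner as the mutable container:

```java
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorOfDemo {
    public static void main(String[] args) {
        Collector<String, StringJoiner, String> joining = Collector.of(
          () -> new StringJoiner(", "), // supplier: creates a fresh mutable container
          StringJoiner::add,            // accumulator: folds one element into the container
          StringJoiner::merge,          // combiner: merges two containers (parallel streams)
          StringJoiner::toString        // finisher: transforms the container into the result
        );                              // characteristics varargs left empty: defaults apply

        String result = Stream.of("a", "b", "c").collect(joining);
        System.out.println(result); // a, b, c
    }
}
```

Our batching collector will use the same factory method, but pass UNORDERED as its characteristic.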

Now, let’s systematically walk through each of them, creating and understanding their functionality one by one.

3.1. The Supplier

We’ll use a temporary object to accumulate the data and split it into batches. This accumulator is typically concealed as an implementation detail.

Upon completion of the collection operation, we invoke the finisher function, which transforms this accumulator into the final result returned by the collector. The first parameter of the factory method Collector.of() will be a function that supplies an instance of our custom Accumulator.

This temporary accumulator encapsulates a list of values and the fixed batch size. Furthermore, it provides the caller with the flexibility to specify a listener that is notified when a batch reaches its capacity. Additionally, it includes a generic field to accommodate a downstream collector:

static class Accumulator<T, A> {
    private final List<T> values = new ArrayList<>();
    private final int batchSize;
    private A downstreamAccumulator;
    private final BiConsumer<A, List<T>> batchFullListener;

    // constructor
}

Needless to say, the accumulator remains fully encapsulated. For this reason, we’ll create it as a static nested class, and we’ll favor package-private (default) access.

Now, let’s write a method that accepts a new value. After adding it to the list, if the size of the list reaches the batchSize, we’ll notify the listener and then clear the values:

void add(T value) {
    values.add(value);
    if (values.size() == batchSize) {
        batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
        values.clear();
    }
}
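
The flush-on-full behavior is easiest to see in isolation. The following standalone sketch (names are ours, not part of the final solution) replays the same logic with a plain list standing in for the downstream accumulator:

```java
import java.util.ArrayList;
import java.util.List;

public class AddFlushDemo {
    public static void main(String[] args) {
        int batchSize = 3;
        List<Integer> values = new ArrayList<>();
        List<List<Integer>> flushed = new ArrayList<>();
        for (int value = 1; value <= 8; value++) {
            values.add(value);
            if (values.size() == batchSize) {
                // copy before clearing, exactly like the listener callback above
                flushed.add(new ArrayList<>(values));
                values.clear();
            }
        }
        System.out.println(flushed); // [[1, 2, 3], [4, 5, 6]]
        System.out.println(values);  // [7, 8] -> the incomplete batch, handled later by the finisher
    }
}
```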

Let’s create the Supplier that instantiates this Accumulator. When a batch is full, we’ll delegate to the downstream accumulator, in our case, the one coming from Collectors.toList():

(acc, values) -> downstream.accumulator().accept(acc, values)

Finally, we can rewrite this BiConsumer as a method reference and create our Supplier:

Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
  batchSize,
  downstream.supplier().get(),
  downstream.accumulator()::accept
);

3.2. The Accumulator

The second argument when creating a custom Collector will be a BiConsumer that accepts an Accumulator and the new value. In our case, we’ll simply delegate to the Accumulator and allow it to add the value to the current batch:

BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);

3.3. The Combiner

The combiner is a function that accepts two Accumulators and provides a way of merging them together. Firstly, we need to merge their downstreamAccumulators using the downstream’s combiner. After that, we can stream all the values accumulated by one of the accumulators and add them to the other one:

BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> {
    acc1.downstreamAccumulator = downstream.combiner()
      .apply(acc1.downstreamAccumulator, acc2.downstreamAccumulator);
    acc2.values.forEach(acc1::add);
    return acc1;
};

Let’s refactor the code and encapsulate this logic inside the Accumulator class itself:

static class Accumulator<T, A> {
    private final List<T> values = new ArrayList<>();
    private final int batchSize;
    private A downstreamAccumulator;
    private final BiConsumer<A, List<T>> batchFullListener;

    // constructor

    void add(T value) {
        // ...  
    }

    Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
        this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
        other.values.forEach(this::add);
        return this;
    }
}

This simplifies our combiner into a one-liner:

BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());

3.4. The Finisher

As previously mentioned, we must establish a means to convert this custom Accumulator into the ultimate result: the List of Lists. This is another place where we can rely on the downstream collector to aggregate all the batches into a single list.

Additionally, if the accumulator isn’t empty, indicating the presence of values from the last incomplete batch, we need to ensure that these values are consolidated before invoking the downstream finisher:

Function<Accumulator<T, A>, R> finisher = acc -> {
    if (!acc.values.isEmpty()) {
        downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
    }
    return downstream.finisher().apply(acc.downstreamAccumulator);
};

3.5. The Collector Characteristics

Our collector is designed to be thread-safe and is suitable for use with parallel streams. This means that the final reduction process occurs concurrently across multiple threads. A natural consequence of this parallel processing is the inability to guarantee the order of elements.

Collector Characteristics can be used to optimize reduction implementations. Based on the considerations we’ve outlined, we’ll configure the characteristics parameter to utilize Collector.Characteristics.UNORDERED:

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    // ...
    return Collector.of(
      supplier,
      accumulator,
      combiner,
      finisher,
      Collector.Characteristics.UNORDERED
    );
}

3.6. The Full Solution

We now understand the roles played by each function used in collector creation. Let’s revisit the whole method before proceeding with the tests:

static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
    return source.collect(partitionBySize(batchSize, Collectors.toList()));
}

static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
    Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
      batchSize, 
      downstream.supplier().get(), 
      downstream.accumulator()::accept
    );

    BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);

    BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());

    Function<Accumulator<T, A>, R> finisher = acc -> {
        if (!acc.values.isEmpty()) {
            downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
        }
        return downstream.finisher().apply(acc.downstreamAccumulator);
    };
    
    return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
}
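
Assembled into a single compilable class, the collector can be exercised end to end. The Accumulator constructor, elided in the snippets above, is reconstructed here from the way the supplier invokes it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartitionStreamDemo {

    static class Accumulator<T, A> {
        private final List<T> values = new ArrayList<>();
        private final int batchSize;
        private A downstreamAccumulator;
        private final BiConsumer<A, List<T>> batchFullListener;

        // reconstructed constructor: matches the arguments passed by the supplier
        Accumulator(int batchSize, A downstreamAccumulator, BiConsumer<A, List<T>> batchFullListener) {
            this.batchSize = batchSize;
            this.downstreamAccumulator = downstreamAccumulator;
            this.batchFullListener = batchFullListener;
        }

        void add(T value) {
            values.add(value);
            if (values.size() == batchSize) {
                batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
                values.clear();
            }
        }

        Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
            this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
            other.values.forEach(this::add);
            return this;
        }
    }

    static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
        Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
          batchSize,
          downstream.supplier().get(),
          downstream.accumulator()::accept
        );
        BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);
        BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
        Function<Accumulator<T, A>, R> finisher = acc -> {
            if (!acc.values.isEmpty()) {
                downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
            }
            return downstream.finisher().apply(acc.downstreamAccumulator);
        };
        return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel()
          .collect(partitionBySize(3, Collectors.toList()));
        // since the collector is UNORDERED, only the batch count and sizes are deterministic
        System.out.println(result.size());                                // 3
        System.out.println(result.stream().mapToInt(List::size).sum());   // 8
    }
}
```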

During testing, we’ll no longer be able to assert the values within each batch. Consequently, our assertions will focus solely on verifying the count and sizes of the batches. For instance, when partitioning a parallel stream that contains integers between 1 and 8 with a batchSize of 3, we’ll generate two complete batches, each containing three elements, and one batch with two elements:

@Test
void whenPartitionStream_thenReturnThreeSubLists() {
    Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();

    List<List<Integer>> result = partitionStream(source, 3);

    assertThat(result)
      .hasSize(3)
      .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(0))
      .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(1))
      .satisfies(batch -> assertThat(batch).hasSize(2), atIndex(2));
}

4. Partitioning a Stream Using Guava

To avoid potential errors, we can opt for the utilization of a proven third-party library rather than building a thread-safe Collector from scratch. For instance, Google’s Guava provides an elegant and concise way for partitioning a Stream into an Iterable comprising Lists of the same data type.

Firstly, let’s add the dependency to our pom.xml:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>

Now, we can simply use the static method Iterables.partition(). This function accepts an Iterable and the desired batch size as its parameters:

static <T> Iterable<List<T>> partitionUsingGuava(Stream<T> source, int batchSize) {
    return Iterables.partition(source::iterator, batchSize);
}
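
Note that source::iterator does quiet work here: Iterable’s single abstract method matches Stream.iterator(), so the method reference adapts the one-shot Stream into the Iterable that Iterables.partition() expects. A JDK-only sketch of that adaptation (no Guava required):

```java
import java.util.stream.Stream;

public class StreamAsIterableDemo {
    public static void main(String[] args) {
        Stream<Integer> source = Stream.of(1, 2, 3);
        // Iterable's single abstract method, iterator(), matches Stream::iterator,
        // so a method reference turns the stream into a (one-shot) Iterable
        Iterable<Integer> iterable = source::iterator;
        int sum = 0;
        for (int i : iterable) {
            sum += i;
        }
        System.out.println(sum); // 6
    }
}
```

Keep in mind that the resulting Iterable can only be iterated once, because the underlying Stream is consumed by the first traversal.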

The only distinction in our testing approach lies in the altered return type, now an Iterable. To assert the batch sizes, we’ll gather all elements of the Iterable into an ArrayList. Besides this adjustment, the testing procedure remains unchanged:

@Test
void whenPartitionParallelStreamWithGuava_thenReturnThreeSubLists() {
    Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();

    Iterable<List<Integer>> result = partitionUsingGuava(source, 3);

    assertThat(result)
      .map(ArrayList::new)
      .hasSize(3)
      .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(0))
      .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(1))
      .satisfies(batch -> assertThat(batch).asList().hasSize(2), atIndex(2));
}

5. Conclusion

In this article, we explored various ways of partitioning a Stream in Java. We started by recalling how we can split a List into smaller sub-lists of a fixed maximum size. Following that, we discussed the advantages of Streams and parallel Streams, and we created our own custom Collector for them.

Finally, we ventured into Guava’s API, which enables us to accomplish the same functionality effortlessly using the static method Iterables.partition().

As always, the complete source code for the examples is available over on GitHub.
