When to Use a Parallel Stream in Java – 什么时候使用Java中的平行流

最后修改: 2021年 5月 17日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

Java 8 introduced the Stream API that makes it easy to iterate over collections as streams of data. It’s also very easy to create streams that execute in parallel and make use of multiple processor cores.

Java 8 引入了Stream API,使我们能够轻松地将集合作为数据流进行迭代。创建并行执行的流并利用多个处理器内核也是非常容易的。

We might think that it’s always faster to divide the work on more cores. But that is often not the case.

我们可能认为,在更多的核心上分工总是更快。但事实往往并非如此。

In this tutorial, we’ll explore the differences between sequential and parallel streams. We’ll first look at the default fork-join pool used by parallel streams.

在本教程中,我们将探讨顺序流和并行流之间的区别。我们首先看看并行流所使用的默认分叉连接池。

We’ll also consider the performance implications of using a parallel stream, including memory locality and splitting/merging costs.

我们还将考虑使用并行流的性能影响,包括内存定位和分割/合并成本。

Finally, we’ll recommend when it makes sense to covert a sequential stream into a parallel one.

最后,我们将推荐何时将顺序流转换为并行流是有意义的。

2. Streams in Java

2.Java中的流

A stream in Java is simply a wrapper around a data source, allowing us to perform bulk operations on the data in a convenient way.

Java中的stream仅仅是数据源的一个包装,允许我们以一种方便的方式对数据进行批量操作。

It doesn’t store data or make any changes to the underlying data source. Rather, it adds support for functional-style operations on data pipelines.

它不存储数据或对底层数据源做任何改变。相反,它增加了对数据管道的功能式操作的支持。

2.1. Sequential Streams

2.1.顺序流

By default, any stream operation in Java is processed sequentially, unless explicitly specified as parallel.

默认情况下,Java中的任何流操作都是按顺序处理的,除非明确指定为并行。

Sequential streams use a single thread to process the pipeline:

顺序流使用一个单线程来处理管道。

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())
);

The output of this sequential stream is predictable. The list elements will always be printed in an ordered sequence:

这个顺序流的输出是可预测的。列表中的元素将总是以一个有序的顺序打印。

1 main
2 main
3 main
4 main

2.2. Parallel Streams

2.2.平行流

Any stream in Java can easily be transformed from sequential to parallel.

Java中的任何流都可以很容易地从顺序性转化为并行性。

We can achieve this by adding the parallel method to a sequential stream or by creating a stream using the parallelStream method of a collection:

我们可以通过在顺序流中添加parallel方法或者使用集合的parallelStream方法创建一个流来实现。

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.parallelStream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())
);

Parallel streams enable us to execute code in parallel on separate cores. The final result is the combination of each individual outcome.

并行流使我们能够在不同的核心上并行地执行代码。最后的结果是每个单独结果的组合。

However, the order of execution is out of our control. It may change every time we run the program:

然而,执行的顺序是我们无法控制的。每次我们运行程序时,它都可能发生变化。

4 ForkJoinPool.commonPool-worker-3
2 ForkJoinPool.commonPool-worker-5
1 ForkJoinPool.commonPool-worker-7
3 main

3. Fork-Join Framework

3.叉式连接框架

Parallel streams make use of the fork-join framework and its common pool of worker threads.

并行流利用了fork-join框架和其共同的工作线程池。

The fork-join framework was added to java.util.concurrent in Java 7 to handle task management between multiple threads.

在Java 7中,fork-join框架被添加到java.util.concurrent中,以处理多线程之间的任务管理。

3.1. Splitting Source

3.1.分割源

The fork-join framework is in charge of splitting the source data between worker threads and handling callback on task completion.

fork-join框架负责在工作线程之间分割源数据并处理任务完成时的回调。

Let’s take a look at an example of calculating a sum of integers in parallel.

让我们来看看一个并行计算整数之和的例子。

We’ll make use of the reduce method and add five to the starting sum, instead of starting from zero:

我们将利用reduce方法,将5加到起始和上,而不是从0开始。

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);
assertThat(sum).isNotEqualTo(15);

In a sequential stream, the result of this operation would be 15.

在一个顺序流中,这个操作的结果是15。

But since the reduce operation is handled in parallel, the number five actually gets added up in every worker thread:

但由于reduce操作是并行处理的,数字5实际上是在每个工作线程中被加起来的。

The actual result might differ depending on the number of threads used in the common fork-join pool.

实际结果可能会有所不同,这取决于共同叉接池中使用的线程数量。

In order to fix this issue, the number five should be added outside of the parallel stream:

为了解决这个问题,应该在平行流之外添加数字5。

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;
assertThat(sum).isEqualTo(15);

Therefore, we need to be careful about which operations can be run in parallel.

因此,我们需要谨慎对待哪些操作可以并行运行。

3.2. Common Thread Pool

3.2.通用线程池

The number of threads in the common pool is equal to the number of processor cores.

公共池中的线程数等于处理器内核数。

However, the API allows us to specify the number of threads it will use by passing a JVM parameter:

然而,API允许我们通过传递一个JVM参数来指定它将使用的线程数量:

-D java.util.concurrent.ForkJoinPool.common.parallelism=4

It’s important to remember that this is a global setting and that it will affect all parallel streams and any other fork-join tasks that use the common pool. We strongly suggest that this parameter is not modified unless we have a very good reason for doing so.

重要的是要记住,这是一个全局设置,它将影响所有的并行流和任何其他使用公共池的分叉连接任务。我们强烈建议不要修改这个参数,除非我们有非常好的理由这样做

3.3. Custom Thread Pool

3.3.自定义线程池

Besides in the default, common thread pool, it’s also possible to run a parallel stream in a custom thread pool:

除了在默认的普通线程池中,也可以在自定义线程池中运行一个并行流。

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
int sum = customThreadPool.submit(
    () -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();
customThreadPool.shutdown();
assertThat(sum).isEqualTo(10);

Note that using the common thread pool is recommended by Oracle. We should have a very good reason for running parallel streams in custom thread pools.

请注意,使用公共线程池是Oracle推荐的。We应该有一个非常好的理由在自定义线程池中运行并行流。

4. Performance Implications

4.性能的影响

Parallel processing may be beneficial to fully utilize multiple cores. But we also need to consider the overhead of managing multiple threads, memory locality, splitting the source and merging the results.

并行处理可能有利于充分利用多个核心。但我们也需要考虑管理多线程、内存定位、分割源和合并结果的开销。

4.1. The Overhead

4.1.开销

Let’s take a look at an example integer stream.

让我们看一下一个整数流的例子。

We’ll run a benchmark on a sequential and parallel reduction operation:

我们将对连续和平行的还原操作进行一次基准测试。

IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);

On this simple sum reduction, converting a sequential stream into a parallel one resulted in worse performance:

在这种简单的总和还原上,将顺序流转换为平行流会导致更差的性能。

Benchmark                                                     Mode  Cnt        Score        Error  Units
SplittingCosts.sourceSplittingIntStreamParallel               avgt   25      35476,283 ±     204,446  ns/op
SplittingCosts.sourceSplittingIntStreamSequential             avgt   25         68,274 ±       0,963  ns/op

The reason behind this is that sometimes the overhead of managing threads, sources and results is a more expensive operation than doing the actual work.

这背后的原因是,有时管理线程、源和结果的开销是比做实际工作更昂贵的操作。

4.2. Splitting Costs

4.2.分割成本

Splitting the data source evenly is a necessary cost to enable parallel execution, but some data sources split better than others.

均匀分割数据源是实现并行执行的必要成本,但有些数据源的分割效果比其他数据源好。

Let’s demonstrate this using an ArrayList and a LinkedList:

让我们用一个ArrayList和一个LinkedList来演示。

private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
private static final List<Integer> linkedListOfNumbers = new LinkedList<>();

static {
    IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
        arrayListOfNumbers.add(i);
        linkedListOfNumbers.add(i);
    });
}

We’ll run a benchmark on a sequential and parallel reduction operation on the two types of lists:

我们将对这两种类型的列表进行顺序和平行还原操作的基准测试。

arrayListOfNumbers.stream().reduce(0, Integer::sum)
arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);
linkedListOfNumbers.stream().reduce(0, Integer::sum);
linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);

Our results show that converting a sequential stream into a parallel one brings performance benefits only for an ArrayList:

我们的结果显示,将顺序流转换为并行流仅对ArrayList带来性能优势。

Benchmark                                                     Mode  Cnt        Score        Error  Units
DifferentSourceSplitting.differentSourceArrayListParallel     avgt   25    2004849,711 ±    5289,437  ns/op
DifferentSourceSplitting.differentSourceArrayListSequential   avgt   25    5437923,224 ±   37398,940  ns/op
DifferentSourceSplitting.differentSourceLinkedListParallel    avgt   25   13561609,611 ±  275658,633  ns/op
DifferentSourceSplitting.differentSourceLinkedListSequential  avgt   25   10664918,132 ±  254251,184  ns/op

The reason behind this is that arrays can split cheaply and evenly, while LinkedList has none of these properties. TreeMap and HashSet split better than LinkedList but not as well as arrays.

这背后的原因是,数组可以廉价且均匀地分割,而LinkedList没有这些特性。TreeMapHashSet的分割效果比LinkedList好,但不如数组好。

4.3. Merging Costs

4.3.合并成本

Every time we split the source for parallel computation, we also need to make sure to combine the results in the end.

每次我们分割源头进行并行计算时,我们还需要确保在最后将结果合并。

Let’s run a benchmark on a sequential and parallel stream, with sum and grouping as different merging operations:

让我们在一个顺序流和并行流上运行一个基准,把和与分组作为不同的合并操作。

arrayListOfNumbers.stream().reduce(0, Integer::sum);
arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);
arrayListOfNumbers.stream().collect(Collectors.toSet());
arrayListOfNumbers.stream().parallel().collect(Collectors.toSet())

Our results show that converting a sequential stream into a parallel one brings performance benefits only for the sum operation:

我们的结果表明,将顺序流转换为并行流仅对和操作带来性能优势。

Benchmark                                                     Mode  Cnt        Score        Error  Units
MergingCosts.mergingCostsGroupingParallel                     avgt   25  135093312,675 ± 4195024,803  ns/op
MergingCosts.mergingCostsGroupingSequential                   avgt   25   70631711,489 ± 1517217,320  ns/op
MergingCosts.mergingCostsSumParallel                          avgt   25    2074483,821 ±    7520,402  ns/op
MergingCosts.mergingCostsSumSequential                        avgt   25    5509573,621 ±   60249,942  ns/op

The merge operation is really cheap for some operations, such as reduction and addition, but merge operations like grouping to sets or maps can be quite expensive.

合并操作对于某些操作来说确实很便宜,比如还原和加法,但合并操作,比如对集合或地图的分组,可能相当昂贵。

4.4. Memory Locality

4.4.内存定位

Modern computers use a sophisticated multilevel cache to keep frequently used data close to the processor. When a linear memory access pattern is detected, the hardware prefetches the next line of data under the assumption that it will probably be needed soon.

现代计算机使用复杂的多级缓存,将经常使用的数据保存在靠近处理器的地方。当检测到线性内存访问模式时,硬件会根据可能很快就会需要的假设预取下一行的数据。

Parallelism brings performance benefits when we can keep the processor cores busy doing useful work. Since waiting for cache misses is not useful work, we need to consider the memory bandwidth as a limiting factor.

当我们能让处理器内核忙于做有用的工作时,并行化就会带来性能上的好处。由于等待缓存错过不是有用的工作,我们需要考虑将内存带宽作为一个限制因素。

Let’s demonstrate this using two arrays, one using a primitive type and the other using an object data type:

让我们用两个数组来证明这一点,一个使用原始类型,另一个使用对象数据类型。

private static final int[] intArray = new int[1_000_000];
private static final Integer[] integerArray = new Integer[1_000_000];

static {
    IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
        intArray[i-1] = i;
        integerArray[i-1] = i;
    });
}

We’ll run a benchmark on a sequential and parallel reduction operation on the two arrays:

我们将在这两个阵列上运行一个顺序和平行还原操作的基准。

Arrays.stream(intArray).reduce(0, Integer::sum);
Arrays.stream(intArray).parallel().reduce(0, Integer::sum);
Arrays.stream(integerArray).reduce(0, Integer::sum);
Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);

Our results show that converting a sequential stream into a parallel one brings slightly more performance benefits when using an array of primitives:

我们的结果表明,在使用基元阵列时,将顺序流转换为并行流带来的性能优势略大。

Benchmark                                                     Mode  Cnt        Score        Error  Units
MemoryLocalityCosts.localityIntArrayParallel                sequential stream  avgt   25     116247,787 ±     283,150  ns/op
MemoryLocalityCosts.localityIntArraySequential                avgt   25     293142,385 ±    2526,892  ns/op
MemoryLocalityCosts.localityIntegerArrayParallel              avgt   25    2153732,607 ±   16956,463  ns/op
MemoryLocalityCosts.localityIntegerArraySequential            avgt   25    5134866,640 ±  148283,942  ns/op

An array of primitives brings the best locality possible in Java. In general, the more pointers we have in our data structure, the more pressure we put on the memory to fetch the reference objects. This can have a negative effect on parallelization, as multiple cores simultaneously fetch the data from memory.

基元数组在Java中带来了最佳的定位性。一般来说,我们的数据结构中的指针越多,我们对内存的压力就越大,以获取引用对象。这可能会对并行化产生负面影响,因为多个核心同时从内存中获取数据。

4.5. The NQ Model

4.5.NQ模型

Oracle presented a simple model that can help us determine whether parallelism can offer us a performance boost. In the NQ model, N stands for the number of source data elements, while Q represents the amount of computation performed per data element.

Oracle提出了一个简单的模型,可以帮助我们确定并行化是否可以为我们提供性能提升。在NQ模型中,N代表源数据元素的数量,而Q代表每个数据元素执行的计算量。

The larger the product of N*Q, the more likely we are to get a performance boost from parallelization. For problems with a trivially small Q, such as summing up numbers, the rule of thumb is that N should be greater than 10,000. As the number of computations increases, the data size required to get a performance boost from parallelism decreases.

N*Q的乘积越大,我们就越有可能从并行化中获得性能提升。对于Q非常小的问题,如数字相加,经验法则是N应该大于10,000。随着计算数量的增加,从并行化中获得性能提升所需的数据大小也随之减少。

4.6. File Search Cost

4.6.文件搜索成本

File Search using parallel streams performs better in comparison to sequential streams. Let’s run a benchmark on a sequential and parallel stream for search over 1500 text files:

与顺序流相比,使用并行流的文件搜索表现得更好。让我们在顺序流和并行流上运行一个基准,对1500个文本文件进行搜索。

Files.walk(Paths.get("src/main/resources/")).map(Path::normalize).filter(Files::isRegularFile)
      .filter(path -> path.getFileName().toString().endsWith(".txt")).collect(Collectors.toList());
Files.walk(Paths.get("src/main/resources/")).parallel().map(Path::normalize).filter(Files::
      isRegularFile).filter(path -> path.getFileName().toString().endsWith(".txt")).
      collect(Collectors.toList());

Our results show that converting a sequential stream into a parallel one brings slightly more performance benefits when searching a greater number of files:

我们的结果表明,在搜索更多的文件时,将顺序流转换为并行流带来的性能优势略大。

Benchmark                                Mode  Cnt     Score         Error    Units
FileSearchCost.textFileSearchParallel    avgt   25  10808832.831 ± 446934.773  ns/op
FileSearchCost.textFileSearchSequential  avgt   25  13271799.599 ± 245112.749  ns/op

5. When to Use Parallel Streams

5.何时使用平行流

As we’ve seen, we need to be very considerate when using parallel streams.

正如我们所看到的,在使用并行流时,我们需要考虑得非常周到。

Parallelism can bring performance benefits in certain use cases. But parallel streams cannot be considered as a magical performance booster. So, sequential streams should still be used as default during development.

在某些使用情况下,并行性可以带来性能上的好处。但不能将并行流视为神奇的性能助推器。因此,在开发过程中,顺序流仍应被作为默认使用。

A sequential stream can be converted to a parallel one when we have actual performance requirements. Given those requirements, we should first run a performance measurement and consider parallelism as a possible optimization strategy.

当我们有实际的性能要求时,可以将顺序流转换为并行流。鉴于这些要求,我们应该首先运行性能测量,并将并行化作为一种可能的优化策略。

A large amount of data and many computations done per element indicate that parallelism could be a good option.

大量的数据和每个元素所做的许多计算表明,并行化可能是一个好的选择。

On the other hand, a small amount of data, unevenly splitting sources, expensive merge operations and poor memory locality indicate a potential problem for parallel execution.

另一方面,少量的数据、不均匀的分割源、昂贵的合并操作和较差的内存定位表明并行执行存在潜在问题。

6. Conclusion

6.结语

In this article, we explored the difference between sequential and parallel streams in Java. We learned that parallel streams make use of the default fork-join pool and its worker threads.

在这篇文章中,我们探讨了Java中顺序流和平行流之间的区别。我们了解到,并行流利用了默认的分叉连接池及其工作线程。

Then we saw how parallel streams do not always bring performance benefits. We considered the overhead of managing multiple threads, memory locality, splitting the source and merging the results. We saw that arrays are a great data source for parallel execution because they bring the best possible locality and can split cheaply and evenly.

然后我们看到平行流并不总是带来性能上的好处。我们考虑了管理多线程的开销、内存定位、分割源和合并结果。我们看到,数组是并行执行的一个很好的数据源,因为它们带来了最好的定位性,并且可以廉价而均匀地分割。

Finally, we looked at the NQ model and recommended using parallel streams only when we have actual performance requirements.

最后,我们研究了NQ模型,并建议只有在我们有实际性能要求时才使用并行流。

As always, the source code is available over on GitHub.

像往常一样,源代码可在GitHub上获得