When to Use a Parallel Stream in Java – 什么时候使用Java中的平行流

最后修改: 2021年 5月 17日


1. Overview


Java 8 introduced the Stream API that makes it easy to iterate over collections as streams of data. It’s also very easy to create streams that execute in parallel and make use of multiple processor cores.

Java 8 引入了Stream API,使我们能够轻松地将集合作为数据流进行迭代。创建并行执行的流并利用多个处理器内核也是非常容易的。

We might think that it’s always faster to divide the work on more cores. But that is often not the case.


In this tutorial, we’ll explore the differences between sequential and parallel streams. We’ll first look at the default fork-join pool used by parallel streams.


We’ll also consider the performance implications of using a parallel stream, including memory locality and splitting/merging costs.


Finally, we’ll recommend when it makes sense to covert a sequential stream into a parallel one.


2. Streams in Java


A stream in Java is simply a wrapper around a data source, allowing us to perform bulk operations on the data in a convenient way.


It doesn’t store data or make any changes to the underlying data source. Rather, it adds support for functional-style operations on data pipelines.


2.1. Sequential Streams


By default, any stream operation in Java is processed sequentially, unless explicitly specified as parallel.


Sequential streams use a single thread to process the pipeline:


List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())

The output of this sequential stream is predictable. The list elements will always be printed in an ordered sequence:


1 main
2 main
3 main
4 main

2.2. Parallel Streams


Any stream in Java can easily be transformed from sequential to parallel.


We can achieve this by adding the parallel method to a sequential stream or by creating a stream using the parallelStream method of a collection:


List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.parallelStream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())

Parallel streams enable us to execute code in parallel on separate cores. The final result is the combination of each individual outcome.


However, the order of execution is out of our control. It may change every time we run the program:


4 ForkJoinPool.commonPool-worker-3
2 ForkJoinPool.commonPool-worker-5
1 ForkJoinPool.commonPool-worker-7
3 main

3. Fork-Join Framework


Parallel streams make use of the fork-join framework and its common pool of worker threads.


The fork-join framework was added to java.util.concurrent in Java 7 to handle task management between multiple threads.

在Java 7中,fork-join框架被添加到java.util.concurrent中,以处理多线程之间的任务管理。

3.1. Splitting Source


The fork-join framework is in charge of splitting the source data between worker threads and handling callback on task completion.


Let’s take a look at an example of calculating a sum of integers in parallel.


We’ll make use of the reduce method and add five to the starting sum, instead of starting from zero:


List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);

In a sequential stream, the result of this operation would be 15.


But since the reduce operation is handled in parallel, the number five actually gets added up in every worker thread:


The actual result might differ depending on the number of threads used in the common fork-join pool.


In order to fix this issue, the number five should be added outside of the parallel stream:


List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;

Therefore, we need to be careful about which operations can be run in parallel.


3.2. Common Thread Pool


The number of threads in the common pool is equal to the number of processor cores.


However, the API allows us to specify the number of threads it will use by passing a JVM parameter:


-D java.util.concurrent.ForkJoinPool.common.parallelism=4

It’s important to remember that this is a global setting and that it will affect all parallel streams and any other fork-join tasks that use the common pool. We strongly suggest that this parameter is not modified unless we have a very good reason for doing so.


3.3. Custom Thread Pool


Besides in the default, common thread pool, it’s also possible to run a parallel stream in a custom thread pool:


List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
int sum = customThreadPool.submit(
    () -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();

Note that using the common thread pool is recommended by Oracle. We should have a very good reason for running parallel streams in custom thread pools.


4. Performance Implications


Parallel processing may be beneficial to fully utilize multiple cores. But we also need to consider the overhead of managing multiple threads, memory locality, splitting the source and merging the results.


4.1. The Overhead


Let’s take a look at an example integer stream.


We’ll run a benchmark on a sequential and parallel reduction operation:


IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);

On this simple sum reduction, converting a sequential stream into a parallel one resulted in worse performance:


Benchmark                                                     Mode  Cnt        Score        Error  Units
SplittingCosts.sourceSplittingIntStreamParallel               avgt   25      35476,283 ±     204,446  ns/op
SplittingCosts.sourceSplittingIntStreamSequential             avgt   25         68,274 ±       0,963  ns/op

The reason behind this is that sometimes the overhead of managing threads, sources and results is a more expensive operation than doing the actual work.


4.2. Splitting Costs


Splitting the data source evenly is a necessary cost to enable parallel execution, but some data sources split better than others.


Let’s demonstrate this using an ArrayList and a LinkedList:


private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
private static final List<Integer> linkedListOfNumbers = new LinkedList<>();

static {
    IntStream.rangeClosed(1, 1_000_000).forEach(i -> {

We’ll run a benchmark on a sequential and parallel reduction operation on the two types of lists:


arrayListOfNumbers.stream().reduce(0, Integer::sum)
arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);
linkedListOfNumbers.stream().reduce(0, Integer::sum);
linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);

Our results show that converting a sequential stream into a parallel one brings performance benefits only for an ArrayList:


Benchmark                                                     Mode  Cnt        Score        Error  Units
DifferentSourceSplitting.differentSourceArrayListParallel     avgt   25    2004849,711 ±    5289,437  ns/op
DifferentSourceSplitting.differentSourceArrayListSequential   avgt   25    5437923,224 ±   37398,940  ns/op
DifferentSourceSplitting.differentSourceLinkedListParallel    avgt   25   13561609,611 ±  275658,633  ns/op
DifferentSourceSplitting.differentSourceLinkedListSequential  avgt   25   10664918,132 ±  254251,184  ns/op

The reason behind this is that arrays can split cheaply and evenly, while LinkedList has none of these properties. TreeMap and HashSet split better than LinkedList but not as well as arrays.


4.3. Merging Costs


Every time we split the source for parallel computation, we also need to make sure to combine the results in the end.


Let’s run a benchmark on a sequential and parallel stream, with sum and grouping as different merging operations:


arrayListOfNumbers.stream().reduce(0, Integer::sum);
arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);

Our results show that converting a sequential stream into a parallel one brings performance benefits only for the sum operation:


Benchmark                                                     Mode  Cnt        Score        Error  Units
MergingCosts.mergingCostsGroupingParallel                     avgt   25  135093312,675 ± 4195024,803  ns/op
MergingCosts.mergingCostsGroupingSequential                   avgt   25   70631711,489 ± 1517217,320  ns/op
MergingCosts.mergingCostsSumParallel                          avgt   25    2074483,821 ±    7520,402  ns/op
MergingCosts.mergingCostsSumSequential                        avgt   25    5509573,621 ±   60249,942  ns/op

The merge operation is really cheap for some operations, such as reduction and addition, but merge operations like grouping to sets or maps can be quite expensive.


4.4. Memory Locality


Modern computers use a sophisticated multilevel cache to keep frequently used data close to the processor. When a linear memory access pattern is detected, the hardware prefetches the next line of data under the assumption that it will probably be needed soon.


Parallelism brings performance benefits when we can keep the processor cores busy doing useful work. Since waiting for cache misses is not useful work, we need to consider the memory bandwidth as a limiting factor.


Let’s demonstrate this using two arrays, one using a primitive type and the other using an object data type:


private static final int[] intArray = new int[1_000_000];
private static final Integer[] integerArray = new Integer[1_000_000];

static {
    IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
        intArray[i-1] = i;
        integerArray[i-1] = i;

We’ll run a benchmark on a sequential and parallel reduction operation on the two arrays:


Arrays.stream(intArray).reduce(0, Integer::sum);
Arrays.stream(intArray).parallel().reduce(0, Integer::sum);
Arrays.stream(integerArray).reduce(0, Integer::sum);
Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);

Our results show that converting a sequential stream into a parallel one brings slightly more performance benefits when using an array of primitives:


Benchmark                                                     Mode  Cnt        Score        Error  Units
MemoryLocalityCosts.localityIntArrayParallel                sequential stream  avgt   25     116247,787 ±     283,150  ns/op
MemoryLocalityCosts.localityIntArraySequential                avgt   25     293142,385 ±    2526,892  ns/op
MemoryLocalityCosts.localityIntegerArrayParallel              avgt   25    2153732,607 ±   16956,463  ns/op
MemoryLocalityCosts.localityIntegerArraySequential            avgt   25    5134866,640 ±  148283,942  ns/op

An array of primitives brings the best locality possible in Java. In general, the more pointers we have in our data structure, the more pressure we put on the memory to fetch the reference objects. This can have a negative effect on parallelization, as multiple cores simultaneously fetch the data from memory.


4.5. The NQ Model


Oracle presented a simple model that can help us determine whether parallelism can offer us a performance boost. In the NQ model, N stands for the number of source data elements, while Q represents the amount of computation performed per data element.


The larger the product of N*Q, the more likely we are to get a performance boost from parallelization. For problems with a trivially small Q, such as summing up numbers, the rule of thumb is that N should be greater than 10,000. As the number of computations increases, the data size required to get a performance boost from parallelism decreases.


4.6. File Search Cost


File Search using parallel streams performs better in comparison to sequential streams. Let’s run a benchmark on a sequential and parallel stream for search over 1500 text files:


      .filter(path -> path.getFileName().toString().endsWith(".txt")).collect(Collectors.toList());
      isRegularFile).filter(path -> path.getFileName().toString().endsWith(".txt")).

Our results show that converting a sequential stream into a parallel one brings slightly more performance benefits when searching a greater number of files:


Benchmark                                Mode  Cnt     Score         Error    Units
FileSearchCost.textFileSearchParallel    avgt   25  10808832.831 ± 446934.773  ns/op
FileSearchCost.textFileSearchSequential  avgt   25  13271799.599 ± 245112.749  ns/op

5. When to Use Parallel Streams


As we’ve seen, we need to be very considerate when using parallel streams.


Parallelism can bring performance benefits in certain use cases. But parallel streams cannot be considered as a magical performance booster. So, sequential streams should still be used as default during development.


A sequential stream can be converted to a parallel one when we have actual performance requirements. Given those requirements, we should first run a performance measurement and consider parallelism as a possible optimization strategy.


A large amount of data and many computations done per element indicate that parallelism could be a good option.


On the other hand, a small amount of data, unevenly splitting sources, expensive merge operations and poor memory locality indicate a potential problem for parallel execution.


6. Conclusion


In this article, we explored the difference between sequential and parallel streams in Java. We learned that parallel streams make use of the default fork-join pool and its worker threads.


Then we saw how parallel streams do not always bring performance benefits. We considered the overhead of managing multiple threads, memory locality, splitting the source and merging the results. We saw that arrays are a great data source for parallel execution because they bring the best possible locality and can split cheaply and evenly.


Finally, we looked at the NQ model and recommended using parallel streams only when we have actual performance requirements.


As always, the source code is available over on GitHub.
