Custom Thread Pools In Java 8 Parallel Streams – Java 8并行流中的自定义线程池

最后修改: 2017年 2月 14日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

Java 8 introduced the concept of Streams as an efficient way of carrying out bulk operations on data. And parallel Streams can be obtained in environments that support concurrency.

Java 8引入了Streams的概念,作为对数据进行批量操作的一种有效方式。而在支持并发的环境中,可以获得并行的Streams

These streams can come with improved performance – at the cost of multi-threading overhead.

这些数据流可以带来性能的提高–以多线程开销为代价。

In this quick tutorial, we’ll look at one of the biggest limitations of Stream API and see how to make a parallel stream work with a custom ThreadPool instance, alternatively – there’s a library that handles this.

在这个快速教程中,我们将探讨Stream API的最大限制之一,看看如何通过自定义ThreadPool实例来实现并行流,或者 – 有一个库可以处理这个

2. Parallel Stream

2.平行的

Let’s start with a simple example – calling the parallelStream method on any of the Collection types – which will return a possibly parallel Stream:

让我们从一个简单的例子开始–在任何一个Collection类型上调用parallelStream方法–它将返回一个可能的并行Stream

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();
        
    assertTrue(parallelStream.isParallel());
}

The default processing that occurs in such a Stream uses the ForkJoinPool.commonPool(), a thread pool shared by the entire application.

在这样的Stream中发生的默认处理使用ForkJoinPool.commonPool(), 整个应用程序共享的线程池。

3. Custom Thread Pool

3.自定义线程池

We can actually pass a custom ThreadPool when processing the stream.

在处理stream时,我们实际上可以传递一个自定义的ThreadPool

The following example lets have a parallel Stream use a custom ThreadPool to calculate the sum of long values from 1 to 1,000,000, inclusive:

下面的例子让一个并行的Stream使用一个自定义的ThreadPool来计算从1到1,000,000的长值的总和,包括在内。

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() 
  throws InterruptedException, ExecutionException {
    
    long firstNum = 1;
    long lastNum = 1_000_000;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
 
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

We used the ForkJoinPool constructor with a parallelism level of 4. Some experimentation is required to determine the optimal value for different environments, but a good rule of thumb is simply choosing the number based on how many cores your CPU has.

我们使用ForkJoinPool构造器,并行度为4。需要进行一些实验来确定不同环境下的最佳值,但一个好的经验法则是根据你的CPU有多少个核心来选择这个数字。

Next, we processed the content of the parallel Stream, summing them up in the reduce call.

接下来,我们处理了并行Stream的内容,在reduce调用中把它们相加。

This simple example may not demonstrate the full usefulness of using a custom thread pool, but the benefits become obvious in situations where we do not want to tie-up the common thread pool with long-running tasks – such as processing data from a network source – or the common thread pool is being used by other components within the application.

这个简单的例子可能无法证明使用自定义线程池的全部用处,但在我们不想用长期运行的任务占用公共线程池的情况下,比如处理来自网络源的数据,或者公共线程池被应用程序中的其他组件使用,其好处就很明显了。

If we run the test method above, it’ll pass. So far, so good.

如果我们运行上面的测试方法,它就会通过。到目前为止,一切都很好。

However, if we instantiate ForkJoinPool class in a normal method in the same way as we do in the test method, it may lead to the OutOfMemoryError.

然而,如果我们在普通方法中以与测试方法相同的方式实例化ForkJoinPool类,可能会导致OutOfMemoryError

Next, let’s take a closer look at the cause of the memory leak.

接下来,让我们仔细看看内存泄漏的原因。

4. Beware of the Memory Leak

4.谨防内存泄漏

As we’ve talked about earlier, the common thread pool is used by the entire application by default. The common thread pool is a static ThreadPool instance.

正如我们前面谈到的,公共线程池默认被整个应用程序所使用。公共线程池是一个静态的ThreadPool instance。

Therefore, no memory leak occurs if we use the default thread pool.

因此,如果我们使用默认的线程池,就不会发生内存泄漏。

Now, let’s review our test method. In the test method, we created an object of ForkJoinPool. When the test method is finished, the customThreadPool object won’t be dereferenced and garbage collected — instead, it will be waiting for new tasks to be assigned.

现在,让我们回顾一下我们的测试方法。在测试方法中,我们创建了一个ForkJoinPool的对象。当测试方法完成时,customThreadPool对象将不会被解除引用和垃圾回收 – 相反,它将等待新的任务被分配

That is to say, every time we call the test method, a new customThreadPool object will be created and it won’t be released.

也就是说,每次我们调用测试方法时,都会创建一个新的customThreadPool对象,并且不会被释放。

The fix to the problem is pretty simple: shutdown the customThreadPool object after we’ve executed the method:

解决这个问题的方法很简单。关闭 customThreadPool 对象,在我们执行完方法后。

try {
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    customThreadPool.shutdown();
}

5. Conclusion

5.结论

We have briefly looked at how to run a parallel Stream using a custom ThreadPool. In the right environment and with the proper use of the parallelism level, performance gains can be had in certain situations.

我们已经简单地研究了如何使用自定义的ThreadPool来运行一个并行的Stream。在正确的环境中,并正确使用并行度,在某些情况下可以获得性能的提升。

If we create a custom ThreadPool, we should keep in mind to call its shutdown() method to avoid a memory leak.

如果我们创建了一个自定义的 ThreadPool,我们应该记住调用其shutdown()方法以避免内存泄漏。

The complete code samples referenced in this article can be found over on GitHub.

本文引用的完整代码样本可以在GitHub上找到over