Introduction to Thread Pools in Java

Last modified: August 10, 2016

1. Overview

This tutorial looks at thread pools in Java. We’ll start with the different implementations in the standard Java library and then look at Google’s Guava library.

2. The Thread Pool

In Java, threads are mapped to system-level threads, which are the operating system’s resources. If we create threads uncontrollably, we may run out of these resources quickly.

The operating system also does the context switching between threads in order to emulate parallelism. A simplistic view is that the more threads we spawn, the less time each thread spends doing actual work.

The Thread Pool pattern helps to save resources in a multithreaded application and to keep the parallelism within certain predefined limits.

When we use a thread pool, we write our concurrent code in the form of parallel tasks and submit them for execution to an instance of a thread pool. This instance manages several reused threads that execute these tasks.

The pattern allows us to control the number of threads the application creates and their life cycle. We’re also able to schedule tasks’ execution and keep incoming tasks in a queue.
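
To make the pattern concrete, here is a minimal sketch of a fixed-size pool built from a BlockingQueue and a few reused worker threads. SimplePool and its methods are hypothetical names chosen for illustration, not a JDK API, and the sketch assumes submit() is never called concurrently with shutdownAndJoin(). The JDK’s ThreadPoolExecutor adds rejection policies, dynamic sizing, and much more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal pool: a fixed set of reused workers pulling tasks from a shared queue
class SimplePool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();
    private volatile boolean running = true;

    SimplePool(int size) {
        for (int i = 0; i < size; i++) {
            Thread worker = new Thread(this::workLoop);
            workers.add(worker);
            worker.start();
        }
    }

    // Each worker keeps taking tasks until the pool is shut down and the queue is drained
    private void workLoop() {
        while (running || !queue.isEmpty()) {
            try {
                Runnable task = queue.poll(50, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Incoming tasks wait in the queue until a worker is free
    void submit(Runnable task) {
        if (!running) {
            throw new IllegalStateException("pool is shut down");
        }
        queue.add(task);
    }

    // Stop accepting tasks, let the workers drain the queue, and wait for them to exit
    void shutdownAndJoin() {
        running = false;
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Runs ten tasks on two workers; returns {completed tasks, distinct worker threads used}
    static int[] demo() {
        SimplePool pool = new SimplePool(2);
        AtomicInteger completed = new AtomicInteger();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                threadNames.add(Thread.currentThread().getName());
                completed.incrementAndGet();
            });
        }
        pool.shutdownAndJoin();
        return new int[] { completed.get(), threadNames.size() };
    }
}
```

With two workers and ten tasks, every task completes on one of at most two threads, which is exactly the reuse the pattern is about.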

3. Thread Pools in Java

3.1. Executors, Executor and ExecutorService

The Executors helper class contains several methods for creating preconfigured thread pool instances. Those methods are a good place to start: we can use them if we don’t need to apply any custom fine-tuning.

We use the Executor and ExecutorService interfaces to work with different thread pool implementations in Java. Usually, we should keep our code decoupled from the actual implementation of the thread pool and use these interfaces throughout our application.

3.1.1. Executor

The Executor interface has a single execute method to submit Runnable instances for execution.
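
Because Executor declares a single abstract method, a lambda can implement it. As a hypothetical illustration (ExecutorSketch and THREAD_PER_TASK are made-up names), here is the naive thread-per-task strategy that a pool is meant to replace: every call starts a brand-new thread, so nothing is reused or limited.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class ExecutorSketch {
    // Naive Executor: a brand-new thread per task, with no reuse and no upper limit
    static final Executor THREAD_PER_TASK = task -> new Thread(task).start();

    // Runs one task through THREAD_PER_TASK and reports whether it finished within a second
    static boolean runOnce() {
        CountDownLatch done = new CountDownLatch(1);
        THREAD_PER_TASK.execute(done::countDown);
        try {
            return done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```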

Let’s look at a quick example of how to use the Executors API to acquire an Executor instance backed by a single-thread pool and an unbounded queue for executing tasks sequentially.

Here, we run a single task that simply prints “Hello World” on the screen. We’ll submit the task as a lambda (a Java 8 feature), which is inferred to be Runnable:

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));

3.1.2. ExecutorService

The ExecutorService interface contains a large number of methods to control the progress of the tasks and manage the termination of the service. Using this interface, we can submit the tasks for execution and also control their execution using the returned Future instance.
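
A common way to manage termination is the two-phase shutdown pattern shown in the ExecutorService documentation: call shutdown() so already-submitted tasks can finish, then fall back to shutdownNow(), which interrupts running tasks, if they don’t finish in time. The helper below is a sketch of that pattern; ShutdownHelper, shutdownAndAwait, and the timeout values are illustrative choices.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownHelper {
    // Two-phase shutdown: let submitted tasks finish, then interrupt any stragglers
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // reject new tasks; already-submitted ones keep running
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks and drop queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Submits a task that only stops when interrupted, then shuts the pool down
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and finish early
            }
        });
        return shutdownAndAwait(pool, 200, TimeUnit.MILLISECONDS);
    }
}
```

The fallback only helps with tasks that respond to interruption; a task that ignores it, such as a busy loop, can still keep the pool alive.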

Now we’ll create an ExecutorService, submit a task and then use the returned Future’s get method to wait until the submitted task finishes and the value is returned:

ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();

Of course, in a real-life scenario, we usually don’t want to call future.get() right away but defer calling it until we actually need the value of the computation.
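
A sketch of that deferral, assuming a one-second upper bound is acceptable for the caller: the timed get(long, TimeUnit) overload gives up with a TimeoutException instead of blocking forever. DeferredGet and compute() are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeferredGet {
    static String compute() {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            Future<String> future = executorService.submit(() -> "Hello World");
            // ... other work can happen here while the task runs ...
            // Block only when the value is needed, and give up after one second
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdown();
        }
    }
}
```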

Here, the submit method is overloaded to take either a Runnable or a Callable. Both of these are functional interfaces, and we can pass them as lambdas (starting with Java 8).

Runnable’s single method cannot throw a checked exception and does not return a value. The Callable interface may be more convenient, as it allows us to throw a checked exception and return a value.

Finally, to let the compiler infer the Callable type, we simply return a value from the lambda.
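
The following sketch contrasts the two overloads. The parse helper is a hypothetical method that declares a checked IOException: a Runnable lambda could not call it without catching the exception, but a Callable can, and the failure then reaches the caller wrapped in an ExecutionException.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RunnableVsCallable {
    // Hypothetical helper that declares a checked exception
    static int parse(String s) throws IOException {
        if (s.isEmpty()) {
            throw new IOException("empty input");
        }
        return Integer.parseInt(s);
    }

    // Returns {Runnable result, Callable result, simple name of the failure's cause}
    static Object[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Void body: only Runnable fits, so the Future's value is null
            Future<?> runnable = executor.submit(() -> System.out.println("side effect"));
            // Value-returning body: the Callable overload is chosen
            Future<Integer> callable = executor.submit(() -> parse("42"));
            // The checked IOException is allowed inside a Callable...
            Future<Integer> failing = executor.submit(() -> parse(""));
            String cause;
            try {
                failing.get();
                cause = "none";
            } catch (ExecutionException e) {
                // ...and reaches the caller wrapped in an ExecutionException
                cause = e.getCause().getClass().getSimpleName();
            }
            return new Object[] { runnable.get(), callable.get(), cause };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```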

For more examples of using the ExecutorService interface and futures, have a look at A Guide to the Java ExecutorService.

3.2. ThreadPoolExecutor

The ThreadPoolExecutor is an extensible thread pool implementation with lots of parameters and hooks for fine-tuning.

The main configuration parameters that we’ll discuss here are corePoolSize, maximumPoolSize and keepAliveTime.

The pool consists of a fixed number of core threads that are kept inside all the time. It can also contain some excess threads that may be spawned and then terminated when they are no longer needed.

The corePoolSize parameter is the number of core threads that will be instantiated and kept in the pool. When a new task comes in, if all core threads are busy and the internal queue is full, the pool is allowed to grow up to maximumPoolSize.

The keepAliveTime parameter is the amount of time that excess threads (those instantiated beyond corePoolSize) are allowed to stay idle before they are terminated. By default, the ThreadPoolExecutor only considers non-core threads for removal. To apply the same removal policy to core threads, we can use the allowCoreThreadTimeOut(true) method.
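
The growth rule is easiest to see by building a ThreadPoolExecutor directly. In this sketch, the sizes, the one-second keep-alive, and the one-slot ArrayBlockingQueue are arbitrary illustration values, and PoolGrowthDemo is a hypothetical name. Blocking tasks keep every thread busy, so the second task is queued, the third forces a non-core thread, and a fourth is rejected by the default AbortPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Returns {poolSize, queueSize, rejectedCount} while the first three tasks are still blocked
    static int[] demo() {
        // core = 1, max = 2, extra threads may idle for 1 second, queue holds at most 1 task
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        // executor.allowCoreThreadTimeOut(true) would apply the same keep-alive to the core thread
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the thread busy until we release it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        executor.execute(blocking); // 1st: starts the core thread
        executor.execute(blocking); // 2nd: core thread busy, so the task is queued
        executor.execute(blocking); // 3rd: queue full, so a non-core thread is started
        int rejected = 0;
        try {
            executor.execute(blocking); // 4th: queue full and pool already at maximumPoolSize
        } catch (RejectedExecutionException e) {
            rejected++; // the default AbortPolicy throws
        }
        int[] result = { executor.getPoolSize(), executor.getQueue().size(), rejected };
        release.countDown();
        executor.shutdown();
        return result;
    }
}
```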

These parameters cover a wide range of use cases, but the most typical configurations are predefined in the Executors static methods.

3.2.1. Executors.newFixedThreadPool()

Let’s look at an example. The newFixedThreadPool method creates a ThreadPoolExecutor with equal corePoolSize and maximumPoolSize parameter values and a zero keepAliveTime. This means that the number of threads in this thread pool is always the same:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());

Here, we instantiate a ThreadPoolExecutor with a fixed thread count of 2. This means that if the number of simultaneously running tasks is always less than or equal to two, they get executed right away. Otherwise, some of these tasks may be put into a queue to wait for their turn.

We created three Callable tasks that imitate heavy work by sleeping for 1000 milliseconds. The first two tasks will be run at once, and the third one will have to wait in the queue. We can verify it by calling the getPoolSize() and getQueue().size() methods immediately after submitting the tasks.

3.2.2. Executors.newCachedThreadPool() 

We can create another preconfigured ThreadPoolExecutor with the Executors.newCachedThreadPool() method. This method does not take a number of threads at all. It sets corePoolSize to 0 and maximumPoolSize to Integer.MAX_VALUE, and the keepAliveTime is 60 seconds:

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});

assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());

These parameter values mean that the cached thread pool may grow without bounds to accommodate any number of submitted tasks. But when the threads are no longer needed, they will be disposed of after 60 seconds of inactivity. A typical use case is an application with a lot of short-lived tasks.
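
To check these settings, we can read them back from the pool. The sketch below also builds an explicitly configured pool with the same parameters, which, as far as we know, matches what OpenJDK’s Executors.newCachedThreadPool() constructs; treat that equivalence as an assumption for other JDKs. CachedPoolConfig is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolConfig {
    // Same parameters as the cached pool: no core threads, effectively unbounded max, 60 s idle timeout
    static ThreadPoolExecutor explicitCachedPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    }

    // Reads back {corePoolSize, maximumPoolSize, keepAliveSeconds} from the factory-made pool
    static long[] factorySettings() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return new long[] { pool.getCorePoolSize(), pool.getMaximumPoolSize(),
                pool.getKeepAliveTime(TimeUnit.SECONDS) };
        } finally {
            pool.shutdown();
        }
    }
}
```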

The queue size will always be zero because internally a SynchronousQueue instance is used. In a SynchronousQueue, pairs of insert and remove operations always occur simultaneously. So, the queue never actually contains anything.
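
This hand-off behavior is also why the cached pool grows: ThreadPoolExecutor first tries a non-blocking offer() to its queue, and with a SynchronousQueue that only succeeds if an idle worker is already waiting for a task; otherwise, a new thread is created. The sketch below (SynchronousQueueDemo is a hypothetical name) shows the queue on its own.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class SynchronousQueueDemo {
    // Returns "<offer without taker> <size> <offer with taker> <received item>"
    static String demo() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // No thread is waiting in take(), so a non-blocking offer cannot hand the item off
        boolean acceptedWithoutTaker = queue.offer("task-1");

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits for a producer to hand over an item
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // With a consumer waiting, the timed offer completes the hand-off
            boolean handedOff = queue.offer("task-2", 1, TimeUnit.SECONDS);
            if (!handedOff) {
                consumer.interrupt(); // avoid joining a consumer that is still blocked
            }
            consumer.join();
            return acceptedWithoutTaker + " " + queue.size() + " " + handedOff + " " + received[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```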

3.2.3. Executors.newSingleThreadExecutor()

The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is 0.

In the following example, the tasks run sequentially, so the counter value will be 2 after both tasks complete:

AtomicInteger counter = new AtomicInteger();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    counter.set(1);
});
executor.submit(() -> {
    counter.compareAndSet(1, 2);
});
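
The snippet above doesn’t wait for the tasks, so reading counter right after submit() could still observe 0 or 1. A runnable sketch that shuts the executor down and waits for the queue to drain before reading the value (SingleThreadOrder and the one-second timeout are illustrative choices):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SingleThreadOrder {
    static int demo() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            counter.set(1);
        });
        executor.submit(() -> {
            // Only succeeds if the first task already ran, which the single thread guarantees
            counter.compareAndSet(1, 2);
        });
        executor.shutdown(); // stop accepting tasks; queued ones still run in order
        try {
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }
}
```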

Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it can’t be reconfigured after creation. Note that this is also the reason we can’t cast it to a ThreadPoolExecutor.
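
The effect of the wrapper is visible with instanceof. In this sketch, newFixedThreadPool(1) serves as a reconfigurable comparison, and Executors.unconfigurableExecutorService() applies the same kind of protection to any ExecutorService; WrapperCheck is a hypothetical class name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class WrapperCheck {
    // Returns {single is TPE, fixed is TPE, wrapped fixed is TPE}
    static boolean[] demo() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        // Hides the ThreadPoolExecutor behind a plain ExecutorService view
        ExecutorService locked = Executors.unconfigurableExecutorService(fixed);
        try {
            return new boolean[] {
                single instanceof ThreadPoolExecutor, // false: wrapped, cannot be cast
                fixed instanceof ThreadPoolExecutor,  // true: can be cast and reconfigured
                locked instanceof ThreadPoolExecutor  // false: wrapped, cannot be cast
            };
        } finally {
            single.shutdown();
            fixed.shutdown();
        }
    }
}
```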

3.3. ScheduledThreadPoolExecutor

The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:

  • The schedule method allows us to run a task once after a specified delay.
  • The scheduleAtFixedRate method allows us to run a task after a specified initial delay and then run it repeatedly with a certain period. The period argument is the time measured between the starting times of the tasks, so the execution rate is fixed.
  • The scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly runs the given task, but the specified delay is measured between the end of the previous task and the start of the next. The execution rate may vary depending on the time it takes to run any given task.

We typically use the Executors.newScheduledThreadPool() method to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime.

Here’s how to schedule a task to run after a 500-millisecond delay:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);
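
schedule also has an overload that accepts a Callable, in which case the returned ScheduledFuture carries the result. This sketch (DelayedValue is a hypothetical name) returns how many milliseconds passed between scheduling and execution, which should be at least the 500 ms delay.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedValue {
    static long demo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        // Callable overload: the lambda's return value becomes the future's result
        ScheduledFuture<Long> future = executor.schedule(
            () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start),
            500, TimeUnit.MILLISECONDS);
        try {
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```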

The following code shows how to run a task after a 500-millisecond delay and then repeat it every 100 milliseconds. After scheduling the task, we use the CountDownLatch lock to wait until it fires three times. Then we cancel it using the Future.cancel() method:

CountDownLatch lock = new CountDownLatch(3);

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);

lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);
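
For comparison, here is a sketch of the same latch-and-cancel approach with scheduleWithFixedDelay. The task sleeps for 20 ms to simulate work, so consecutive starts end up roughly 70 ms apart (20 ms of work plus the 50 ms delay), whereas scheduleAtFixedRate would aim for a constant start-to-start period. The delays and the FixedDelayDemo name are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class FixedDelayDemo {
    // Returns true if the task ran three times within two seconds
    static boolean demo() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        CountDownLatch runs = new CountDownLatch(3);
        // First run after 100 ms; each later run starts 50 ms after the previous run ends
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
            try {
                Thread.sleep(20); // simulated work that lengthens the start-to-start interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            runs.countDown();
        }, 100, 50, TimeUnit.MILLISECONDS);
        try {
            return runs.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(true);
            executor.shutdown();
        }
    }
}
```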

3.4. ForkJoinPool

ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. With a simple ThreadPoolExecutor, we’d run out of threads quickly, as every task or subtask requires its own thread to run, and a task that waits for its subtasks keeps its thread blocked.
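
The usual fork/join shape is a recursive task that splits its input until a piece is small enough to handle directly. In this sketch, ArraySumTask and the 1,000-element threshold are illustrative choices, not tuned values: compute() forks one half so that an idle worker can steal it, works on the other half in the current thread, and then joins.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: sums an array by splitting it until the chunks are small
class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // illustrative cutoff, not a tuned value
    private final long[] numbers;
    private final int from;
    private final int to;

    ArraySumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ArraySumTask left = new ArraySumTask(numbers, from, mid);
        ArraySumTask right = new ArraySumTask(numbers, mid, to);
        left.fork();                     // queue the left half; an idle worker may steal it
        long rightSum = right.compute(); // work on the right half in the current thread
        return left.join() + rightSum;   // wait for (or run) the left half
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ArraySumTask(numbers, 0, numbers.length));
    }
}
```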

In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask; instead, it uses a work-stealing algorithm. This framework is thoroughly described in our Guide to the Fork/Join Framework in Java.

Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all node values. Here’s a simple implementation of a tree node consisting of an int value and a set of child nodes (it uses Guava’s Sets.newHashSet helper):

static class TreeNode {

    int value;

    Set<TreeNode> children;

    TreeNode(int value, TreeNode... children) {
        this.value = value;
        this.children = Sets.newHashSet(children);
    }
}

Now, if we want to sum all values in a tree in parallel, we need to extend the RecursiveTask<Integer> abstract class. Each task receives its own node and adds its value to the sum of its children’s values. To calculate the sum of the children’s values, the task implementation does the following:

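  • streams the children set
  • maps over this stream, creating and forking a new CountingTask for each element
  • collects the forked tasks into a list, so that every subtask is started before we wait for any of them
  • collects the results by calling the join method on each forked task
  • sums the results using the Collectors.summingInt collector

public static class CountingTask extends RecursiveTask<Integer> {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        // Fork all children first: in a single lazy stream pipeline, each child
        // would be joined right after it is forked, so the subtasks would run one by one
        List<ForkJoinTask<Integer>> forkedTasks = node.children.stream()
          .map(childNode -> new CountingTask(childNode).fork())
          .collect(Collectors.toList());
        return node.value + forkedTasks.stream()
          .collect(Collectors.summingInt(ForkJoinTask::join));
    }
}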

The code to run the calculation on an actual tree is very simple:

TreeNode tree = new TreeNode(5,
  new TreeNode(3), new TreeNode(2,
    new TreeNode(2), new TreeNode(8)));

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));
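
For a dependency-free variant of the same computation, the sketch below replaces Guava’s Sets.newHashSet with a plain List and uses ForkJoinTask.invokeAll, which starts all child tasks before waiting for them. TreeSum, Node, and SumTask are hypothetical names. For the tree above, the result is 5 + 3 + 2 + 2 + 8 = 20.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class TreeSum {
    // Plain-JDK tree node: an int value and a list of children
    static class Node {
        final int value;
        final List<Node> children;

        Node(int value, Node... children) {
            this.value = value;
            this.children = Arrays.asList(children);
        }
    }

    static class SumTask extends RecursiveTask<Integer> {
        private final Node node;

        SumTask(Node node) {
            this.node = node;
        }

        @Override
        protected Integer compute() {
            List<SumTask> subtasks = new ArrayList<>();
            for (Node child : node.children) {
                subtasks.add(new SumTask(child));
            }
            int sum = node.value;
            // invokeAll starts every subtask, then waits until all of them are done
            for (SumTask task : invokeAll(subtasks)) {
                sum += task.join();
            }
            return sum;
        }
    }

    static int sum(Node root) {
        return ForkJoinPool.commonPool().invoke(new SumTask(root));
    }
}
```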

4. Thread Pool Implementations in Guava

Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.

4.1. Adding Guava as a Maven Dependency

We add the following dependency to our Maven pom file to include the Guava library in our project. We can find the latest version of the Guava library in the Maven Central Repository:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0.1-jre</version>
</dependency>

4.2. Direct Executor and Direct Executor Service

Sometimes we want to run the task either in the current thread or in a thread pool, depending on some conditions. We would prefer to use a single Executor interface and just switch the implementation. Although it’s not so hard to come up with an implementation of Executor or ExecutorService that runs the tasks in the current thread, this still requires writing some boilerplate code.
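
Writing a same-thread executor by hand takes only a method reference, since Executor has a single abstract method. This hypothetical ExecutorChoice class shows the kind of boilerplate meant here: CALLER_RUNS runs each task in the calling thread, and choose() switches between it and a pool based on a flag.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorChoice {
    // Same-thread Executor: the task runs synchronously inside execute()
    static final Executor CALLER_RUNS = Runnable::run;

    // Hypothetical switch between inline execution and a thread pool
    static Executor choose(boolean runInline, ExecutorService pool) {
        return runInline ? CALLER_RUNS : pool;
    }

    static boolean demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Thread caller = Thread.currentThread();
            Thread[] ranOn = new Thread[1];
            choose(true, pool).execute(() -> ranOn[0] = Thread.currentThread());
            // No waiting needed: with CALLER_RUNS the task has already finished here
            return ranOn[0] == caller;
        } finally {
            pool.shutdown();
        }
    }
}
```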

Fortunately, Guava provides predefined instances for us.

Here’s an example that demonstrates the execution of a task in the same thread. The provided task sleeps for 500 milliseconds, but since it runs in the calling thread, it blocks that thread, and the result is available immediately after the execute call returns:

Executor executor = MoreExecutors.directExecutor();

AtomicBoolean executed = new AtomicBoolean();

executor.execute(() -> {
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executed.set(true);
});

assertTrue(executed.get());

The instance returned by the directExecutor() method is actually a static singleton, so using this method does not incur any object-creation overhead at all.

When a plain Executor is enough, we should prefer this method to MoreExecutors.newDirectExecutorService(), because that API creates a full-fledged executor service implementation on every call.

4.3. Exiting Executor Services

Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause the JVM to hang indefinitely while the tasks keep doing their work.

To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.

These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method, which prevents the VM from terminating for a configured amount of time before giving up on hung tasks.
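
Guava’s implementation isn’t reproduced here, but the two ideas can be approximated with the plain JDK: a ThreadFactory that marks workers as daemon threads, and a shutdown hook that waits for a bounded time. DaemonPool and its methods are hypothetical names, and unlike a production utility, this sketch registers a new hook on every create() call.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class DaemonPool {
    // Approximation: daemon workers plus a shutdown hook with a bounded grace period
    static ExecutorService create(int threads, long terminationTimeoutMillis) {
        ThreadFactory defaults = Executors.defaultThreadFactory();
        ThreadFactory daemonFactory = runnable -> {
            Thread thread = defaults.newThread(runnable);
            thread.setDaemon(true); // daemon threads do not keep the JVM alive on their own
            return thread;
        };
        ExecutorService executor = Executors.newFixedThreadPool(threads, daemonFactory);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executor.shutdown();
            try {
                // Wait a bounded time for running tasks, then let the JVM exit anyway
                executor.awaitTermination(terminationTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        return executor;
    }

    // Reports whether a pool worker thread is a daemon thread
    static boolean workersAreDaemons() {
        ExecutorService executor = create(2, 100);
        try {
            return executor.submit(() -> Thread.currentThread().isDaemon()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```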

In the following example, we submit a task that contains an infinite loop, but we use an exiting executor service that waits at most 100 milliseconds for its tasks upon VM termination.

ThreadPoolExecutor executor = 
  (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService = 
  MoreExecutors.getExitingExecutorService(executor, 
    100, TimeUnit.MILLISECONDS);

executorService.submit(() -> {
    while (true) {
    }
});

Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely.

4.4. Listening Decorators

Listening decorators allow us to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method, addListener. This method allows adding a listener that is called when the future completes.
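
The JDK’s FutureTask offers a lower-level version of the same idea through its protected done() method, which runs when the task completes normally, fails, or is cancelled. The CallbackFutureTask class below is a hypothetical sketch of a single completion callback built on that hook; it is not Guava’s ListenableFuture and lacks its executor-aware listener registration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical FutureTask subclass with a single completion callback
class CallbackFutureTask<V> extends FutureTask<V> {
    private final Runnable callback;

    CallbackFutureTask(Callable<V> callable, Runnable callback) {
        super(callable);
        this.callback = callback;
    }

    @Override
    protected void done() {
        callback.run(); // runs once the task is done: normally, exceptionally, or by cancellation
    }

    static boolean demo() {
        CountDownLatch notified = new CountDownLatch(1);
        CallbackFutureTask<String> task =
            new CallbackFutureTask<>(() -> "Hello", notified::countDown);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(task); // a FutureTask is itself a Runnable
            return notified.await(1, TimeUnit.SECONDS) && "Hello".equals(task.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```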

We’ll rarely want to use ListenableFuture.addListener() method directly. But it is essential to most of the helper methods in the Futures utility class.

For instance, with the Futures.allAsList() method, we can combine several ListenableFuture instances in a single ListenableFuture that completes upon the successful completion of all the futures combined:

ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = 
  MoreExecutors.listeningDecorator(executorService);

ListenableFuture<String> future1 = 
  listeningExecutorService.submit(() -> "Hello");
ListenableFuture<String> future2 = 
  listeningExecutorService.submit(() -> "World");

String greeting = Futures.allAsList(future1, future2).get()
  .stream()
  .collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);
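
For comparison, without Guava, the JDK’s CompletableFuture (Java 8+) can express the same combination. In this sketch, allOf() completes when both futures do, and the joined results are then concatenated; CompletableGreeting is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CompletableGreeting {
    static String greet() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            CompletableFuture<String> future1 =
                CompletableFuture.supplyAsync(() -> "Hello", executorService);
            CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(() -> "World", executorService);
            // allOf completes when both do; thenApply then reads the already available results
            return CompletableFuture.allOf(future1, future2)
                .thenApply(ignored -> Stream.of(future1, future2)
                    .map(CompletableFuture::join)
                    .collect(Collectors.joining(" ")))
                .join();
        } finally {
            executorService.shutdown();
        }
    }
}
```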

5. Conclusion

In this article, we discussed the Thread Pool pattern and its implementations in the standard Java library and in Google’s Guava library.

The source code for the article is available over on GitHub.
