1. Overview
1.概述
The Executor Framework in Java is an attempt to decouple task submission from task execution. While this approach abstracts away the task execution details very nicely, sometimes, we still need to configure it for even more optimal executions.
Java中的Executor框架是将任务提交与任务执行脱钩的一种尝试。虽然这种方法很好地抽象了任务执行的细节,但有时,我们仍然需要对其进行配置,以实现更优化的执行。
In this tutorial, we’re going to see what happens when a thread pool can’t accept any more tasks. Then, we’ll learn how to control this corner case by applying saturation policies appropriately.
在本教程中,我们将看到当一个线程池不能再接受任何任务时会发生什么。然后,我们将学习如何通过适当地应用饱和策略来控制这种角落情况。
2. Revisiting the Thread Pools
2.重新审视线程池
The following diagram shows how the executor service works internally:
下图显示了执行器服务的内部运作方式。
Here’s what happens when we submit a new task to the executor:
以下是当我们向执行者提交一个新任务时发生的情况。
- If one of the threads is available, it processes the task.
- Otherwise, the executor adds the new task to its queue.
- When a thread finishes the current task, it picks up another one from the queue.
2.1. The ThreadPoolExecutor
2.1.ThreadPoolExecutor
Most executor implementations use the well-known ThreadPoolExecutor as their base implementation. Therefore, to better understand how the task queueing works, we should take a closer look at its constructor:
大多数执行器的实现都使用著名的ThreadPoolExecutor作为其基础实现。因此,为了更好地理解任务队列的工作方式,我们应该仔细看看它的构造函数。
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
)
2.2. Core Pool Size
2.2.核心池大小
The corePoolSize parameter determines the initial size of the thread pool. Usually, the executor makes sure that the thread pool contains at least corePoolSize number of threads.
corePoolSize参数决定了线程池的初始大小。通常情况下,执行器会确保线程池至少包含corePoolSize线程数。
However, it’s possible to have fewer threads if we enable the allowCoreThreadTimeOut parameter.
然而,如果我们启用allowCoreThreadTimeOut参数,就有可能减少线程。
2.3. Maximum Pool Size
2.3 池子的最大尺寸
Let’s suppose all core threads are busy executing a few tasks. As a result, the executor queues the new tasks until they get a chance to be processed later.
假设所有的核心线程都在忙着执行一些任务。结果,执行器将新的任务排在队列中,直到以后有机会被处理。
When this queue becomes full, the executor can add more threads to the thread pool. The maximumPoolSize puts an upper bound on the number of threads a thread pool can potentially contain.
当这个队列变满时,执行者可以向线程池添加更多线程。maximumPoolSize为线程池可能包含的线程数设定了上限。
When those threads remain idle for some time, the executor can remove them from the pool. Hence, the pool size can shrink back to its core size.
当这些线程闲置一段时间后,执行者可以将它们从池中移除。因此,池的大小可以缩回到其核心大小。
2.4. Queueing
2.4.排队
As we saw earlier, when all core threads are busy, the executor adds the new tasks to a queue. There are three different approaches for queueing:
正如我们前面所看到的,当所有的核心线程都很忙时,执行者会将新的任务添加到一个队列中。有三种不同的排队方法。
- Unbounded Queue: The queue can hold an unlimited number of tasks. Since this queue never fills up, the executor ignores the maximum size. The fixed size and single thread executors both use this approach.
- Bounded Queue: As its name suggests, the queue can only hold a limited number of tasks. As a result, the thread pool would grow when a bounded queue fills up.
- Synchronous Handoff: Quite surprisingly, this queue can’t hold any tasks! With this approach, we can queue a task if and only if there is another thread picking the same task on the other side at the same time. The cached thread pool executor uses this approach internally.
Let’s suppose the following scenario when we’re using either bounded queueing or synchronous handoff:
让我们假设以下情况,当我们使用有界队列或同步交接时。
- All core threads are busy
- The internal queue becomes full
- The thread pool grows to its maximum possible size, and all those threads are also busy
What happens when a new task comes in?
当一个新的任务到来时,会发生什么?。
3. Saturation Policies
3.饱和政策
When all threads are busy, and the internal queue fills up, the executor becomes saturated.
当所有线程都很忙,内部队列填满时,执行器就会饱和。
Executors can perform predefined actions once they hit saturation. These actions are known as Saturation Policies. We can modify the saturation policy of an executor by passing an instance of RejectedExecutionHandler to its constructor.
一旦达到饱和状态,执行器可以执行预定义的行动。这些行动被称为饱和策略。我们可以通过将一个RejectedExecutionHandler的实例传递给其构造函数来修改执行器的饱和策略。
Fortunately, Java provides a few built-in implementations for this class, each covering a specific use case. In the following sections, we’ll evaluate those policies in detail.
幸运的是,Java为这个类提供了一些内置的实现,每个实现都涵盖了一个特定的使用情况。在下面的章节中,我们将详细评估这些策略。
3.1. Abort Policy
3.1.中止政策
The default policy is the abort policy. Abort policy causes the executor to throw a RejectedExecutionException:
默认策略是中止策略。中止策略会导致执行器抛出一个RejectedExecutionException。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
executor.execute(() -> waitFor(250));
assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
.isInstanceOf(RejectedExecutionException.class);
Since the first task takes a long time to execute, the executor rejects the second task.
由于第一个任务需要很长的时间来执行,执行者拒绝第二个任务。
3.2. Caller-Runs Policy
3.2.呼叫者-运行政策
Instead of running a task asynchronously in another thread, this policy makes the caller thread execute the task:
这个策略不是在另一个线程中异步运行一个任务,而是让调用者线程执行该任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(() -> waitFor(250));
long startTime = System.currentTimeMillis();
executor.execute(() -> waitFor(500));
long blockedDuration = System.currentTimeMillis() - startTime;
assertThat(blockedDuration).isGreaterThanOrEqualTo(500);
After submitting the first task, the executor can’t accept any more new tasks. Therefore, the caller thread blocks until the second task returns.
在提交第一个任务后,执行器不能再接受任何新的任务。因此,调用者线程阻塞,直到第二个任务返回。
The caller-runs policy makes it easy to implement a simple form of throttling. That is, a slow consumer can slow down a fast producer to control the task submission flow.
调用者运行策略 使实施简单形式的节流变得容易。也就是说,缓慢的消费者可以放慢快速生产者的速度,以控制任务提交流程。
3.3. Discard Policy
3.3.丢弃政策
The discard policy silently discards the new task when it fails to submit it:
丢弃策略 当它未能提交新任务时,会悄悄地丢弃它。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));
assertThat(queue.poll(200, MILLISECONDS)).isNull();
Here, the second task publishes a simple message to a queue. Since it never gets a chance to execute, the queue remains empty, even though we’re blocking on it for some time.
在这里,第二个任务将一个简单的消息发布到队列中。由于它从未有机会执行,队列一直是空的,尽管我们在它上面阻塞了一段时间。
3.4. Discard-Oldest Policy
3.4.丢弃-最古老的政策
The discard-oldest policy first removes a task from the head of the queue, then re-submits the new task:
discard-oldest策略 首先从队列的头部删除一个任务,然后重新提交新的任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).containsExactlyInAnyOrder("Second", "Third");
This time, we’re using a bounded queue that can hold just two tasks. Here’s what happens when we submit these four tasks:
这一次,我们使用的是一个有界队列,只能容纳两个任务。下面是我们提交这四个任务时的情况。
- The first tasks hogs the single thread for 100 milliseconds
- The executor queues the second and third tasks successfully
- When the fourth task arrives, the discard-oldest policy removes the oldest task to make room for this new one
The discard-oldest policy and priority queues don’t play well together. Because the head of a priority queue has the highest priority, we may simply lose the most important task.
丢弃最旧的策略和优先级队列不能很好地配合。因为优先级队列的头部具有最高的优先级,我们可能会简单地失去最重要的任务。
3.5. Custom Policy
3.5.自定义政策
It’s also possible to provide a custom saturation policy just by implementing the RejectedExecutionHandler interface:
仅仅通过实现RejectedExecutionHandler接口,也可以提供一个自定义的饱和度策略。
class GrowPolicy implements RejectedExecutionHandler {
private final Lock lock = new ReentrantLock();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
lock.lock();
try {
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
} finally {
lock.unlock();
}
executor.submit(r);
}
}
In this example, when the executor becomes saturated, we increment the max pool size by one and then re-submit the same task:
在这个例子中,当执行者变得饱和时,我们将最大池子的大小增加一个,然后重新提交同一个任务。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new GrowPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).contains("First", "Second", "Third");
As expected, all four tasks are executed.
正如预期的那样,所有四个任务都被执行。
3.6. Shutdown
3.6.关机
In addition to overloaded executors, saturation policies also apply to all executors that have been shut down:
除了超载的执行器之外,饱和策略也适用于所有已关闭的执行器。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();
assertThatThrownBy(() -> executor.execute(() -> {}))
.isInstanceOf(RejectedExecutionException.class);
The same is true for all executors that are in the middle of a shutdown:
对于所有处于关机状态的执行者来说也是如此:。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();
assertThatThrownBy(() -> executor.execute(() -> {}))
.isInstanceOf(RejectedExecutionException.class);
4. Conclusion
4.总结
In this tutorial, first, we had a reasonably quick refresher about thread pools in Java. Then, after introducing saturated executors, we learned how and when to apply different saturation policies.
在本教程中,首先,我们对Java中的线程池进行了合理的快速复习。然后,在介绍了饱和执行器后,我们了解了如何以及何时应用不同的饱和策略。
As usual, the sample code is available over on GitHub.
像往常一样,样本代码可在GitHub上获得。