ExecutorService – Waiting for Threads to Finish – ExecutorService – 等待线程结束

最后修改: 2017年 12月 26日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

The ExecutorService framework makes it easy to process tasks in multiple threads. We’re going to exemplify some scenarios in which we wait for threads to finish their execution.

ExecutorService框架使得在多线程中处理任务变得容易。我们将举例说明一些等待线程完成执行的场景。

Also, we’ll show how to gracefully shutdown an ExecutorService and wait for already running threads to finish their execution.

此外,我们将展示如何优雅地关闭ExecutorService,并等待已经运行的线程完成其执行。

2. After Executor’s Shutdown

2.在执行者的关闭之后

When using an Executor, we can shut it down by calling the shutdown() or shutdownNow() methods. Although, it won’t wait until all threads stop executing.

当使用一个执行器时,我们可以通过调用shutdown()shutdownNow()方法关闭它。尽管如此,它不会等到所有线程都停止执行的时候。

Waiting for existing threads to complete their execution can be achieved by using the awaitTermination() method.

等待现有线程完成其执行可以通过使用awaitTermination() 方法来实现。

This blocks the thread until all tasks complete their execution or the specified timeout is reached:

这将阻塞线程,直到所有任务完成执行或达到指定的超时。

public void awaitTerminationAfterShutdown(ExecutorService threadPool) {
    threadPool.shutdown();
    try {
        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
            threadPool.shutdownNow();
        }
    } catch (InterruptedException ex) {
        threadPool.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

3. Using CountDownLatch

3.使用CountDownLatch

Next, let’s look at another approach to solving this problem – using a CountDownLatch to signal the completion of a task.

接下来,让我们看看解决这个问题的另一种方法–使用CountDownLatch来提示任务的完成。

We can initialize it with a value that represents the number of times it can be decremented before all threads, that have called the await() method, are notified.

我们可以用一个值来初始化它,这个值代表在所有调用await()方法的线程被通知之前,它可以被递减的次数。

For example, if we need the current thread to wait for another N threads to finish their execution, we can initialize the latch using N:

例如,如果我们需要当前线程等待另一个N线程完成其执行,我们可以用N来初始化锁存器。

ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // ...
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

4. Using invokeAll()

4.使用 invokeAll()

The first approach that we can use to run threads is the invokeAll() method. The method returns a list of Future objects after all tasks finish or the timeout expires.

我们可以用来运行线程的第一个方法是invokeAll()方法。该方法在所有任务完成或超时结束后返回一个Future对象的列表

Also, we must note that the order of the returned Future objects is the same as the list of the provided Callable objects:

另外,我们必须注意,返回的Future对象的顺序与提供的Callable对象的列表相同。

ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(10);

List<Callable<String>> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100), 
  new DelayedCallable("slow thread", 3000));

long startProcessingTime = System.currentTimeMillis();
List<Future<String>> futures = WORKER_THREAD_POOL.invokeAll(callables);

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

long totalProcessingTime = System.currentTimeMillis() - startProcessingTime;
 
assertTrue(totalProcessingTime >= 3000);

String firstThreadResponse = futures.get(0).get();
 
assertTrue("fast thread".equals(firstThreadResponse));

String secondThreadResponse = futures.get(1).get();
assertTrue("slow thread".equals(secondThreadResponse));

5. Using ExecutorCompletionService

5.使用ExecutorCompletionService

Another approach to running multiple threads is by using ExecutorCompletionService. It uses a supplied ExecutorService to execute tasks.

另一种运行多线程的方法是使用ExecutorCompletionService。它使用一个提供的ExecutorService来执行任务。

One difference over invokeAll() is the order in which the Futures, representing the executed tasks are returned. ExecutorCompletionService uses a queue to store the results in the order they are finished, while invokeAll() returns a list having the same sequential order as produced by the iterator for the given task list:

invokeAll()不同的是,代表已执行任务的Futures的返回顺序。ExecutorCompletionService使用一个队列来按完成的顺序存储结果,而invokeAll()返回一个列表,其顺序与给定任务列表的迭代器产生的顺序相同。

CompletionService<String> service
  = new ExecutorCompletionService<>(WORKER_THREAD_POOL);

List<Callable<String>> callables = Arrays.asList(
  new DelayedCallable("fast thread", 100), 
  new DelayedCallable("slow thread", 3000));

for (Callable<String> callable : callables) {
    service.submit(callable);
}

The results can be accessed using the take() method:

可以使用take()方法访问结果。

long startProcessingTime = System.currentTimeMillis();

Future<String> future = service.take();
String firstThreadResponse = future.get();
long totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue("First response should be from the fast thread", 
  "fast thread".equals(firstThreadResponse));
assertTrue(totalProcessingTime >= 100
  && totalProcessingTime < 1000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

future = service.take();
String secondThreadResponse = future.get();
totalProcessingTime
  = System.currentTimeMillis() - startProcessingTime;

assertTrue(
  "Last response should be from the slow thread", 
  "slow thread".equals(secondThreadResponse));
assertTrue(
  totalProcessingTime >= 3000
  && totalProcessingTime < 4000);
LOG.debug("Thread finished after: " + totalProcessingTime
  + " milliseconds");

awaitTerminationAfterShutdown(WORKER_THREAD_POOL);

6. Conclusion

6.结论

Depending on the use case, we have various options to wait for threads to finish their execution.

根据不同的用例,我们有不同的选择来等待线程完成其执行。

A CountDownLatch is useful when we need a mechanism to notify one or more threads that a set of operations performed by other threads has finished.

当我们需要一种机制来通知一个或多个线程由其他线程执行的一组操作已经完成时,一个CountDownLatch很有用。

ExecutorCompletionService is useful when we need to access the task result as soon as possible and other approaches when we want to wait for all of the running tasks to finish.

ExecutorCompletionService在我们需要尽快访问任务结果时很有用,而在我们想要等待所有正在运行的任务完成时,其他方法也很有用。

The source code for the article is available over on GitHub.

该文章的源代码可在GitHub上获得