Guide to CountDownLatch in Java – Java中的CountDownLatch指南

最后修改: 2017年 1月 26日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

In this article, we’ll give a guide to the CountDownLatch class and demonstrate how it can be used in a few practical examples.

在这篇文章中,我们将对CountDownLatch类进行指导,并演示如何在一些实际的例子中使用它。

Essentially, by using a CountDownLatch we can cause a thread to block until other threads have completed a given task.

从本质上讲,通过使用CountDownLatch,我们可以使一个线程阻塞,直到其他线程完成指定任务。

2. Usage in Concurrent Programming

2.在并发编程中的应用

Simply put, a CountDownLatch has a counter field, which you can decrement as we require. We can then use it to block a calling thread until it’s been counted down to zero.

简单地说,一个 CountDownLatch有一个counter字段,你可以按照我们的要求进行递减。然后我们可以用它来阻塞一个调用线程,直到它被倒数到零。

If we were doing some parallel processing, we could instantiate the CountDownLatch with the same value for the counter as a number of threads we want to work across. Then, we could just call countdown() after each thread finishes, guaranteeing that a dependent thread calling await() will block until the worker threads are finished.

如果我们正在进行一些并行处理,我们可以实例化CountDownLatch,并将计数器的值与我们希望工作的线程数量相同。然后,我们可以在每个线程完成后调用countdown() ,保证调用await() 的依赖线程会阻塞,直到工作线程完成。

3. Waiting for a Pool of Threads to Complete

3.等待线程池的完成

Let’s try out this pattern by creating a Worker and using a CountDownLatch field to signal when it has completed:

让我们通过创建一个Worker并使用CountDownLatch字段来发出完成信号来尝试一下这种模式。

public class Worker implements Runnable {
    private List<String> outputScraper;
    private CountDownLatch countDownLatch;

    public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        doSomeWork();
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

Then, let’s create a test in order to prove that we can get a CountDownLatch to wait for the Worker instances to complete:

然后,让我们创建一个测试,以证明我们可以让CountDownLatch等待Worker实例完成。

@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
  throws InterruptedException {

    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

      workers.forEach(Thread::start);
      countDownLatch.await(); 
      outputScraper.add("Latch released");

      assertThat(outputScraper)
        .containsExactly(
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Latch released"
        );
    }

Naturally “Latch released” will always be the last output – as it’s dependant on the CountDownLatch releasing.

自然,”Latch released “将总是最后一个输出–因为它依赖于CountDownLatchreleasing。

Note that if we didn’t call await(), we wouldn’t be able to guarantee the ordering of the execution of the threads, so the test would randomly fail.

注意,如果我们不调用await(),我们就无法保证线程执行的顺序,所以测试会随机失败。

4. A Pool of Threads Waiting to Begin

4.等待开始的线程池

If we took the previous example, but this time started thousands of threads instead of five, it’s likely that many of the earlier ones will have finished processing before we have even called start() on the later ones. This could make it difficult to try and reproduce a concurrency problem, as we wouldn’t be able to get all our threads to run in parallel.

如果我们采用之前的例子,但这次启动了数千个线程,而不是五个,很可能在我们对后来的线程调用start()之前,许多早期的线程就已经完成了处理。这可能会使我们难以尝试重现并发问题,因为我们无法让所有的线程并行运行。

To get around this, let’s get the CountdownLatch to work differently than in the previous example. Instead of blocking a parent thread until some child threads have finished, we can block each child thread until all the others have started.

为了解决这个问题,让我们让CountdownLatch的工作方式与前面的例子不同。我们可以阻止父线程直到一些子线程完成,而不是阻止每个子线程直到所有其他线程开始。

Let’s modify our run() method so it blocks before processing:

让我们修改我们的run()方法,使它在处理之前就被阻止。

public class WaitingWorker implements Runnable {

    private List<String> outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public WaitingWorker(
      List<String> outputScraper,
      CountDownLatch readyThreadCounter,
      CountDownLatch callingThreadBlocker,
      CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            doSomeWork();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

Now, let’s modify our test so it blocks until all the Workers have started, unblocks the Workers, and then blocks until the Workers have finished:

现在,让我们修改我们的测试,使它阻塞直到所有的工作程序开始,解除阻塞工作程序,然后阻塞直到工作程序完成。

@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
 throws InterruptedException {
 
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new WaitingWorker(
        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    readyThreadCounter.await(); 
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown(); 
    completedThreadCounter.await(); 
    outputScraper.add("Workers complete");

    assertThat(outputScraper)
      .containsExactly(
        "Workers ready",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Workers complete"
      );
}

This pattern is really useful for trying to reproduce concurrency bugs, as can be used to force thousands of threads to try and perform some logic in parallel.

这种模式对于试图重现并发性错误真的很有用,因为可以用来强迫成千上万的线程尝试并行地执行一些逻辑。

5. Terminating a CountdownLatch Early

5.提前终止一个倒计时锁

Sometimes, we may run into a situation where the Workers terminate in error before counting down the CountDownLatch. This could result in it never reaching zero and await() never terminating:

有时,我们可能会遇到这样的情况:WorkersCountDownLatch的倒计时之前就错误地终止了。这可能导致它永远不会达到零,await()永远不会终止。

@Override
public void run() {
    if (true) {
        throw new RuntimeException("Oh dear, I'm a BrokenWorker");
    }
    countDownLatch.countDown();
    outputScraper.add("Counted down");
}

Let’s modify our earlier test to use a BrokenWorker, in order to show how await() will block forever:

让我们修改先前的测试,使用一个BrokenWorker,以展示await()如何永远阻塞。

@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
  throws InterruptedException {
 
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    countDownLatch.await();
}

Clearly, this is not the behavior we want – it would be much better for the application to continue than infinitely block.

显然,这不是我们想要的行为–对应用程序来说,继续下去比无限地阻塞要好得多。

To get around this, let’s add a timeout argument to our call to await().

为了解决这个问题,让我们在调用await()时添加一个超时参数。

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

As we can see, the test will eventually time out and await() will return false.

我们可以看到,测试最终会超时,await() 将返回false

6. Conclusion

6.结论

In this quick guide, we’ve demonstrated how we can use a CountDownLatch in order to block a thread until other threads have finished some processing.

在这个快速指南中,我们演示了如何使用CountDownLatch来阻塞一个线程,直到其他线程完成一些处理。

We’ve also shown how it can be used to help debug concurrency issues by making sure threads run in parallel.

我们还展示了如何通过确保线程并行运行来帮助调试并发性问题。

The implementation of these examples can be found over on GitHub; this is a Maven-based project, so should be easy to run as is.

这些例子的实现可以在GitHub上找到;这是一个基于Maven的项目,所以应该很容易按原样运行。