CyclicBarrier in Java – Java中的循环屏障

最后修改: 2017年 6月 26日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

CyclicBarriers are synchronization constructs that were introduced with Java 5 as a part of the java.util.concurrent package.

CyclicBarriers是同步构造,作为java.util.concurrent包的一部分,从Java 5引入。

In this article, we’ll explore this implementation in a concurrency scenario.

在这篇文章中,我们将探讨这种在并发情况下的实现。

2. Java Concurrency – Synchronizers

2.Java并发性–同步器

The java.util.concurrent package contains several classes that help manage a set of threads that collaborate with each other. Some of these include:

java.util.concurrent包包含了几个有助于管理一组相互协作的线程的类。其中一些包括

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Exchanger
  • Semaphore
  • SynchronousQueue

These classes offer out of the box functionality for common interaction patterns between threads.

这些类为线程之间的常见交互模式提供了开箱即用的功能。

If we have a set of threads that communicate with each other and resemble one of the common patterns, we can simply reuse the appropriate library classes (also called Synchronizers) instead of trying to come up with a custom scheme using a set of locks and condition objects and the synchronized keyword.

如果我们有一组相互通信的线程,并且类似于常见的模式之一,我们可以简单地重复使用适当的库类(也称为Synchronizers),而不是试图使用一组锁和条件对象想出一个自定义方案synchronized关键字。

Let’s focus on the CyclicBarrier going forward.

让我们专注于CyclicBarrier前进。

3. CyclicBarrier

3.CyclicBarrier

A CyclicBarrier is a synchronizer that allows a set of threads to wait for each other to reach a common execution point, also called a barrier.

CyclicBarrier是一个同步器,它允许一组线程互相等待,以达到一个共同的执行点,也称为barrier

CyclicBarriers are used in programs in which we have a fixed number of threads that must wait for each other to reach a common point before continuing execution.

CyclicBarriers用于程序中,我们有固定数量的线程,这些线程必须在继续执行前等待对方到达一个共同点。

The barrier is called cyclic because it can be re-used after the waiting threads are released.

屏障被称为周期性,因为它可以在等待的线程被释放后被重新使用。

4. Usage

4.使用方法

The constructor for a CyclicBarrier is simple. It takes a single integer that denotes the number of threads that need to call the await() method on the barrier instance to signify reaching the common execution point:

CyclicBarrier的构造函数很简单。它需要一个整数,表示需要在屏障实例上调用await()方法的线程数,以表示达到共同执行点。

public CyclicBarrier(int parties)

The threads that need to synchronize their execution are also called parties and calling the await() method is how we can register that a certain thread has reached the barrier point.

需要同步执行的线程也被称为parties,而调用await()方法就是我们可以注册某个线程已经到达障碍点。

This call is synchronous and the thread calling this method suspends execution till a specified number of threads have called the same method on the barrier. This situation where the required number of threads have called await(), is called tripping the barrier.

这个调用是同步的,调用这个方法的线程会暂停执行,直到指定数量的线程在屏障上调用了同一个方法。这种所需数量的线程已经调用await()的情况,被称为tripping the barrier

Optionally, we can pass the second argument to the constructor, which is a Runnable instance. This has logic that would be run by the last thread that trips the barrier:

另外,我们可以将第二个参数传递给构造函数,这是一个Runnable实例。这里面的逻辑将由最后一个触发障碍的线程来运行。

public CyclicBarrier(int parties, Runnable barrierAction)

5. Implementation

5.实施

To see CyclicBarrier in action, let’s consider the following scenario:

为了看到CyclicBarrier的作用,让我们考虑以下情况。

There’s an operation that a fixed number of threads perform and store the corresponding results in a list. When all threads finish performing their action, one of them (typically the last one that trips the barrier) starts processing the data that was fetched by each of these.

有一个固定数量的线程执行的操作,并将相应的结果存储在一个列表中。当所有线程执行完它们的动作后,其中一个线程(通常是最后一个跳闸的线程)开始处理每个人取来的数据。

Let’s implement the main class where all the action happens:

让我们来实现所有行动发生的主类。

public class CyclicBarrierDemo {

    private CyclicBarrier cyclicBarrier;
    private List<List<Integer>> partialResults
     = Collections.synchronizedList(new ArrayList<>());
    private Random random = new Random();
    private int NUM_PARTIAL_RESULTS;
    private int NUM_WORKERS;

    // ...
}

This class is pretty straight forward – NUM_WORKERS is the number of threads that are going to execute and NUM_PARTIAL_RESULTS is the number of results that each of the worker threads is going to produce.

这个类非常直接–NUM_WORKERS是将要执行的线程数量,NUM_PARTIAL_RESULTS是每个工作线程将要产生的结果数量。

Finally, we have partialResults that are a list that’s going to store the results of each of these worker threads. Do note that this list is a SynchronizedList because multiple threads will be writing to it at the same time, and the add() method isn’t thread-safe on a plain ArrayList.

最后,我们有partialResults,它是一个列表,将存储这些工作线程的每一个结果。请注意,这个列表是一个SynchronizedList,因为多个线程将同时写入该列表,而add()方法在普通的ArrayList上并不是线程安全的。

Now let’s implement the logic of each of the worker threads:

现在让我们来实现每个工作线程的逻辑。

public class CyclicBarrierDemo {

    // ...

    class NumberCruncherThread implements Runnable {

        @Override
        public void run() {
            String thisThreadName = Thread.currentThread().getName();
            List<Integer> partialResult = new ArrayList<>();

            // Crunch some numbers and store the partial result
            for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) {    
                Integer num = random.nextInt(10);
                System.out.println(thisThreadName
                  + ": Crunching some numbers! Final result - " + num);
                partialResult.add(num);
            }

            partialResults.add(partialResult);
            try {
                System.out.println(thisThreadName 
                  + " waiting for others to reach barrier.");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                // ...
            } catch (BrokenBarrierException e) {
                // ...
            }
        }
    }

}

We’ll now implement the logic that runs when the barrier has been tripped.

我们现在要实现障碍物被绊倒时的运行逻辑。

To keep things simple, let’s just add all the numbers in the partial results list:

为了简单起见,我们就把部分结果列表中的所有数字加起来。

public class CyclicBarrierDemo {

    // ...
    
    class AggregatorThread implements Runnable {

        @Override
        public void run() {

            String thisThreadName = Thread.currentThread().getName();

            System.out.println(
              thisThreadName + ": Computing sum of " + NUM_WORKERS 
              + " workers, having " + NUM_PARTIAL_RESULTS + " results each.");
            int sum = 0;

            for (List<Integer> threadResult : partialResults) {
                System.out.print("Adding ");
                for (Integer partialResult : threadResult) {
                    System.out.print(partialResult+" ");
                    sum += partialResult;
                }
                System.out.println();
            }
            System.out.println(thisThreadName + ": Final result = " + sum);
        }
    }
}

The final step would be to construct the CyclicBarrier and kick things off with a main() method:

最后一步是构建CyclicBarrier,并以main()方法启动事情。

public class CyclicBarrierDemo {

    // Previous code
 
    public void runSimulation(int numWorkers, int numberOfPartialResults) {
        NUM_PARTIAL_RESULTS = numberOfPartialResults;
        NUM_WORKERS = numWorkers;

        cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread());

        System.out.println("Spawning " + NUM_WORKERS
          + " worker threads to compute "
          + NUM_PARTIAL_RESULTS + " partial results each");
 
        for (int i = 0; i < NUM_WORKERS; i++) {
            Thread worker = new Thread(new NumberCruncherThread());
            worker.setName("Thread " + i);
            worker.start();
        }
    }

    public static void main(String[] args) {
        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.runSimulation(5, 3);
    }
}

In the above code, we initialized the cyclic barrier with 5 threads that each produce 3 integers as a part of their computation and store the same in the resulting list.

在上面的代码中,我们用5个线程初始化了循环屏障,每个线程产生3个整数作为其计算的一部分,并将其存储在结果列表中。

Once the barrier is tripped, the last thread that tripped the barrier executes the logic specified in the AggregatorThread, namely – add all the numbers produced by the threads.

一旦障碍被跳过,最后一个跳过障碍的线程就会执行AggregatorThread中指定的逻辑,即–将所有线程产生的数字相加。

6. Results

6.结果

Here is the output from one execution of the above program – each execution might create different results as the threads can be spawned in a different order:

以下是上述程序一次执行的输出结果–每次执行都可能产生不同的结果,因为线程可以以不同的顺序生成。

Spawning 5 worker threads to compute 3 partial results each
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 2
Thread 0: Crunching some numbers! Final result - 2
Thread 0 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 2
Thread 1: Crunching some numbers! Final result - 0
Thread 1: Crunching some numbers! Final result - 5
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 6
Thread 3: Crunching some numbers! Final result - 4
Thread 3: Crunching some numbers! Final result - 0
Thread 3 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 1
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 9
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 5
Thread 4 waiting for others to reach barrier.
Thread 4: Computing final sum of 5 workers, having 3 results each.
Adding 6 2 2 
Adding 2 0 5 
Adding 6 4 0 
Adding 1 1 0 
Adding 9 3 5 
Thread 4: Final result = 46

As the above output shows, Thread 4 is the one that trips the barrier and also executes the final aggregation logic. It is also not necessary that threads are actually run in the order that they’re started as the above example shows.

正如上面的输出所显示的,线程4是绊倒障碍物的那个,同时也执行了最终的聚合逻辑。也没有必要像上面的例子所显示的那样,线程实际上是按照它们启动的顺序运行的。

7. Conclusion

7.结论

In this article, we saw what a CyclicBarrier is, and what kind of situations it is helpful in.

在这篇文章中,我们看到了什么是CyclicBarrier,以及它在什么样的情况下有帮助。

We also implemented a scenario where we needed a fixed number of threads to reach a fixed execution point, before continuing with other program logic.

我们还实现了一个场景,即我们需要固定数量的线程来达到一个固定的执行点,然后再继续进行其他的程序逻辑。

As always, the code for the tutorial can be found over on GitHub.

一如既往,该教程的代码可以在GitHub上找到over