Guide to the Java Phaser – Java Phaser指南

最后修改: 2017年 5月 10日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we will be looking at the Phaser construct from the java.util.concurrent package. It is a very similar construct to the CountDownLatch that allows us to coordinate execution of threads. In comparison to the CountDownLatch, it has some additional functionality.

在这篇文章中,我们将研究来自java.util.concurrent包的Phaser结构。它是一个与CountDownLatch非常相似的构造,允许我们协调线程的执行。与CountDownLatch相比,它有一些额外的功能。

The Phaser is a barrier on which the dynamic number of threads need to wait before continuing execution. In the CountDownLatch that number cannot be configured dynamically and needs to be supplied when we’re creating the instance.

Phaser是一个屏障,在继续执行之前,线程的动态数量需要等待。在CountDownLatch中,这个数字不能被动态配置,需要在我们创建实例时提供。

2. Phaser API

2.Phaser API

The Phaser allows us to build logic in which threads need to wait on the barrier before going to the next step of execution.

Phaser允许我们建立这样的逻辑:线程在进入下一步执行之前需要等待障碍物

We can coordinate multiple phases of execution, reusing a Phaser instance for each program phase. Each phase can have a different number of threads waiting for advancing to another phase. We’ll have a look at an example of using phases later on.

我们可以协调多个执行阶段,为每个程序阶段重复使用一个Phaser实例。每个阶段都可以有不同数量的线程,等待推进到另一个阶段。我们稍后会看一下使用阶段的例子。

To participate in the coordination, the thread needs to register() itself with the Phaser instance. Note that this only increases the number of registered parties, and we can’t check whether the current thread is registered – we’d have to subclass the implementation to supports this.

为了参与协调,线程需要在Phaser实例中注册() 自己。请注意,这只是增加了注册方的数量,我们无法检查当前线程是否已经注册–我们必须对实现进行子类化以支持这一点。

The thread signals that it arrived at the barrier by calling the arriveAndAwaitAdvance(), which is a blocking method. When the number of arrived parties is equal to the number of registered parties, the execution of the program will continue, and the phase number will increase. We can get the current phase number by calling the getPhase() method.

线程通过调用arriveAndAwaitAdvance()发出到达障碍的信号,这是一个阻塞的方法。当到达方的数量与注册方的数量相等时,程序的执行将继续进行,阶段号将增加。我们可以通过调用getPhase()方法获得当前的阶段数。

When the thread finishes its job, we should call the arriveAndDeregister() method to signal that the current thread should no longer be accounted for in this particular phase.

当线程完成它的工作时,我们应该调用arriveAndDeregister()方法,以示当前线程在这个特定阶段不应该再被计入。

3. Implementing Logic Using Phaser API

3.使用PhaserAPI实现逻辑

Let’s say that we want to coordinate multiple phases of actions. Three threads will process the first phase, and two threads will process the second phase.

比方说,我们要协调多个阶段的行动。三个线程将处理第一个阶段,两个线程将处理第二个阶段。

We’ll create a LongRunningAction class that implements the Runnable interface:

我们将创建一个LongRunningAction类,它实现了Runnable接口。

class LongRunningAction implements Runnable {
    private String threadName;
    private Phaser ph;

    LongRunningAction(String threadName, Phaser ph) {
        this.threadName = threadName;
        this.ph = ph;
        ph.register();
    }

    @Override
    public void run() {
        ph.arriveAndAwaitAdvance();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ph.arriveAndDeregister();
    }
}

When our action class is instantiated, we’re registering to the Phaser instance using the register() method. This will increment the number of threads using that specific Phaser.

当我们的行动类被实例化时,我们使用register() 方法向Phaser 实例注册。这将增加使用该特定Phaser的线程的数量。

The call to the arriveAndAwaitAdvance() will cause the current thread to wait on the barrier. As already mentioned, when the number of arrived parties becomes the same as the number of registered parties, the execution will continue.

arriveAndAwaitAdvance()的调用将导致当前线程在屏障上等待。如前所述,当到达的当事方数量与注册的当事方数量相同时,将继续执行。

After the processing is done, the current thread is deregistering itself by calling the arriveAndDeregister() method.

处理完成后,当前线程通过调用arriveAndDeregister()方法来取消自己的注册。

Let’s create a test case in which we will start three LongRunningAction threads and block on the barrier. Next, after the action is finished, we will create two additional LongRunningAction threads that will perform processing of the next phase.

让我们创建一个测试案例,其中我们将启动三个LongRunningAction线程,并在屏障上进行阻塞。接下来,在动作完成后,我们将创建另外两个LongRunningAction线程,它们将执行下一阶段的处理。

When creating Phaser instance from the main thread, we’re passing 1 as an argument. This is equivalent to calling the register() method from the current thread. We’re doing this because, when we’re creating three worker threads, the main thread is a coordinator, and therefore the Phaser needs to have four threads registered to it:

当从主线程创建Phaserinstance时,我们要传递1作为参数。这相当于从当前线程调用register() 方法。我们这样做是因为,当我们创建三个工作线程时,主线程是一个协调者,因此,Phaser需要有四个线程注册到它。

ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);
 
assertEquals(0, ph.getPhase());

The phase after the initialization is equal to zero.

初始化后的相位等于零。

The Phaser class has a constructor in which we can pass a parent instance to it. It is useful in cases where we have large numbers of parties that would experience massive synchronization contention costs. In such situations, instances of Phasers may be set up so that groups of sub-phasers share a common parent.

Phaser类有一个构造函数,我们可以向它传递一个父实例。在我们有大量的当事人,会经历大量的同步争夺成本的情况下,它是非常有用的。在这种情况下,Phasers的实例可以被设置为使子相位组共享一个共同的父相位。

Next, let’s start three LongRunningAction action threads, which will be waiting on the barrier until we will call the arriveAndAwaitAdvance() method from the main thread.

接下来,让我们启动三个LongRunningAction行动线程,它们将在屏障上等待,直到我们将从主线程调用arriveAndAwaitAdvance()方法。

Keep in mind we’ve initialized our Phaser with 1 and called register() three more times. Now, three action threads have announced that they’ve arrived at the barrier, so one more call of arriveAndAwaitAdvance() is needed – the one from the main thread:

请记住,我们已经用1初始化了我们的Phaser,又调用了register()三次。现在,三个行动线程已经宣布他们已经到达了障碍物,所以还需要再调用一次arriveAndAwaitAdvance()–来自主线程的那个。

executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));

ph.arriveAndAwaitAdvance();
 
assertEquals(1, ph.getPhase());

After the completion of that phase, the getPhase() method will return one because the program finished processing the first step of execution.

在该阶段完成后,getPhase()方法将返回1,因为程序完成了执行的第一步的处理。

Let’s say that two threads should conduct the next phase of processing. We can leverage Phaser to achieve that because it allows us to configure dynamically the number of threads that should wait on the barrier. We’re starting two new threads, but these will not proceed to execute until the call to the arriveAndAwaitAdvance() from the main thread (same as in the previous case):

比方说,两个线程应该进行下一阶段的处理。我们可以利用Phaser来实现这一点,因为它允许我们动态地配置应该在屏障上等待的线程数量。我们要启动两个新的线程,但这些线程在主线程调用arriveAndAwaitAdvance()之前不会继续执行(与之前的情况相同)。

executorService.submit(new LongRunningAction("thread-4", ph));
executorService.submit(new LongRunningAction("thread-5", ph));
ph.arriveAndAwaitAdvance();
 
assertEquals(2, ph.getPhase());

ph.arriveAndDeregister();

After this, the getPhase() method will return phase number equal to two. When we want to finish our program, we need to call the arriveAndDeregister() method as the main thread is still registered in the Phaser. When the deregistration causes the number of registered parties to become zero, the Phaser is terminated. All calls to synchronization methods will not block anymore and will return immediately.

在这之后,getPhase() 方法将返回等于2的阶段数。当我们想完成我们的程序时,我们需要调用arriveAndDeregister()方法,因为主线程仍然在Phaser中注册。当取消注册导致注册方的数量变为零时,Phaser就会结束。所有对同步方法的调用将不再阻塞,并将立即返回。

Running the program will produce the following output (full source code with the print line statements can be found in the code repository):

运行该程序将产生以下输出(带有打印行语句的完整源代码可以在代码库中找到)。

This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action

We see that all threads are waiting for execution until the barrier opens. Next phase of the execution is performed only when the previous one finished successfully.

我们看到,所有线程都在等待执行,直到屏障打开。只有在前一个阶段成功完成后,才会进行下一个阶段的执行。

4. Conclusion

4.结论

In this tutorial, we had a look at the Phaser construct from java.util.concurrent and we implemented the coordination logic with multiple phases using Phaser class.

在本教程中,我们从java.util.concurrent中查看了Phaser结构,并使用Phaser类实现了多阶段的协调逻辑。

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.

所有这些例子和代码片段的实现都可以在GitHub项目中找到–这是一个Maven项目,所以应该很容易导入并按原样运行。