A Guide to Java SynchronousQueue – Java SynchronousQueue指南

最后修改: 2017年 4月 24日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’ll be looking at the SynchronousQueue from the java.util.concurrent package.

在这篇文章中,我们将关注SynchronousQueue,它来自java.util.concurrent包。

Simply put, this implementation allows us to exchange information between threads in a thread-safe manner.

简单地说,这个实现允许我们以线程安全的方式在线程之间交换信息。

2. API Overview

2.API概述

The SynchronousQueue only has two supported operations: take() and put(), and both of them are blocking.

SynchronousQueue仅有两个支持的操作。take() put(),而且这两个操作都是阻塞的

For example, when we want to add an element to the queue, we need to call the put() method. That method will block until some other thread calls the take() method, signaling that it is ready to take an element.

例如,当我们想向队列添加一个元素时,我们需要调用put() 方法。该方法将阻塞,直到其他线程调用take()方法,示意它准备好接受一个元素。

Although the SynchronousQueue has an interface of a queue, we should think about it as an exchange point for a single element between two threads, in which one thread is handing off an element, and another thread is taking that element.

尽管SynchronousQueue有一个队列的接口,但我们应该把它看作是两个线程之间的一个单一元素的交换点,其中一个线程正在交出一个元素,而另一个线程正在接受该元素。

3. Implementing Handoffs Using a Shared Variable

3.使用共享变量实现交接

To see why the SynchronousQueue can be so useful, we will implement a logic using a shared variable between two threads and next, we will rewrite that logic using SynchronousQueue making our code a lot simpler and more readable.

为了了解为什么SynchronousQueue可以如此有用,我们将使用两个线程之间的共享变量实现一个逻辑,接下来,我们将使用SynchronousQueue重写该逻辑,使我们的代码更简单,更易读。

Let’s say that we have two threads – a producer and a consumer – and when the producer is setting a value of a shared variable, we want to signal that fact to the consumer thread. Next, the consumer thread will fetch a value from a shared variable.

假设我们有两个线程–一个生产者和一个消费者–当生产者在设置一个共享变量的值时,我们要向消费者线程发出这个事实。接下来,消费者线程将从一个共享变量中获取一个值。

We will use the CountDownLatch to coordinate those two threads, to prevent a situation when the consumer accesses a value of a shared variable that was not set yet.

我们将使用CountDownLatch来协调这两个线程,以防止出现消费者访问一个尚未设置的共享变量的值的情况。

We will define a sharedState variable and a CountDownLatch that will be used for coordinating processing:

我们将定义一个sharedState变量和一个CountDownLatch,它将被用于协调处理。

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

The producer will save a random integer to the sharedState variable, and execute the countDown() method on the countDownLatch, signaling to the consumer that it can fetch a value from the sharedState:

生产者将保存一个随机的整数到sharedState变量中,并在countDownLatch上执行countDown()方法,向消费者发出信号,它可以从sharedState中获取一个值。

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};

The consumer will wait on the countDownLatch using the await() method. When the producer signals that the variable was set, the consumer will fetch it from the sharedState:

消费者将使用await()方法来等待countDownLatch。当生产者发出信号说变量被设置了,消费者将从sharedState:中获取它。

Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

Last but not least, let’s start our program:

最后但同样重要的是,让我们开始我们的计划。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);

It will produce the following output:

它将产生以下输出。

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

We can see that this is a lot of code to implement such a simple functionality as exchanging an element between two threads. In the next section, we will try to make it better.

我们可以看到,为了实现在两个线程之间交换一个元素这样一个简单的功能,这是一个很大的代码。在下一节中,我们将尝试让它变得更好。

4. Implementing Handoffs Using the SynchronousQueue

4.使用SynchronousQueue实现交接

Let’s now implement the same functionality as in the previous section, but with a SynchronousQueue. It has a double effect because we can use it for exchanging state between threads and for coordinating that action so that we don’t need to use anything besides SynchronousQueue.

现在让我们实现与上一节相同的功能,但要用一个SynchronousQueue.它有双重效果,因为我们可以用它来交换线程间的状态,并协调该动作,这样我们就不需要使用除SynchronousQueue.之外的任何东西了。

Firstly, we will define a queue:

首先,我们将定义一个队列。

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

The producer will call a put() method that will block until some other thread takes an element from the queue:

生产者将调用一个put() 方法,该方法将阻塞,直到其他线程从队列中获取一个元素。

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

The consumer will simply retrieve that element using the take() method:

消费者将简单地使用take() 方法检索该元素。

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

Next, we will start our program:

接下来,我们将启动我们的程序。

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

It will produce the following output:

它将产生以下输出。

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point

We can see that a SynchronousQueue is used as an exchange point between the threads, which is a lot better and more understandable than the previous example which used the shared state together with a CountDownLatch.

我们可以看到,一个SynchronousQueue被用作线程间的交换点,这比之前的例子要好得多,也更容易理解,因为之前的例子将共享状态与CountDownLatch一起使用。

5. Conclusion

5.结论

In this quick tutorial, we looked at the SynchronousQueue construct. We created a program that exchanges data between two threads using shared state, and then rewrote that program to leverage the SynchronousQueue construct. This serves as an exchange point that coordinates the producer and the consumer thread.

在这个快速教程中,我们研究了SynchronousQueue结构。我们创建了一个使用共享状态在两个线程之间交换数据的程序,然后重写了该程序以利用SynchronousQueue结构。这作为一个交换点,协调了生产者和消费者线程。

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.

所有这些例子和代码片段的实现都可以在GitHub项目中找到–这是一个Maven项目,所以应该很容易导入并按原样运行。