Guide to the Java TransferQueue – Java TransferQueue指南

最后修改: 2017年 4月 26日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’ll be looking at the TransferQueue construct from the standard java.util.concurrent package.

在这篇文章中,我们将关注TransferQueue从标准的java.util.concurrent包中构建的。

Simply put, this queue allows us to create programs according to the producer-consumer pattern, and coordinate messages passing from producers to consumers.

简单地说,这个队列允许我们按照生产者-消费者模式创建程序,并协调从生产者到消费者的消息传递。

The implementation is actually similar to the BlockingQueuebut gives us the new ability to implement a form of backpressure. This means that, when the producer sends a message to the consumer using the transfer() method, the producer will stay blocked until the message is consumed.

该实现实际上与BlockingQueue相似–但为我们提供了实现一种反压的新能力。这意味着,当生产者使用transfer()方法向消费者发送消息时,生产者将保持阻塞状态,直到该消息被消费。

2. One Producer – Zero Consumers

2.一个生产者 – 零个消费者

Let’s test a transfer() method from the TransferQueue – the expected behavior is that the producer will be blocked until the consumer receives the message from the queue using a take() method.

让我们测试一下来自TransferQueuetransfer()方法–预期的行为是生产者将被阻塞,直到消费者使用take()方法从队列中收到消息。

To achieve that, we’ll create a program that has one producer but zero consumers. The first call of transfer() from the producer thread will block indefinitely, as we don’t have any consumers to fetch that element from the queue.

为了实现这一目标,我们将创建一个有一个生产者但没有消费者的程序。生产者线程对transfer()的第一次调用将无限期地阻塞,因为我们没有任何消费者可以从队列中获取该元素。

Let’s see how the Producer class looks like:

让我们看看Producer类是怎样的。

class Producer implements Runnable {
    private TransferQueue<String> transferQueue;
 
    private String name;
 
    private Integer numberOfMessagesToProduce;
 
    public AtomicInteger numberOfProducedMessages
      = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToProduce; i++) {
            try {
                boolean added 
                  = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
                if(added){
                    numberOfProducedMessages.incrementAndGet();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // standard constructors
}

We are passing an instance of the TransferQueue to the constructor together with a name that we want to give our producer and the number of elements that should be transferred to the queue.

我们将一个TransferQueue的实例和一个我们想给生产者的名字以及应该被转移到队列中的元素数量传递给构造函数。

Note that we are using the tryTransfer() method, with a given timeout. We are waiting four seconds, and if a producer is not able to transfer the message within the given timeout, it returns false and moves on to the next message. The producer has a numberOfProducedMessages variable to keep track of how many messages were produced.

请注意,我们使用的是tryTransfer()方法,有一个给定的超时。我们在等待四秒,如果生产者在给定的超时时间内不能传输消息,它就会返回false,并转到下一个消息。生产者有一个numberOfProducedMessages变量,用来记录生产了多少条消息。

Next, let’s look at the Consumer class:

接下来,让我们看一下Consumer类。

class Consumer implements Runnable {
 
    private TransferQueue<String> transferQueue;
 
    private String name;
 
    private int numberOfMessagesToConsume;
 
    public AtomicInteger numberOfConsumedMessages
     = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToConsume; i++) {
            try {
                String element = transferQueue.take();
                longProcessing(element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void longProcessing(String element)
      throws InterruptedException {
        numberOfConsumedMessages.incrementAndGet();
        Thread.sleep(500);
    }
    
    // standard constructors
}

It is similar to the producer, but we are receiving elements from the queue by using the take() method. We are also simulating some long running action by using the longProcessing() method in which we are incrementing the numberOfConsumedMessages variable that is a counter of the received messages.

它与生产者类似,但我们通过使用take()方法从队列中接收元素。我们还通过使用longProcessing()方法来模拟一些长期运行的动作,在该方法中,我们正在增加numberOfConsumedMessages变量,该变量是一个接收消息的计数器。

Now, let’s start our program with only one producer:

现在,让我们只用一个生产者来开始我们的程序。

@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);

    // when
    exService.execute(producer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}

We want to send three elements to the queue, but the producer is blocked on the first element, and there is no consumer to fetch that element from the queue. We are using the tryTransfer() method which will block until the message is consumed or the timeout is reached. After the timeout, it will return false to indicate the transfer has failed, and it will try to transfer the next one. This is the output from the previous example:

我们想向队列发送三个元素,但是生产者在第一个元素上被阻塞了,而且没有消费者从队列中获取该元素。我们正在使用tryTransfer() 方法,它将阻塞直到消息被消耗或达到超时。超时后,它将返回false,表示传输失败,它将尝试传输下一条。这就是前面例子的输出。

Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...

3. One Producer – One Consumer

3.一个生产者–一个消费者

Let’s test a situation when there are one producer and one consumer:

让我们测试一下有一个生产者和一个消费者的情况。

@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);
    Consumer consumer = new Consumer(transferQueue, "1", 3);

    // when
    exService.execute(producer);
    exService.execute(consumer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 3);
    assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}

The TransferQueue is used as an exchange point, and until the consumer consumes an element from the queue, the producer cannot proceed with adding another element to it. Let’s look at the program output:

TransferQueue被用作交换点,在消费者从队列中消耗一个元素之前,生产者不能继续向其添加另一个元素。我们来看看程序的输出。

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2

We see that producing and consuming elements from the queue is sequential because of the specification of TransferQueue.

我们看到,由于TransferQueue.的规范,从队列中生产和消费元素是有顺序的。

4. Many Producers – Many Consumers

4.许多生产者 – 许多消费者

In the last example we will consider having multiple consumers and multiple producers:

在最后一个例子中,我们将考虑有多个消费者和多个生产者。

@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(3);
    Producer producer1 = new Producer(transferQueue, "1", 3);
    Producer producer2 = new Producer(transferQueue, "2", 3);
    Consumer consumer1 = new Consumer(transferQueue, "1", 3);
    Consumer consumer2 = new Consumer(transferQueue, "2", 3);

    // when
    exService.execute(producer1);
    exService.execute(producer2);
    exService.execute(consumer1);
    exService.execute(consumer2);

    // then
    exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
    assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}

In this example, we have two consumers and two producers. When the program starts, we see that both producers can produce one element and after that, they will block until one of the consumers takes that element from the queue:

在这个例子中,我们有两个消费者和两个生产者。当程序开始时,我们看到两个生产者可以生产一个元素,之后,他们将阻塞,直到其中一个消费者从队列中获取该元素。

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2

5. Conclusion

5.结论

In this article, we were looking at the TransferQueue construct from the java.util.concurrent package.

在这篇文章中,我们正在研究来自java.util.concurrent包的TransferQueue结构。

We saw how to implement the producer-consumer program using that construct. We used a transfer() method to create a form of backpressure, where a producer can not publish another element until the consumer retrieves an element from the queue.

我们看到了如何使用该结构来实现生产者-消费者程序。我们使用transfer()方法来创建一种背压形式,即生产者不能发布另一个元素,直到消费者从队列中检索到一个元素。

The TransferQueue can be very useful when we do not want an over-producing producer that will flood the queue with messages, resulting in the OutOfMemory errors. In such design, the consumer will be dictating the speed at which the producer will produce messages.

当我们不希望生产者过度生产,导致消息淹没队列,造成OutOfMemory错误时,TransferQueue就会非常有用。在这种设计中,消费者将决定生产者生产消息的速度。

All these examples and code snippets can be found over on GitHub – this is a Maven project, so it should be easy to import and run as it is.

所有这些例子和代码片段都可以在GitHub上找到over–这是一个Maven项目,所以应该很容易导入并按原样运行。