Guide to PriorityBlockingQueue in Java – Java中的PriorityBlockingQueue指南

最后修改: 2017年 2月 6日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

In this article, we’ll focus on the PriorityBlockingQueue class and go over some practical examples.

在这篇文章中,我们将重点讨论PriorityBlockingQueue类,并讨论一些实际的例子。

Starting with the assumption that we already know what a Queue is, we will first demonstrate how elements in the PriorityBlockingQueue are ordered by priority.

从假设我们已经知道什么是队列开始,我们将首先演示PriorityBlockingQueue中的元素如何按优先级排序

Following this, we will demonstrate how this type of queue can be used to block a thread.

在这之后,我们将演示如何用这种队列来阻塞线程。

Finally, we will show how using these two features together can be useful when processing data across multiple threads.

最后,我们将展示在跨多线程处理数据时,如何共同使用这两个功能。

2. Priority of Elements

2.要素的优先权

Unlike a standard queue, you can’t just add any type of element to a PriorityBlockingQueue. There are two options:

与标准队列不同,你不能随便向PriorityBlockingQueue>添加任何类型的元素。有两种选择。

  1. Adding elements which implement Comparable
  2. Adding elements which do not implement Comparable, on the condition that you provide a Comparator as well

By using either the Comparator or the Comparable implementations to compare elements, the PriorityBlockingQueue will always be sorted.

通过使用ComparatorComparable实现来比较元素,PriorityBlockingQueue将总是被排序的。

The aim is to implement comparison logic in a way in which the highest priority element is always ordered first. Then, when we remove an element from our queue, it will always be the one with the highest priority.

其目的是以一种方式实现比较逻辑,即最高优先级的元素总是先排序。然后,当我们从队列中移除一个元素时,它将总是具有最高优先级的那个。

To begin with, let’s make use of our queue in a single thread, as opposed to using it across multiple ones. By doing this, it makes it easy to prove how elements are ordered in a unit test:

首先,让我们在一个线程中使用我们的队列,而不是在多个线程中使用它。通过这样做,可以很容易地证明元素在单元测试中是如何排序的。

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
ArrayList<Integer> polledElements = new ArrayList<>();
 
queue.add(1);
queue.add(5);
queue.add(2);
queue.add(3);
queue.add(4);

queue.drainTo(polledElements);

assertThat(polledElements).containsExactly(1, 2, 3, 4, 5);

As we can see, despite adding the elements to the queue in a random order, they will be ordered when we start polling them. This is because the Integer class implements Comparable, which will, in turn, be used to make sure we take them out from the queue in ascending order.

正如我们所看到的,尽管以随机的顺序将元素添加到队列中,但是当我们开始轮询它们时,它们将被排序。这是因为Integer类实现了Comparable,它将反过来被用来确保我们以升序从队列中取出它们。

It’s also worth noting that when two elements are compared and are the same, there’s no guarantee of how they will be ordered.

同样值得注意的是,当两个元素被比较且相同时,并不能保证它们将被如何排序。

3. Using the Queue to Block

3.使用队列来阻止

If we were dealing with a standard queue, we would call poll() to retrieve elements. However, if the queue was empty, a call to poll() would return null.

如果我们处理的是一个标准队列,我们将调用poll() 来检索元素。然而,如果队列是空的,对poll()的调用将返回null.

The PriorityBlockingQueue implements the BlockingQueue interface, which gives us some extra methods that allow us to block when removing from an empty queue. Let’s try using the take() method, which should do exactly that:

PriorityBlockingQueue实现了BlockingQueue接口,它给了我们一些额外的方法,允许我们在从空队列中移除时进行阻塞。让我们尝试使用take()方法,它应该正是这样做的。

PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

new Thread(() -> {
  System.out.println("Polling...");

  try {
      Integer poll = queue.take();
      System.out.println("Polled: " + poll);
  } catch (InterruptedException e) {
      e.printStackTrace();
  }
}).start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");
queue.add(1);

Although using sleep() is a slightly brittle way of demonstrating things, when we run this code we will see:

尽管使用sleep()是一种略显脆弱的演示方式,但当我们运行这段代码时,我们将看到。

Polling...
Adding to queue
Polled: 1

This proves that take() blocked until an item was added:

这证明了take()在一个项目被添加之前是封锁的。

  1. The thread will print “Polling” to prove that it’s started
  2. The test will then pause for around five seconds, to prove the thread must have called take() by this point
  3. We add to the queue, and should more or less instantly see “Polled: 1” to prove that take() returned an element as soon as it become available

It’s also worth mentioning that the BlockingQueue interface also provides us with ways of blocking when adding to full queues.

还值得一提的是,BlockingQueue接口也为我们提供了在添加到完整队列时进行阻塞的方法。

However, a PriorityBlockingQueue is unbounded. This means that it will never be full, thus it will always possible to add new elements.

然而,一个PriorityBlockingQueue是无界的。这意味着它永远不会满,因此总是有可能添加新的元素。

4. Using Blocking and Prioritization Together

4.同时使用阻断和优先次序

Now that we’ve explained the two key concepts of a PriorityBlockingQueue, let’s use them both together. We can simply expand on our previous example, but this time add more elements to the queue:

现在我们已经解释了PriorityBlockingQueue的两个关键概念,让我们一起使用它们。我们可以简单地扩展我们之前的例子,但这次要在队列中添加更多的元素。

Thread thread = new Thread(() -> {
    System.out.println("Polling...");
    while (true) {
        try {
            Integer poll = queue.take();
            System.out.println("Polled: " + poll);
        } 
        catch (InterruptedException e) { 
            e.printStackTrace();
        }
    }
});

thread.start();

Thread.sleep(TimeUnit.SECONDS.toMillis(5));
System.out.println("Adding to queue");

queue.addAll(newArrayList(1, 5, 6, 1, 2, 6, 7));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));

Again, while this is a little brittle because of the use of sleep(), it still shows us a valid use case. We now have a queue which blocks, waiting for elements to be added. We’re then adding lots of elements at once, and then showing that they will be handled in priority order. The output will look like this:

同样,由于使用了sleep(),这个例子有点脆,它仍然向我们展示了一个有效的用例。我们现在有一个队列,它阻塞了,等待元素被添加。然后我们一次添加很多元素,然后显示它们将按照优先级顺序被处理。输出将看起来像这样。

Polling...
Adding to queue
Polled: 1
Polled: 1
Polled: 2
Polled: 5
Polled: 6
Polled: 6
Polled: 7

5. Conclusion

5.结论

In this guide, we’ve demonstrated how we can use a PriorityBlockingQueue in order to block a thread until some items have been added to it, and also that we are able to process those items based on their priority.

在本指南中,我们已经演示了如何使用PriorityBlockingQueue来阻塞一个线程,直到一些项目被添加到其中,同时我们也能够根据这些项目的优先级来处理这些项目。

The implementation of these examples can be found over on GitHub. This is a Maven-based project, so should be easy to run as is.

这些例子的实现可以在GitHub上找到over。这是一个基于Maven的项目,所以应该很容易按原样运行。