A Guide to Concurrent Queues in Java

Last modified: August 16, 2020

1. Overview

In this tutorial, we’ll walk through some of the main implementations of concurrent queues in Java. For a general introduction to queues, refer to our Guide to the Java Queue Interface article.

2. Queues

In multithreaded applications, queues need to handle multiple concurrent producer-consumer scenarios. The correct choice of a concurrent queue could be crucial in achieving good performance in our algorithms.

Firstly, we’ll see some important differences between a blocking queue and a non-blocking one. Then, we’ll take a look at some implementations and best practices.

2.1. Blocking vs Non-Blocking Queue

BlockingQueue offers a simple thread-safe mechanism. In this queue, threads need to wait for the queue’s availability. The producers will wait for available capacity before adding elements, while consumers will wait until the queue has elements to take. In the same situations, a non-blocking queue will instead throw an exception or return a special value, like null or false.

To achieve this blocking mechanism, the BlockingQueue interface exposes two functions on top of the normal Queue functions: put and take. Those functions are the equivalent of add and remove in a standard Queue.

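As a minimal sketch (the class name, the capacity of 10, and the values are arbitrary), we can contrast the blocking put and take calls with the non-blocking offer and poll counterparts:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BlockingVsNonBlocking {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

            // put blocks until capacity is available; take blocks until an element exists
            queue.put(1);
            Integer head = queue.take();

            // offer and poll return immediately with a special value instead of blocking
            boolean accepted = queue.offer(2); // false if the queue is full
            Integer polled = queue.poll();     // null if the queue is empty

            System.out.println(head + " " + accepted + " " + polled);
        }
    }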

3. Concurrent Queue Implementations

3.1. ArrayBlockingQueue

As its name suggests, this queue uses an array internally. As a consequence, it’s a bounded queue, meaning it has a fixed size.

A simple work queue is an example use case. This scenario often has a low producer-to-consumer ratio, where we split time-consuming tasks among multiple workers. Since this queue can’t grow indefinitely, the size limit acts as a safety threshold if memory is an issue.

Speaking of memory, it’s important to note that the queue pre-allocates the array. While this may improve throughput, it may also consume more memory than necessary. For instance, a large-capacity queue may stay empty for long periods of time.

Also, the ArrayBlockingQueue uses a single lock for both put and take operations. This ensures no overwrite of entries, at the cost of a performance hit.

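Here’s a minimal sketch of such a work queue; the class name, the capacity of 100, and the single worker thread are arbitrary choices for illustration:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class WorkQueueExample {
        public static void main(String[] args) throws InterruptedException {
            // the capacity of 100 acts as a safety threshold; the backing array is allocated upfront
            BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);

            // a single worker thread consuming tasks
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        workQueue.take().run(); // blocks while the queue is empty
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.setDaemon(true);
            worker.start();

            // the producer blocks if the 100-element limit is reached
            for (int i = 0; i < 10; i++) {
                int taskId = i;
                workQueue.put(() -> System.out.println("Processing task " + taskId));
            }
            Thread.sleep(500); // give the worker time to drain the queue (demo only)
        }
    }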

3.2. LinkedBlockingQueue

The LinkedBlockingQueue uses a LinkedList variant, where each queue item is a new node. While this makes the queue unbounded in principle, it still has a hard limit of Integer.MAX_VALUE.

On the other hand, we can set the queue size by using the constructor LinkedBlockingQueue(int capacity).

This queue uses distinct locks for the put and take operations. As a consequence, both operations can run in parallel, which improves throughput.

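Here’s a short sketch showing both the unbounded and the bounded flavor (the class name, the capacity of 1000, and the messages are arbitrary):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class LinkedBlockingQueueExample {
        public static void main(String[] args) throws InterruptedException {
            // "unbounded": effectively capped at Integer.MAX_VALUE elements
            BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

            // bounded: put blocks once 1000 elements are queued
            BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);

            bounded.put("message");             // may block on a full queue
            String head = bounded.take();       // may block on an empty queue
            unbounded.offer("another message"); // a new node is allocated per element

            System.out.println(head);
        }
    }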

Since the LinkedBlockingQueue can be either bounded or unbounded, why would we use the ArrayBlockingQueue over this one? LinkedBlockingQueue needs to allocate and deallocate nodes every time an item is added or removed from the queue. For this reason, an ArrayBlockingQueue can be a better alternative if the queue grows and shrinks quickly.

The performance of LinkedBlockingQueue is said to be unpredictable. In other words, we always need to profile our scenarios to ensure we use the right data structure.

3.3. PriorityBlockingQueue

The PriorityBlockingQueue is our go-to solution when we need to consume items in a specific order. To achieve this, the PriorityBlockingQueue uses an array-based binary heap.

While internally it uses a single lock mechanism, the take operation can occur simultaneously with the put operation. The use of a simple spinlock makes this possible.

A typical use case is consuming tasks with different priorities. We don’t want a low-priority task to take the place of a high-priority one.

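Here’s a minimal sketch of that idea, assuming a hypothetical Task type ordered by a numeric priority (a lower number means a higher priority):

    import java.util.Comparator;
    import java.util.concurrent.PriorityBlockingQueue;

    public class PriorityExample {
        // a hypothetical task type (Java 16+ record); lower number means higher priority
        record Task(String name, int priority) { }

        public static void main(String[] args) throws InterruptedException {
            PriorityBlockingQueue<Task> queue =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt(Task::priority));

            queue.put(new Task("send newsletter", 5));
            queue.put(new Task("fix production outage", 1));

            // the highest-priority task comes out first, regardless of insertion order
            System.out.println(queue.take().name()); // fix production outage
        }
    }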

3.4. DelayQueue

We use a DelayQueue when a consumer can only take an expired item. Interestingly, it uses a PriorityQueue internally to order the items by their expiration.

Since this is not a general-purpose queue, it doesn’t cover as many scenarios as the ArrayBlockingQueue or the LinkedBlockingQueue. For example, we can use this queue to implement a simple event loop similar to what is found in NodeJS. We place asynchronous tasks in the queue for later processing when they expire.

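Elements of a DelayQueue must implement the Delayed interface; the DelayedEvent class and the 500 ms delay in this sketch are hypothetical:

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class DelayQueueExample {
        // a hypothetical delayed element; the queue only hands it out once the delay expires
        static class DelayedEvent implements Delayed {
            private final String name;
            private final long triggerTimeMillis;

            DelayedEvent(String name, long delayMillis) {
                this.name = name;
                this.triggerTimeMillis = System.currentTimeMillis() + delayMillis;
            }

            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(triggerTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed other) {
                return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
            }
        }

        public static void main(String[] args) throws InterruptedException {
            DelayQueue<DelayedEvent> queue = new DelayQueue<>();
            queue.put(new DelayedEvent("expired task", 500));

            // take blocks until the 500 ms delay has elapsed
            DelayedEvent event = queue.take();
            System.out.println("Processing " + event.name);
        }
    }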

3.5. LinkedTransferQueue

The LinkedTransferQueue introduces a transfer method. While other queues typically block when producing or consuming items, the LinkedTransferQueue allows a producer to wait for the consumption of an item.

We use a LinkedTransferQueue when we need a guarantee that a particular item we put in the queue has been taken by someone. Also, we can implement a simple backpressure algorithm using this queue. Indeed, by blocking producers until consumption, consumers can drive the flow of messages produced.

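Here’s a minimal sketch of such a hand-off (the class name and the message are arbitrary):

    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.TransferQueue;

    public class TransferExample {
        public static void main(String[] args) throws InterruptedException {
            TransferQueue<String> queue = new LinkedTransferQueue<>();

            // the consumer takes exactly one element
            Thread consumer = new Thread(() -> {
                try {
                    String message = queue.take();
                    System.out.println("Consumed " + message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();

            // transfer blocks until a consumer has actually received the element,
            // which is how backpressure on the producer can be implemented
            queue.transfer("message");
            System.out.println("The message was handed off to a consumer");
            consumer.join();
        }
    }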

3.6. SynchronousQueue

While queues typically contain many items, the SynchronousQueue will always have, at most, a single item. In other words, we need to see the SynchronousQueue as a simple way to exchange some data between two threads.

When we have two threads that need access to a shared state, we often synchronize these with CountDownLatch or other synchronization mechanisms. By using a SynchronousQueue, we can avoid this manual synchronization of threads.

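Here’s a minimal sketch of that hand-off between two threads (the value 42 is arbitrary):

    import java.util.concurrent.SynchronousQueue;

    public class SynchronousQueueExample {
        public static void main(String[] args) throws InterruptedException {
            SynchronousQueue<Integer> queue = new SynchronousQueue<>();

            // the producer blocks on put until another thread calls take
            Thread producer = new Thread(() -> {
                try {
                    queue.put(42);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // the hand-off itself is the synchronization point, no latch required
            Integer sharedState = queue.take();
            System.out.println("Received " + sharedState);
            producer.join();
        }
    }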

3.7. ConcurrentLinkedQueue

The ConcurrentLinkedQueue is the only non-blocking queue in this guide. Consequently, it provides a lock-free algorithm where add and poll are guaranteed to be thread-safe and return immediately. Instead of locks, this queue uses CAS (Compare-And-Swap).

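As a quick sketch (the class name and values are arbitrary), both calls return immediately instead of blocking:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class ConcurrentLinkedQueueExample {
        public static void main(String[] args) {
            Queue<String> queue = new ConcurrentLinkedQueue<>();

            // both calls return immediately, even under contention from other threads
            queue.add("event");
            String head = queue.poll(); // null when the queue is empty, never blocks

            System.out.println(head + " / " + queue.poll());
        }
    }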

Internally, it’s based on an algorithm from Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.

It’s a perfect candidate for modern reactive systems, where using blocking data structures is often forbidden.

On the other hand, if our consumer ends up waiting in a loop, we should probably choose a blocking queue as a better alternative.

4. Conclusion

In this guide, we walked through different concurrent queue implementations, discussing their strengths and weaknesses. With this in mind, we’re better equipped to develop efficient, durable, and available systems.
