Implementing a Ring Buffer in Java – 在Java中实现一个环形缓冲器

最后修改: 2020年 6月 27日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll learn how to implement a Ring Buffer in Java.

在本教程中,我们将学习如何在Java中实现一个环形缓冲器。

2. Ring Buffer

2.环形缓冲器

Ring Buffer (or Circular Buffer) is a bounded circular data structure that is used for buffering data between two or more threads. As we keep writing to a ring buffer, it wraps around as it reaches the end.

环形缓冲区(或圆形缓冲区)是一个有界的圆形数据结构,用于在两个或多个线程之间缓冲数据。当我们不断向环形缓冲区写入数据时,当它到达终点时就会被包裹起来。

2.1. How It Works

2.1.它是如何工作的

A Ring Buffer is implemented using a fixed-size array that wraps around at the boundaries.

环形缓冲区是用一个固定大小的数组来实现的,该数组在边界处环绕

Apart from the array, it keeps track of three things:

除了阵列之外,它还记录了三件事。

  • the next available slot in the buffer to insert an element,
  • the next unread element in the buffer,
  • and the end of the array – the point at which the buffer wraps around to the start of the array

The mechanics of how a ring buffer handles these requirements vary with the implementation. For instance, the Wikipedia entry on the subject shows a method using four-pointers.

环形缓冲器如何处理这些要求的机制因实现方式而异。例如,Wikipedia条目就显示了一种使用四点式的方法。

We’ll borrow the approach from Disruptor‘s implementation of the ring buffer using sequences.

我们将借用Disruptor使用序列实现环形缓冲器的方法。

The first thing we need to know is the capacity – the fixed maximum size of the buffer. Next, we’ll use two monotonically increasing sequences:

我们需要知道的第一件事是容量–缓冲区的固定最大尺寸。接下来,我们将使用两个单调增长的序列

  • Write Sequence: starting at -1, increments by 1 as we insert an element
  • Read Sequence: starting at 0, increments by 1 as we consume an element

We can map a sequence to an index in the array by using a mod operation:

我们可以通过使用mod操作将一个序列映射到数组中的一个索引。

arrayIndex = sequence % capacity

The mod operation wraps the sequence around the boundaries to derive a slot in the buffer:

mod操作将序列包裹在边界周围,在缓冲区中衍生出一个槽

Let’s see how we’d insert an element:

让我们看看我们如何插入一个元素。

buffer[++writeSequence % capacity] = element

We are pre-incrementing the sequence before inserting an element.

我们在插入一个元素之前对序列进行预增。

To consume an element we do a post-increment:

为了消耗一个元素,我们做了一个后置增量。

element = buffer[readSequence++ % capacity]

In this case, we perform a post-increment on the sequence. Consuming an element doesn’t remove it from the buffer – it just stays in the array until it’s overwritten.

在这种情况下,我们对序列进行后置增量。消耗一个元素并不会将其从缓冲区中移除–它只是停留在数组中,直到被覆盖

2.2. Empty and Full Buffers

2.2.空缓冲区和满缓冲区

As we wrap around the array, we will start overwriting the data in the buffer. If the buffer is full, we can choose to either overwrite the oldest data regardless of whether the reader has consumed it or prevent overwriting the data that has not been read.

当我们环绕数组时,我们将开始覆盖缓冲区中的数据。如果缓冲区已满,我们可以选择覆盖最古老的数据,而不管读者是否已经消耗了它,或者防止覆盖未被读取的数据

If the reader can afford to miss the intermediate or old values (for example, a stock price ticker), we can overwrite the data without waiting for it to be consumed. On the other hand, if the reader must consume all the values (like with e-commerce transactions), we should wait (block/busy-wait) until the buffer has a slot available.

如果读者可以承受错过中间值或旧值(例如,股票价格行情),我们可以覆盖数据而不需要等待它被消耗。另一方面,如果读者必须消耗所有的值(比如电子商务交易),我们应该等待(block/busy-wait),直到缓冲区有一个空位。

The buffer is full if the size of the buffer is equal to its capacity, where its size is equal to the number of unread elements:

如果缓冲区的大小等于其容量,则缓冲区已满,其中其大小等于未读元素的数量。

size = (writeSequence - readSequence) + 1
isFull = (size == capacity)

If the write sequence lags behind the read sequence, the buffer is empty:

如果写序列落后于读序列,则缓冲区为空

isEmpty = writeSequence < readSequence

The buffer returns a null value if it’s empty.

如果缓冲区为空,则返回一个null值。

2.2. Advantages and Disadvantages

2.2.优势和劣势

A ring buffer is an efficient FIFO buffer. It uses a fixed-size array that can be pre-allocated upfront and allows an efficient memory access pattern. All the buffer operations are constant time O(1), including consuming an element, as it doesn’t require a shifting of elements.

环形缓冲器是一种高效的FIFO缓冲器。它使用一个固定大小的数组,可以提前预分配,并允许有效的内存访问模式。所有的缓冲器操作都是恒定时间O(1),包括消耗一个元素,因为它不需要移位元素。

On the flip side, determining the correct size of the ring buffer is critical. For instance, the write operations may block for a long time if the buffer is under-sized and the reads are slow. We can use dynamic sizing, but it would require moving data around and we’ll miss out on most of the advantages discussed above.

反过来说,确定环形缓冲区的正确大小也很关键。例如,如果缓冲区的大小不足,读取速度慢,那么写操作可能会阻塞很长时间。我们可以使用动态大小,但这需要移动数据,而且我们会错过上面讨论的大部分优点。

3. Implementation in Java

3.用Java实现

Now that we understand how a ring buffer works, let’s proceed to implement it in Java.

现在我们了解了环形缓冲器的工作原理,让我们继续用Java来实现它。

3.1. Initialization

3.1.初始化

First, let’s define a constructor that initializes the buffer with a predefined capacity:

首先,让我们定义一个构造函数,用一个预定义的容量来初始化缓冲区。

public CircularBuffer(int capacity) {
    this.capacity = (capacity < 1) ? DEFAULT_CAPACITY : capacity;
    this.data = (E[]) new Object[this.capacity];
    this.readSequence = 0;
    this.writeSequence = -1;
}

This will create an empty buffer and initialize the sequence fields as discussed in the previous section.

这将创建一个空的缓冲区,并如上一节所讨论的那样初始化序列字段。

3.2. Offer

3.2 提供

Next, we’ll implement the offer operation that inserts an element into the buffer at the next available slot and returns true on success. It returns false if the buffer can’t find an empty slot, that is, we can’t overwrite unread values.

接下来,我们将实现offer操作,将一个元素插入到缓冲区的下一个空位,成功后返回true。如果缓冲区找不到空槽,则返回false,也就是说,我们不能覆盖未读值

Let’s implement the offer method in Java:

让我们在Java中实现offer方法。

public boolean offer(E element) {
    boolean isFull = (writeSequence - readSequence) + 1 == capacity;
    if (!isFull) {
        int nextWriteSeq = writeSequence + 1;
        data[nextWriteSeq % capacity] = element;
        writeSequence++;
        return true;
    }
    return false;
}

So, we’re incrementing the write sequence and computing the index in the array for the next available slot. Then, we’re writing the data to the buffer and storing the updated write sequence.

所以,我们正在递增写序列,并计算数组中下一个可用槽的索引。然后,我们将数据写入缓冲区,并存储更新的写入序列。

Let’s try it out:

让我们来试一试。

@Test
public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() {
    CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);

    assertTrue(buffer.offer("Square"));
    assertEquals(1, buffer.size());
}

3.3. Poll

3.3. 投票

Finally, we’ll implement the poll operation that retrieves and removes the next unread element. The poll operation doesn’t remove the element but increments the read sequence.

最后,我们将实现poll操作,检索并删除下一个未读元素。poll操作并不删除元素,而是增加读取序列

Let’s implement it:

让我们来实施它。

public E poll() {
    boolean isEmpty = writeSequence < readSequence;
    if (!isEmpty) {
        E nextValue = data[readSequence % capacity];
        readSequence++;
        return nextValue;
    }
    return null;
}

Here, we’re reading the data at the current read sequence by computing the index in the array. Then, we’re incrementing the sequence and returning the value, if the buffer is not empty.

在这里,我们通过计算数组中的索引来读取当前读取序列中的数据。然后,如果缓冲区不是空的,我们将增加该序列并返回该值。

Let’s test it out:

让我们来测试一下。

@Test
public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() {
    CircularBuffer buffer = new CircularBuffer<>(defaultCapacity);
    buffer.offer("Triangle");
    String shape = buffer.poll();

    assertEquals("Triangle", shape);
}

4. Producer-Consumer Problem

4.生产者-消费者问题

We’ve talked about the use of a ring buffer for exchanging data between two or more threads, which is an example of a synchronization problem called the Producer-Consumer problem. In Java, we can solve the producer-consumer problem in various ways using semaphores, bounded queues, ring buffers, etc.

我们已经谈到了使用环形缓冲器在两个或多个线程之间交换数据的问题,这是一个同步问题的例子,称为生产者-消费者问题。在Java中,我们可以使用semaphores有界队列、环形缓冲器等多种方式来解决生产者-消费者问题。

Let’s implement a solution based on a ring buffer.

让我们来实现一个基于环形缓冲器的解决方案。

4.1. volatile Sequence Fields

4.1.易失性 序列字段

Our implementation of the ring buffer is not thread-safe. Let’s make it thread-safe for the simple single-producer and single-consumer case.

我们对环形缓冲器的实现不是线程安全的。让我们在简单的单生产者和单消费者的情况下使其成为线程安全的。

The producer writes data to the buffer and increments the writeSequence, while the consumer only reads from the buffer and increments the readSequence. So, the backing array is contention-free and we can get away without any synchronization.

生产者将数据写入缓冲区并增加writeSequence,而消费者只从缓冲区读取数据并增加readSequence。所以,支持数组是无竞争的,我们可以不用任何同步。

But we still need to ensure that the consumer can see the latest value of the writeSequence field (visibility) and that the writeSequence is not updated before the data is actually available in the buffer (ordering).

但是我们仍然需要确保消费者能够看到writeSequence字段的最新值(visibility),并确保writeSequence在缓冲区中实际可用数据之前不会被更新(ordering)。

We can make the ring buffer concurrent and lock-free in this case by making the sequence fields volatile:

在这种情况下,我们可以通过使序列字段volatile来使环形缓冲器并发和无锁。

private volatile int writeSequence = -1, readSequence = 0;

In the offer method, a write to the volatile field writeSequence guarantees that the writes to the buffer happen before updating the sequence. At the same time, the volatile visibility guarantee ensures that the consumer will always see the latest value of writeSequence.

offer方法中,对volatile字段writeSequence的写入保证了在更新序列之前对缓冲区的写入发生。同时,volatile的可见性保证了消费者将始终看到writeSequence的最新值。

4.2. Producer

4.2.生产者

Let’s implement a simple producer Runnable that writes to the ring buffer:

让我们实现一个简单的生产者Runnable,向环形缓冲区写入。

public void run() {
    for (int i = 0; i < items.length;) {
        if (buffer.offer(items[i])) {
           System.out.println("Produced: " + items[i]);
            i++;
        }
    }
}

The producer thread would wait for an empty slot in a loop (busy-waiting).

生产者线程将在一个循环中等待一个空槽(忙于等待)。

4.3. Consumer

4.3.消费者

We’ll implement a consumer Callable that reads from the buffer:

我们将实现一个消费者Callable,从缓冲区读取数据。

public T[] call() {
    T[] items = (T[]) new Object[expectedCount];
    for (int i = 0; i < items.length;) {
        T item = buffer.poll();
        if (item != null) {
            items[i++] = item;
            System.out.println("Consumed: " + item);
        }
    }
    return items;
}

The consumer thread continues without printing if it receives a null value from the buffer.

如果消费者线程从缓冲区中收到一个null值,它就会继续下去而不打印。

Let’s write our driver code:

让我们来编写我们的驱动代码。

executorService.submit(new Thread(new Producer<String>(buffer)));
executorService.submit(new Thread(new Consumer<String>(buffer)));

Executing our producer-consumer program produces output like below:

执行我们的生产者-消费者程序会产生如下输出。

Produced: Circle
Produced: Triangle
  Consumed: Circle
Produced: Rectangle
  Consumed: Triangle
  Consumed: Rectangle
Produced: Square
Produced: Rhombus
  Consumed: Square
Produced: Trapezoid
  Consumed: Rhombus
  Consumed: Trapezoid
Produced: Pentagon
Produced: Pentagram
Produced: Hexagon
  Consumed: Pentagon
  Consumed: Pentagram
Produced: Hexagram
  Consumed: Hexagon
  Consumed: Hexagram

5. Conclusion

5.结论

In this tutorial, we’ve learned how to implement a Ring Buffer and explored how it can be used to solve the producer-consumer problem.

在本教程中,我们已经学会了如何实现环形缓冲区,并探讨了如何用它来解决生产者-消费者问题。

As usual, the source code for all the examples is available over on GitHub.

像往常一样,所有例子的源代码都可以在GitHub上找到