Concurrency with LMAX Disruptor – An Introduction

Last modified: January 20, 2017


1. Overview


This article introduces the LMAX Disruptor and explains how it helps achieve low-latency concurrency. We’ll also walk through basic usage of the Disruptor library.

2. What Is a Disruptor?


Disruptor is an open source Java library written by LMAX. It is a concurrent programming framework for processing a large number of transactions with low latency, without the usual complexities of concurrent code. It gets its performance from a software design that exploits the efficiency of the underlying hardware.

2.1. Mechanical Sympathy


Let’s start with the core concept of mechanical sympathy: understanding how the underlying hardware operates and programming in a way that works best with that hardware.

For example, let’s see how CPU and memory organization can impact software performance. The CPU has several layers of cache between it and main memory. When the CPU is performing an operation, it first looks in L1 for the data, then L2, then L3, and finally, the main memory. The further it has to go, the longer the operation will take.


If the same operation is performed on a piece of data multiple times (for example, a loop counter), it makes sense to load that data into a place very close to the CPU.


Some indicative figures for the cost of cache misses:


Latency from CPU to    CPU cycles       Time
Main memory            Multiple         ~60-80 ns
L3 cache               ~40-45 cycles    ~15 ns
L2 cache               ~10 cycles       ~3 ns
L1 cache               ~3-4 cycles      ~1 ns
Register               1 cycle          Very very quick
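
To make the effect of memory layout concrete, here is a minimal sketch that sums the same two-dimensional array in two different orders. The class and method names are hypothetical and not part of the Disruptor. Both methods return the same sum; the row-order loop walks each row array sequentially, so on typical hardware it tends to hit the cache more often than the column-order loop. Timings vary by machine and JVM, so the sketch prints them rather than claiming specific numbers.

```java
final class TraversalOrder {

    // Visits elements in the order they are laid out within each row array
    static long sumRowOrder(int[][] grid) {
        long sum = 0;
        for (int row = 0; row < grid.length; row++) {
            for (int col = 0; col < grid[row].length; col++) {
                sum += grid[row][col];
            }
        }
        return sum;
    }

    // Jumps to a different row array on every step (assumes a rectangular grid)
    static long sumColumnOrder(int[][] grid) {
        long sum = 0;
        int cols = grid.length == 0 ? 0 : grid[0].length;
        for (int col = 0; col < cols; col++) {
            for (int row = 0; row < grid.length; row++) {
                sum += grid[row][col];
            }
        }
        return sum;
    }

    // Builds a size x size grid where each cell holds row + col
    static int[][] filledGrid(int size) {
        int[][] grid = new int[size][size];
        for (int row = 0; row < size; row++) {
            for (int col = 0; col < size; col++) {
                grid[row][col] = row + col;
            }
        }
        return grid;
    }

    public static void main(String[] args) {
        int[][] grid = filledGrid(2048);

        long start = System.nanoTime();
        long rowSum = sumRowOrder(grid);
        long middle = System.nanoTime();
        long columnSum = sumColumnOrder(grid);
        long end = System.nanoTime();

        System.out.println("row order:    sum=" + rowSum + " ns=" + (middle - start));
        System.out.println("column order: sum=" + columnSum + " ns=" + (end - middle));
    }
}
```

A single run like this is not a rigorous benchmark (JIT warm-up alone can skew it), but it illustrates why access patterns matter as much as the algorithm.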

2.2. Why Not Queues


Queue implementations tend to have write contention on the head, tail, and size variables. Because consumers and producers rarely run at the same pace, queues are usually either close to full or close to empty. They very rarely operate in a balanced middle ground where the rates of production and consumption are evenly matched.

To deal with the write contention, a queue often uses locks, which can cause a context switch into the kernel. When this happens, the processor involved is likely to lose the data in its caches.

To get the best caching behavior, the design should have only one core writing to any memory location (multiple readers are fine, as processors often use special high-speed links between their caches). Queues fail the one-writer principle.


If two separate threads write to two different values that happen to sit on the same cache line, each core invalidates the other’s copy of that line (data is transferred between main memory and cache in fixed-size blocks called cache lines). The result is write contention between the two threads even though they’re writing to two different variables. This is called false sharing. In a queue, the head and tail often share a cache line, so every update to the head also invalidates the cached tail, and vice versa.
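
One common way to avoid false sharing is to pad a hot field so that nothing else shares its cache line. The following is a minimal sketch of that idea, assuming 64-byte cache lines; all class names are hypothetical. It uses a small class hierarchy so the padding fields end up on both sides of the value, which is similar in spirit to how the Disruptor pads its own sequence counters, but it is not the library’s code.

```java
// Padding before the hot field (7 longs = 56 bytes; assumes 64-byte cache lines)
class LeftPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

// The one field that is actually written
class CounterValue extends LeftPadding {
    protected volatile long value;
}

// Padding after the hot field, so no neighbouring data shares its cache line
final class PaddedCounter extends CounterValue {
    protected long p9, p10, p11, p12, p13, p14, p15;

    // Safe only under the one-writer principle: a single thread calls this
    void increment() {
        value = value + 1;
    }

    long get() {
        return value;
    }
}

final class FalseSharingSketch {

    // Runs two writer threads, each owning its own counter, and returns both totals
    static long[] runTwoWriters(int incrementsPerThread) {
        PaddedCounter first = new PaddedCounter();
        PaddedCounter second = new PaddedCounter();

        Thread a = new Thread(() -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                first.increment();
            }
        });
        Thread b = new Thread(() -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                second.increment();
            }
        });

        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] { first.get(), second.get() };
    }

    public static void main(String[] args) {
        long[] totals = runTwoWriters(1_000_000);
        System.out.println(totals[0] + " " + totals[1]);
    }
}
```

Each counter has exactly one writer, so the plain read-modify-write in increment() is enough; with several writers per counter it would lose updates.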

2.3. How the Disruptor Works


[Figure: ring buffer overview and its API]

The Disruptor has an array-based circular data structure (ring buffer): an array with a pointer to the next available slot, filled with preallocated transfer objects. Producers and consumers write to and read from the ring without locking or contention.

In a Disruptor, all events are published to all consumers (multicast), for parallel consumption through separate downstream queues. Due to parallel processing by consumers, it is necessary to coordinate dependencies between the consumers (dependency graph).


Each producer and consumer has a sequence counter that indicates which slot in the buffer it is currently working on. Each one can write only its own sequence counter but can read the others’ counters. By reading those counters, a producer or consumer can check, without any locks, that the slot it wants to use is available.
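
The sequence-counter idea can be sketched with a tiny single-producer, single-consumer ring. This is a simplified illustration with hypothetical names (TinyRing, put, take), not the Disruptor’s implementation: each side writes only its own counter and reads the other’s to decide whether a slot is free or ready.

```java
import java.util.concurrent.atomic.AtomicLong;

final class TinyRing {
    private final long[] slots;
    private final int mask;
    // Written only by the producer: highest sequence made visible to the consumer
    private final AtomicLong published = new AtomicLong(-1);
    // Written only by the consumer: highest sequence it has finished reading
    private final AtomicLong consumed = new AtomicLong(-1);

    TinyRing(int size) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new long[size];
        mask = size - 1;
    }

    // Producer side: wait until the slot being reused has been consumed
    void put(long value) {
        long next = published.get() + 1;
        while (next - consumed.get() > slots.length) {
            Thread.yield();
        }
        slots[(int) (next & mask)] = value;
        published.set(next); // makes the slot visible to the consumer
    }

    // Consumer side: wait until the producer has published the next sequence
    long take() {
        long next = consumed.get() + 1;
        while (published.get() < next) {
            Thread.yield();
        }
        long value = slots[(int) (next & mask)];
        consumed.set(next); // frees the slot for reuse
        return value;
    }

    // Pushes 0..count-1 through the ring from a second thread and sums what arrives
    static long transferAndSum(int count, int ringSize) {
        TinyRing ring = new TinyRing(ringSize);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                ring.put(i);
            }
        });
        producer.start();

        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += ring.take();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferAndSum(1000, 8));
    }
}
```

No lock is taken anywhere: the only coordination is each thread reading the other’s counter. The real library adds multiple producers, multiple consumers with dependencies, and pluggable wait strategies on top of this basic idea.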

3. Using the Disruptor Library


3.1. Maven Dependency


Let’s start by adding the Disruptor library dependency to pom.xml:

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

The latest version of the dependency can be checked on Maven Central.

3.2. Defining an Event


Let’s define the event that carries the data:


public static class ValueEvent {
    private int value;

    // Typed factory the Disruptor uses to preallocate ValueEvent instances
    public static final EventFactory<ValueEvent> EVENT_FACTORY 
      = ValueEvent::new;

    // standard getters and setters
}

The EventFactory lets the Disruptor preallocate the events.


3.3. Consumer


Consumers read data from the ring buffer. Let’s define a consumer that will handle the events:


public class SingleEventPrintConsumer {
    ...

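    // Creating an array of a generic type is unchecked, hence the suppression
    @SuppressWarnings("unchecked")
    public EventHandler<ValueEvent>[] getEventHandler() {
        EventHandler<ValueEvent> eventHandler 
          = (event, sequence, endOfBatch) 
            -> print(event.getValue(), sequence);
        return new EventHandler[] { eventHandler };
    }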
 
    private void print(int id, long sequenceId) {
        logger.info("Id is " + id 
          + " sequence id that was used is " + sequenceId);
    }
}

In our example, the consumer is just printing to a log.


3.4. Constructing the Disruptor


Construct the Disruptor:


ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;

WaitStrategy waitStrategy = new BusySpinWaitStrategy();
Disruptor<ValueEvent> disruptor 
  = new Disruptor<>(
    ValueEvent.EVENT_FACTORY, 
    16, 
    threadFactory, 
    ProducerType.SINGLE, 
    waitStrategy);

In the constructor of Disruptor, the following are defined:


  • Event Factory – Responsible for generating the objects that are stored in the ring buffer during initialization
  • The size of Ring Buffer – We have defined 16 as the size of the ring buffer. It has to be a power of 2, otherwise an exception is thrown during initialization. A power-of-2 size matters because common operations can then use fast bitwise operators, for example a bit mask instead of a modulo to map a sequence to a slot
  • Thread Factory – Factory to create threads for event processors
  • Producer Type – Specifies whether we will have a single producer or multiple producers
  • Waiting strategy – Defines how a consumer waits for new events when it has caught up with the producer
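
The power-of-2 requirement can be illustrated with a short sketch (the class and method names here are hypothetical). When the size is a power of 2, masking a non-negative sequence with size - 1 gives the same slot as the modulo operation, which is why a ring buffer can wrap around with a single bitwise AND.

```java
final class SlotIndex {

    // Slot index via bit mask; valid only when size is a power of 2
    static int byMask(long sequence, int size) {
        return (int) (sequence & (size - 1));
    }

    // Slot index via modulo; works for any positive size
    static int byModulo(long sequence, int size) {
        return (int) (sequence % size);
    }

    static boolean isPowerOfTwo(int size) {
        return size > 0 && Integer.bitCount(size) == 1;
    }

    public static void main(String[] args) {
        int size = 16;
        // Sequences 14..18 wrap from the end of the ring back to the start
        for (long sequence = 14; sequence <= 18; sequence++) {
            System.out.println("sequence " + sequence
              + " -> slot " + byMask(sequence, size));
        }
    }
}
```

With a size of 16, sequence 16 lands back in slot 0 and sequence 17 in slot 1, so 32 published events make two full passes over the ring.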

Connect the consumer handler:


disruptor.handleEventsWith(getEventHandler());

The Disruptor lets us register multiple consumers to handle the data produced by the producer. In the example above, we have just one consumer, a.k.a. event handler.

3.5. Starting the Disruptor


To start the Disruptor:


RingBuffer<ValueEvent> ringBuffer = disruptor.start();

3.6. Producing and Publishing Events


Producers place the data in the ring buffer in a sequence. Producers have to be aware of the next available slot so that they don’t overwrite data that is not yet consumed.


Use the RingBuffer from Disruptor for publishing:


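for (int eventCount = 0; eventCount < 32; eventCount++) {
    // Phase 1: claim the next slot
    long sequenceId = ringBuffer.next();
    try {
        ValueEvent valueEvent = ringBuffer.get(sequenceId);
        valueEvent.setValue(eventCount);
    } finally {
        // Phase 2: always publish a claimed sequence, or consumers will stall
        ringBuffer.publish(sequenceId);
    }
}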

Here, the producer produces and publishes items in sequence. Note that publishing works in two phases, similar to a two-phase commit protocol: the producer first claims the next sequence with next() and fills in the event at that slot, then makes it visible to consumers with publish(). With a single producer, each call to next() returns the previous sequenceId + 1.

4. Conclusion


In this tutorial, we have seen what a Disruptor is and how it achieves concurrency with low latency. We have seen the concept of mechanical sympathy and how it may be exploited to achieve low latency. We have then seen an example using the Disruptor library.


The example code can be found in the GitHub project – this is a Maven based project, so it should be easy to import and run as is.
