Guide to the ConcurrentSkipListMap – ConcurrentSkipListMap指南

最后修改: 2017年 4月 28日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this quick article, we’ll be looking at the ConcurrentSkipListMap class from the java.util.concurrent package.

在这篇快速文章中,我们将看看java.util.concurrentSkipListMap包中的类。

This construct allows us to create thread-safe logic in a lock-free way. It’s ideal for problems when we want to make an immutable snapshot of the data while other threads are still inserting data into the map.

这种结构允许我们以无锁的方式创建线程安全的逻辑。当我们想在其他线程还在向地图中插入数据时,对数据做一个不可变的快照,这是理想的问题。

We will be solving a problem of sorting a stream of events and getting a snapshot of the events that arrived in the last 60 seconds using that construct.

我们将解决一个问题:对事件流进行排序,并使用该结构获得过去60秒内到达的事件的快照

2. Stream Sorting Logic

2.流排序逻辑

Let’s say that we have a stream of events that are continually coming from multiple threads. We need to be able to take events from the last 60 seconds, and also events that are older than 60 seconds.

比方说,我们有一个持续来自多个线程的事件流。我们需要能够获取过去60秒内的事件,以及比60秒更早的事件。

First, let’s define the structure of our event data:

首先,让我们定义我们的事件数据的结构。

public class Event {
    private ZonedDateTime eventTime;
    private String content;

    // standard constructors/getters
}

We want to keep our events sorted using the eventTime field. To achieve this using the ConcurrentSkipListMap, we need to pass a Comparator to its constructor while creating an instance of it:

我们希望使用eventTime字段来保持我们的事件排序。为了使用ConcurrentSkipListMap实现这一点,我们需要在创建它的实例时向其构造函数传递一个Comparator

ConcurrentSkipListMap<ZonedDateTime, String> events
 = new ConcurrentSkipListMap<>(
 Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

We’ll be comparing all arrived events using their timestamps. We are using the comparingLong() method and passing the extract function that can take a long timestamp from the ZonedDateTime.

我们将使用它们的时间戳来比较所有到达的事件。我们正在使用comparingLong() 方法,并通过提取函数,可以从ZonedDateTime中获取long时间戳。

When our events are arriving, we need only to add them to the map using the put() method. Note that this method does not require any explicit synchronization:

当我们的事件到达时,我们只需要使用put() 方法将它们添加到地图中。请注意,这个方法不需要任何显式同步。

public void acceptEvent(Event event) {
    events.put(event.getEventTime(), event.getContent());
}

The ConcurrentSkipListMap will handle the sorting of those events underneath using the Comparator that was passed to it in the constructor.

ConcurrentSkipListMap将使用构造函数中传递给它的Comparator来处理下面这些事件的排序。

The most notable pros of the ConcurrentSkipListMap are the methods that can make an immutable snapshot of its data in a lock-free way. To get all events that arrived within the past minute, we can use the tailMap() method and pass the time from which we want to get elements:

ConcurrentSkipListMap最显著的优点是可以以无锁方式对其数据进行不可更改的快照。为了获得过去一分钟内到达的所有事件,我们可以使用tailMap()方法并传递我们想要获得的元素的时间。

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
    return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

It will return all events from the past minute. It will be an immutable snapshot and what is the most important is that other writing threads can add new events to the ConcurrentSkipListMap without any need to do explicit locking.

它将返回过去一分钟内的所有事件。它将是一个不可改变的快照,最重要的是,其他写作线程可以向ConcurrentSkipListMap添加新的事件,而不需要做显式锁定。

We can now get all events that arrived later that one minute from now – by using the headMap() method:

我们现在可以通过使用headMap()方法,获得所有从现在开始一分钟后到达的事件。

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
    return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

This will return an immutable snapshot of all events that are older than one minute. All of the above methods belong to the EventWindowSort class, which we’ll use in the next section.

这将返回所有超过一分钟的事件的不可改变的快照。以上所有的方法都属于EventWindowSort类,我们将在下一节中使用它。

3. Testing the Sorting Stream Logic

3.测试排序流逻辑

Once we implemented our sorting logic using the ConcurrentSkipListMap, we can now test it by creating two writer threads that will send one hundred events each:

一旦我们使用ConcurrentSkipListMap实现了我们的排序逻辑,我们现在可以通过创建两个写作者线程来测试它,每个线程将发送100个事件。

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
  .rangeClosed(0, 100)
  .forEach(index -> eventWindowSort.acceptEvent(
      new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
  );

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(producer);
}

Each thread is invoking the acceptEvent() method, sending the events that have eventTime from now to “now minus one hundred seconds”.

每个线程都在调用acceptEvent() 方法,发送eventTime从现在到 “现在减去一百秒 “的事件。

In the meantime, we can invoke the getEventsFromLastMinute() method that will return the snapshot of events that are within the one minute window:

同时,我们可以调用getEventsFromLastMinute()方法,该方法将返回一分钟窗口内的事件快照。

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsFromLastMinute();

The number of events in the eventsFromLastMinute will be varying in each test run depending on the speed at which the producer threads will be sending the events to the EventWindowSort. We can assert that there is not a single event in the returned snapshot that is older than one minute:

eventsFromLastMinute中的事件数量在每次测试运行中都会有所不同,这取决于生产者线程将事件发送到EventWindowSort的速度。我们可以断言,在返回的快照中没有任何一个事件的时间超过一分钟。

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventsOlderThanOneMinute, 0);

And that there are more than zero events in the snapshot that are within the one minute window:

而且,在快照中,有超过0个事件是在一分钟的窗口内。

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventYoungerThanOneMinute > 0);

Our getEventsFromLastMinute() uses the tailMap() underneath.

我们的getEventsFromLastMinute()使用下面的tailMap()

Let’s test now the getEventsOlderThatOneMinute() that is using the headMap() method from the ConcurrentSkipListMap:

现在让我们测试一下getEventsOlderThatOneMinute(),它使用了ConcurrentSkipListMap中的headMap()方法:

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsOlderThatOneMinute();

This time we get a snapshot of events that are older than one minute. We can assert that there are more than zero of such events:

这一次,我们得到了超过一分钟的事件的快照。我们可以断言,此类事件的数量超过了零。

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventsOlderThanOneMinute > 0);

And next, that there is not a single event that is from within the last minute:

而接下来,没有一个事件是来自最后一分钟内的。

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventYoungerThanOneMinute, 0);

The most important thing to note is that we can take the snapshot of data while other threads are still adding new values to the ConcurrentSkipListMap.

最需要注意的是,我们可以在其他线程仍在向ConcurrentSkipListMap添加新值的时候进行数据快照

4. Conclusion

4.结论

In this quick tutorial, we had a look at the basics of the ConcurrentSkipListMap, along with some practical examples.

在这个快速教程中,我们看了一下ConcurrentSkipListMap的基础知识,以及一些实际的例子

We leveraged the high performance of the ConcurrentSkipListMap to implement a non-blocking algorithm that can serve us an immutable snapshot of data even if at the same time multiple threads are updating the map.

我们利用ConcurrentSkipListMap的高性能来实现一个非阻塞算法,即使同时有多个线程在更新地图,也能为我们提供一个不可改变的数据快照。

The implementation of all these examples and code snippets can be found in the GitHub project; this is a Maven project, so it should be easy to import and run as it is.

所有这些例子和代码片段的实现都可以在GitHub项目中找到;这是一个Maven项目,所以应该很容易导入并按原样运行。