Java Concurrency Utility with JCTools – 使用JCTools的Java并发工具

最后修改: 2018年 4月 27日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll introduce the JCTools (Java Concurrency Tools) library.

在本教程中,我们将介绍JCTools(Java Concurrency Tools)库。

Simply put, this provides a number of utility data structures suitable for working in a multi-threaded environment.

简单地说,这提供了一些适合在多线程环境下工作的实用数据结构。

2. Non-Blocking Algorithms

2.非阻塞性算法

Traditionally, multi-threaded code which works on a mutable shared state uses locks to ensure data consistency and publications (changes made by one thread that are visible to another).

传统上,在可变的共享状态上工作的多线程代码使用锁来确保数据的一致性和出版物(一个线程所做的改变对另一个线程可见)。

This approach has a number of drawbacks:

这种方法有一些缺点。

  • threads might become blocked in an attempt to acquire a lock, making no progress until another thread’s operation is finished – this effectively prevents parallelism
  • the heavier lock contention is, the more time the JVM spends dealing with scheduling threads, managing contention and queues of waiting threads and the less real work it is doing
  • deadlocks are possible if more than one lock is involved and they are acquired/released in wrong order
  • a priority inversion hazard is possible – a high-priority thread is locked in an attempt to get a lock held by a low-priority thread
  • most of the time coarse-grained locks are used, hurting parallelism a lot – fine-grained locking requires more careful design, increases locking overhead and is more error-prone

An alternative is to use a non-blocking algorithm, i.e. an algorithm where failure or suspension of any thread cannot cause failure or suspension of another thread.

另一种方法是使用非阻塞算法,即任何线程的失败或暂停不能导致另一个线程失败或暂停的算法

A non-blocking algorithm is lock-free if at least one of the involved threads is guaranteed to make progress over an arbitrary period of time, i.e. deadlocks cannot arise during the processing.

如果至少有一个参与的线程能保证在任意时间段内取得进展,即在处理过程中不会出现死锁,那么非阻塞算法就是无锁

Furthermore, these algorithms are wait-free if there’s also a guaranteed per-thread progress.

此外,如果每个线程的进度也有保证,那么这些算法就是无等待的。

Here’s a non-blocking Stack example from the excellent Java Concurrency in Practice book; it defines the basic state:

这里有一个非阻塞的Stack例子,来自优秀的Java Concurrency in Practice书;它定义了基本状态。

public class ConcurrentStack<E> {

    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    private static class Node <E> {
        public E item;
        public Node<E> next;

        // standard constructor
    }
}

And also a couple of API methods:

还有几个API方法。

public void push(E item){
    Node<E> newHead = new Node<E>(item);
    Node<E> oldHead;
    
    do {
        oldHead = top.get();
        newHead.next = oldHead;
    } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
    Node<E> oldHead;
    Node<E> newHead;
    do {
        oldHead = top.get();
        if (oldHead == null) {
            return null;
        }
        newHead = oldHead.next;
    } while (!top.compareAndSet(oldHead, newHead));
    
    return oldHead.item;
}

We can see that the algorithm uses fine-grained compare-and-swap (CAS) instructions and is lock-free (even if multiple threads call top.compareAndSet() simultaneously, one of them is guaranteed to be successful) but not wait-free as there’s no guarantee that CAS eventually succeeds for any particular thread.

我们可以看到,该算法使用了细粒度的比较和交换(CAS)指令,并且是无锁的(即使多个线程同时调用top.compareAndSet(),也能保证其中一个成功),但不是无等待,因为不能保证任何特定线程的CAS最终成功。

3. Dependency

3.依赖性

First, let’s add the JCTools dependency to our pom.xml:

首先,让我们把JCTools的依赖关系添加到我们的pom.xml

<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>2.1.2</version>
</dependency>

Please note that the latest available version is available on Maven Central.

请注意,最新的可用版本可以在Maven Central上找到。

4. JCTools Queues

4.JCTools队列

The library offers a number of queues to use in a multi-threaded environment, i.e. one or more threads write to a queue and one or more threads read from it in a thread-safe lock-free manner.

该库提供了一些在多线程环境中使用的队列,即一个或多个线程以线程安全无锁的方式向一个队列写入,一个或多个线程从队列中读取。

The common interface for all Queue implementations is org.jctools.queues.MessagePassingQueue.

所有Queue实现的通用接口是org.jctools.queues.MessagePassingQueue

4.1. Types of Queues

4.1.队列的类型

All queues can be categorized on their producer/consumer policies:

所有队列都可以按其生产者/消费者政策进行分类。

  • single producer, single consumer – such classes are named using the prefix Spsc, e.g. SpscArrayQueue
  • single producer, multiple consumers – use Spmc prefix, e.g. SpmcArrayQueue
  • multiple producers, single consumer – use Mpsc prefix, e.g. MpscArrayQueue
  • multiple producers, multiple consumers – use Mpmc prefix, e.g. MpmcArrayQueue

It’s important to note that there are no policy checks internally, i.e. a queue might silently misfunction in case of incorrect usage.

需要注意的是,内部没有政策检查,也就是说,在不正确使用的情况下,队列可能会默默地误操作

E.g. the test below populates a single-producer queue from two threads and passes even though the consumer is not guaranteed to see data from different producers:

例如,下面的测试从两个线程填充一个单一生产者队列,即使消费者不能保证看到来自不同生产者的数据也能通过。

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

4.2. Queue Implementations

4.2.尾部实施

Summarizing the classifications above, here is the list of JCTools queues:

总结上面的分类,这里是JCTools队列的列表。

  • SpscArrayQueue single producer, single consumer, uses an array internally, bound capacity
  • SpscLinkedQueue single producer, single consumer, uses linked list internally, unbound capacity
  • SpscChunkedArrayQueue single producer, single consumer, starts with initial capacity and grows up to max capacity
  • SpscGrowableArrayQueue single producer, single consumer, starts with initial capacity and grows up to max capacity. This is the same contract as SpscChunkedArrayQueue, the only difference is internal chunks management. It’s recommended to use SpscChunkedArrayQueue because it has a simplified implementation
  • SpscUnboundedArrayQueue single producer, single consumer, uses an array internally, unbound capacity
  • SpmcArrayQueue single producer, multiple consumers, uses an array internally, bound capacity
  • MpscArrayQueue multiple producers, single consumer, uses an array internally, bound capacity
  • MpscLinkedQueue multiple producers, single consumer, uses a linked list internally, unbound capacity
  • MpmcArrayQueue multiple producers, multiple consumers, uses an array internally, bound capacity

4.3. Atomic Queues

4.3.原子队列

All queues mentioned in the previous section use sun.misc.Unsafe. However, with the advent of Java 9 and the JEP-260 this API becomes inaccessible by default.

上一节提到的所有队列都使用sun.misc.Unsafe。然而,随着 Java 9 和 JEP-260 的出现,这个 API 在默认情况下变得无法访问。

So, there are alternative queues which use java.util.concurrent.atomic.AtomicLongFieldUpdater (public API, less performant) instead of sun.misc.Unsafe.

因此,有一些替代队列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能较差)而不是sun.misc.Unsafe

They are generated from the queues above and their names have the word Atomic inserted in between, e.g. SpscChunkedAtomicArrayQueue or MpmcAtomicArrayQueue.

它们是由上面的队列生成的,它们的名字中间插入了Atomic一词,例如SpscChunkedAtomicArrayQueueMpmcAtomicArrayQueue

It’s recommended to use ‘regular’ queues if possible and resort to AtomicQueues only in environments where sun.misc.Unsafe is prohibited/ineffective like HotSpot Java9+ and JRockit.

建议尽可能使用 “常规 “队列,只有在sun.misc.Unsafe被禁止/无效的环境中,如HotSpot Java9+和JRockit,才使用AtomicQueues

4.4. Capacity

4.4.容量

All JCTools queues might also have a maximum capacity or be unbound. When a queue is full and it’s bound by capacity, it stops accepting new elements.

所有的JCTools队列也可能有一个最大的容量,或者不受约束。当一个队列满了,并且它被容量所约束,它就会停止接受新的元素。

In the following example, we:

在下面的例子中,我们。

  • fill the queue
  • ensure that it stops accepting new elements after that
  • drain from it and ensure that it’s possible to add more elements afterward

Please note that a couple of code statements are dropped for readability. The complete implementation can be found over on GitHub:

请注意,为了便于阅读,有几条代码语句被删除。完整的实现可以在GitHub>上找到。

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
    IntStream.range(0, queue.capacity()).forEach(i -> {
        assertThat(queue.offer(i)).isTrue();
    });
    assertThat(queue.offer(queue.capacity())).isFalse();
    startConsuming.countDown();
    awakeProducer.await();
    assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
  IntStream.range(0, 17).boxed().collect(toSet()));

5. Other JCTools Data Structures

5.其他JCTools数据结构

JCTools offers a couple of non-Queue data structures as well.

JCTools也提供了一些非队列的数据结构。

All of them are listed below:

所有这些都列在下面。

  • NonBlockingHashMap a lock-free ConcurrentHashMap alternative with better-scaling properties and generally lower mutation costs. It’s implemented via sun.misc.Unsafe, so, it’s not recommended to use this class in a HotSpot Java9+ or JRockit environment
  • NonBlockingHashMapLong like NonBlockingHashMap but uses primitive long keys
  • NonBlockingHashSet a  simple wrapper around NonBlockingHashMap like JDK’s java.util.Collections.newSetFromMap()
  • NonBlockingIdentityHashMap like NonBlockingHashMap but compares keys by identity.
  • NonBlockingSetInt – a multi-threaded bit-vector set implemented as an array of primitive longs. Works ineffectively in case of silent autoboxing

6. Performance Testing

6.性能测试

Let’s use JMH for comparing the JDK’s ArrayBlockingQueue vs. JCTools queue’s performance. JMH is an open-source micro-benchmark framework from Sun/Oracle JVM gurus which protects us from indeterminism of compiler/jvm optimization algorithms). Please feel free to get more details on it in this article.

让我们使用JMH来比较JDK的ArrayBlockingQueue与JCTools队列的性能。JMH是一个来自Sun/Oracle JVM大师的开源微观基准框架,它可以保护我们免受编译器/jvm优化算法的不确定性影响)。请随时在这篇文章中获得更多关于它的细节。

Note that the code snippet below misses a couple of statements in order to improve readability. Please find the complete source code on GitHub:

请注意,为了提高可读性,下面的代码片段少了几条语句。请在GitHub上找到完整的源代码:

public class MpmcBenchmark {

    @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
    public volatile String implementation;

    public volatile Queue<Long> queue;

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(PRODUCER_THREADS_NUMBER)
    public void write(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && !queue.offer(1L)) {
            // intentionally left blank
        }
    }

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(CONSUMER_THREADS_NUMBER)
    public void read(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && queue.poll() == null) {
            // intentionally left blank
        }
    }
}

Results (excerpt for the 95th percentile, nanoseconds per-operation):

结果(摘录为第95百分位数,每个操作的纳秒数)。

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

We can see that MpmcArrayQueue performs just slightly better than MpmcAtomicArrayQueue and ArrayBlockingQueue is slower by a factor of two.

我们可以看到,MpmcArrayQueue的性能仅比MpmcAtomicArrayQueue稍好,而ArrayBlockingQueue则慢了2倍。

7. Drawbacks of Using JCTools

7.使用JCTools的弊端

Using JCTools has an important drawback – it’s not possible to enforce that the library classes are used correctly. For example, consider a situation when we start using MpscArrayQueue in our large and mature project (note that there must be a single consumer).

使用JCTools有一个重要的缺点–不可能强制执行库类的正确使用。例如,考虑一下我们在大型成熟项目中开始使用MpscArrayQueue的情况(注意,必须有一个消费者)。

Unfortunately, as the project is big, there is a possibility that someone makes a programming or configuration error and the queue is now read from more than one thread. The system seems to work as before but now there is a chance that consumers miss some messages. That is a real problem which might have a big impact and is very hard to debug.

不幸的是,由于项目很大,有可能有人犯了一个编程或配置错误,现在队列是从一个以上的线程读取的。系统似乎像以前一样工作,但现在有可能消费者会错过一些消息。这是一个真正的问题,可能会有很大的影响,而且非常难以调试。

Ideally, it should be possible to run a system with a particular system property which forces JCTools to ensure thread access policy. E.g. local/test/staging environments (but not production) might have it turned on. Sadly, JCTools does not provide such a property.

理想情况下,应该可以用特定的系统属性来运行一个系统,强制JCTools确保线程访问策略。例如,本地/测试/暂存环境(但不是生产环境)可以将其打开。遗憾的是,JCTools并没有提供这样的属性。

Another consideration is that even though we ensured that JCTools is significantly faster than the JDK’s counterpart, it doesn’t mean our application gains the same amount of speed as we start using the custom queue implementations. Most applications don’t exchange a lot of objects between threads and are mostly I/O bound.

另一个考虑是,即使我们确保了JCTools的速度明显快于JDK的对应产品,但这并不意味着我们的应用程序在开始使用自定义队列实现时获得了同样的速度。大多数应用程序不会在线程之间交换大量的对象,而且大多是I/O绑定。

8. Conclusion

8.结论

We now have a basic understanding of the utility classes offered by JCTools and saw how well they perform, compared to the JDK’s counterparts under heavy load.

我们现在对JCTools提供的实用类有了基本的了解,并看到了它们在重载下与JDK的同类产品相比,表现得如何。

In conclusion, it’s worth to use the library only if we exchange a lot of objects between threads and even then it’s necessary to be very careful to preserve thread access policy.

总之,只有当我们在线程之间交换大量对象时,才值得使用该库,即使如此,也必须非常小心地保护线程访问策略。

As always, the full source code for the samples above can be found over on GitHub.

一如既往,上述样本的完整源代码可以在GitHub上找到over