Intro To Reactor Core – 反应堆核心介绍

最后修改: 2017年 3月 3日


1. Introduction


Reactor Core is a Java 8 library that implements the reactive programming model. It’s built on top of the Reactive Streams specification, a standard for building reactive applications.

Reactor Core是一个Java 8库,它实现了反应式编程模型。它建立在反应式流规范之上,该规范是构建反应式应用程序的标准。

From the background of non-reactive Java development, going reactive can be quite a steep learning curve. This becomes more challenging when comparing it to the Java 8 Stream API, as they could be mistaken for being the same high-level abstractions.

从非反应式Java开发的背景来看,走向反应式可能是一个相当陡峭的学习曲线。当与Java 8的Stream API相比较时,这将变得更具挑战性,因为它们可能会被误认为是相同的高级抽象。

In this article, we’ll attempt to demystify this paradigm. We’ll take small steps through Reactor until we’ve built a picture of how to compose reactive code, laying the foundation for more advanced articles to come in a later series.


2. Reactive Streams Specification


Before we look at Reactor, we should look at the Reactive Streams Specification. This is what Reactor implements, and it lays the groundwork for the library.

在看Reactor之前,我们应该先看一下Reactive Streams Specification。这就是Reactor实现的内容,它为这个库奠定了基础。

Essentially, Reactive Streams is a specification for asynchronous stream processing.

本质上,Reactive Streams是一个异步流处理的规范。

In other words, a system where lots of events are being produced and consumed asynchronously. Think about a stream of thousands of stock updates per second coming into a financial application, and for it to have to respond to those updates in a timely manner.


One of the main goals of this is to address the problem of backpressure. If we have a producer which is emitting events to a consumer faster than it can process them, then eventually the consumer will be overwhelmed with events, running out of system resources.


Backpressure means that our consumer should be able to tell the producer how much data to send in order to prevent this, and this is what is laid out in the specification.


3. Maven Dependencies


Before we get started, let’s add our Maven dependencies:




We’re also adding Logback as a dependency. This is because we’ll be logging the output of the Reactor in order to better understand the flow of data.


4. Producing a Stream of Data


In order for an application to be reactive, the first thing it must be able to do is to produce a stream of data.


This could be something like the stock update example that we gave earlier. Without this data, we wouldn’t have anything to react to, which is why this is a logical first step.


Reactive Core gives us two data types that enable us to do this.

Reactive Core给了我们两种数据类型,使我们能够做到这一点。

4.1. Flux

4.1. 通量

The first way of doing this is with Flux. It’s a stream that can emit 0..n elements. Let’s try creating a simple one:


Flux<Integer> just = Flux.just(1, 2, 3, 4);

In this case, we have a static stream of four elements.


4.2. Mono


The second way of doing this is with a Mono, which is a stream of 0..1 elements. Let’s try instantiating one:


Mono<Integer> just = Mono.just(1);

This looks and behaves almost exactly the same as the Flux, only this time we are limited to no more than one element.


4.3. Why Not Only Flux?


Before experimenting further, it’s worth highlighting why we have these two data types.


First, it should be noted that both Flux and Mono are implementations of the Reactive Streams Publisher interface. Both classes are compliant with the specification, and we could use this interface in their place:

首先,需要注意的是,FluxMono都是Reactive Streams Publisher界面的实现。这两个类都是符合规范的,我们可以用这个接口来代替它们。

Publisher<String> just = Mono.just("foo");

But really, knowing this cardinality is useful. This is because a few operations only make sense for one of the two types and because it can be more expressive (imagine findOne() in a repository).

但实际上,知道这个cardinality是有用的。这是因为有一些操作只对两种类型中的一种有意义,而且它可以有更多的表现力(想象一下findOne() 在存储库中)。

5. Subscribing to a Stream


Now we have a high-level overview of how to produce a stream of data, we need to subscribe to it in order for it to emit the elements.


5.1. Collecting Elements


Let’s use the subscribe() method to collect all the elements in a stream:

让我们使用 subscribe() 方法来收集一个流中的所有元素。

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)

assertThat(elements).containsExactly(1, 2, 3, 4);

The data won’t start flowing until we subscribe. Notice that we have added some logging as well, this will be helpful when we look at what’s happening behind the scenes.


5.2. The Flow of Elements


With logging in place, we can use it to visualize how the data is flowing through our stream:


20:25:19.550 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onComplete()

First of all, everything is running on the main thread. Let’s not go into any details about this, as we’ll be taking a further look at concurrency later on in this article. It does make things simple, though, as we can deal with everything in order.


Now let’s go through the sequence that we have logged one by one:


  1. onSubscribe() – This is called when we subscribe to our stream
  2. request(unbounded) – When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
  3. onNext() – This is called on every single element
  4. onComplete() – This is called last, after receiving the last element. There’s actually an onError() as well, which would be called if there is an exception, but in this case, there isn’t

This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that’s what’s been instantiated behind the scenes in our call to onSubscribe(). It’s a useful method, but to better understand what’s happening let’s provide a Subscriber interface directly:


Flux.just(1, 2, 3, 4)
  .subscribe(new Subscriber<Integer>() {
    public void onSubscribe(Subscription s) {

    public void onNext(Integer integer) {

    public void onError(Throwable t) {}

    public void onComplete() {}

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that Flux has provided us with a helper method to reduce this verbosity.

我们可以看到,上述流程中的每个可能的阶段都映射到Subscriber 实现中的一个方法。碰巧的是,Flux为我们提供了一个辅助方法,以减少这种繁琐的操作。

5.3. Comparison to Java 8 Streams

5.3.与Java 8 Streams的比较

It still might appear that we have something synonymous to a Java 8 Stream doing collect:

它仍然可能出现,我们有一些与Java 8 Stream做收集的同义词。

List<Integer> collected = Stream.of(1, 2, 3, 4)

Only we don’t.


The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

其核心区别在于,Reactive是一种推送模型,而Java 8的Streams是一种拉动模型。在反应式方法中,事件在进入时被推送给订阅者

The next thing to notice is a Streams terminal operator is just that, terminal, pulling all the data and returning a result. With Reactive we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams, and apply backpressure, which we will cover next.

接下来需要注意的是,Streams 终端操作者就是这样,终端,拉出所有的数据并返回一个结果。有了Reactive,我们可以有一个来自外部资源的无限的流,有多个订阅者在临时的基础上附加和移除。我们还可以做一些事情,比如合并流、节流和应用背压,我们接下来会介绍这些。

6. Backpressure


The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.


Backpressure is when a downstream can tell an upstream to send it less data in order to prevent it from being overwhelmed.


We can modify our Subscriber implementation to apply backpressure. Let’s tell the upstream to only send two elements at a time by using request():


Flux.just(1, 2, 3, 4)
  .subscribe(new Subscriber<Integer>() {
    private Subscription s;
    int onNextAmount;

    public void onSubscribe(Subscription s) {
        this.s = s;

    public void onNext(Integer integer) {
        if (onNextAmount % 2 == 0) {

    public void onError(Throwable t) {}

    public void onComplete() {}

Now if we run our code again, we’ll see the request(2) is called, followed by two onNext() calls, then request(2) again.

现在,如果我们再次运行我们的代码,我们会看到request(2) 被调用,接着是两次onNext() 调用,然后request(2) 再次调用。

23:31:15.395 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain amount of elements, and only when we are ready.


If we imagine we were being streamed tweets from Twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there are no requests from the downstream, then the upstream could drop items, store them in a buffer, or some other strategy.


7. Operating on a Stream


We can also perform operations on the data in our stream, responding to events as we see fit.


7.1. Mapping Data in a Stream


A simple operation that we can perform is applying a transformation. In this case, let’s just double all the numbers in our stream:


Flux.just(1, 2, 3, 4)
  .map(i -> i * 2)

map() will be applied when onNext() is called.

map() 将在onNext() 被调用时被应用。

7.2. Combining Two Streams


We can then make things more interesting by combining another stream with this one. Let’s try this by using zip() function:


Flux.just(1, 2, 3, 4)
  .map(i -> i * 2)
  .zipWith(Flux.range(0, Integer.MAX_VALUE), 
    (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))

  "First Flux: 2, Second Flux: 0",
  "First Flux: 4, Second Flux: 1",
  "First Flux: 6, Second Flux: 2",
  "First Flux: 8, Second Flux: 3");

Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:


20:04:38.064 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO  reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO  reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | cancel()

Note how we now have one subscription per Flux. The onNext() calls are also alternated, so the index of each element in the stream will match when we apply the zip() function.

注意我们现在每个Flux有一个订阅。onNext() 调用也是交替进行的,所以当我们应用zip() 函数时,流中每个元素的索引都会匹配。

8. Hot Streams


Currently, we’ve focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.


For example, we could have a stream of mouse movements that constantly needs to be reacted to or a Twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.

例如,我们可以有一个不断需要被反应的鼠标运动流,或者一个Twitter feed。这些类型的流被称为热流,因为它们一直在运行,可以在任何时间点订阅,错过了数据的开始。

8.1. Creating a ConnectableFlux


One way to create a hot stream is by converting a cold stream into one. Let’s create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:


ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while(true) {;

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won’t cause it to start emitting, allowing us to add multiple subscriptions:



If we try running this code, nothing will happen. It’s not until we call connect(), that the Flux will start emitting:



8.2. Throttling


If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let’s try getting around this with throttling:


ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while(true) {;

Here, we’ve introduced a sample() method with an interval of two seconds. Now values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.


Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.


9. Concurrency


All of our above examples have currently run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, for which many implementations are provided for us. Let’s try subscribing to a different thread to main:


Flux.just(1, 2, 3, 4)
  .map(i -> i * 2)

The Parallel scheduler will cause our subscription to be run on a different thread, which we can prove by looking at the logs. We see the first entry comes from the main thread and the Flux is running in another thread called parallel-1.


20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529 [parallel-1] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(1)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(2)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(3)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(4)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onComplete()

Concurrency get’s more interesting than this, and it will be worth us exploring it in another article.


10. Conclusion


In this article, we’ve given a high-level, end-to-end overview of Reactive Core. We’ve explained how we can publish and subscribe to streams, apply backpressure, operate on streams, and also handle data asynchronously. This should hopefully lay a foundation for us to write reactive applications.

在这篇文章中,我们已经给出了一个高层次的、端到端的Reactive Core的概述。我们已经解释了我们如何发布和订阅流,应用背压,对流进行操作,以及异步处理数据。希望这能为我们编写反应式应用程序打下基础。

Later articles in this series will cover more advanced concurrency and other reactive concepts. There’s also another article covering Reactor with Spring.

本系列后面的文章将介绍更高级的并发性和其他反应式概念。还有另一篇文章涉及Reactor with Spring

The source code for our application is available over on GitHub; this is a Maven project which should be able to run as-is.
