Intro To Reactor Core

Last modified: March 3, 2017

1. Introduction

Reactor Core is a Java 8 library that implements the reactive programming model. It’s built on top of the Reactive Streams specification, a standard for building reactive applications.

Coming from a background of non-reactive Java development, going reactive can involve quite a steep learning curve. This becomes even more challenging when we compare it to the Java 8 Stream API, since the two could be mistaken for the same high-level abstraction.

In this article, we’ll attempt to demystify this paradigm. We’ll take small steps through Reactor until we’ve built a picture of how to compose reactive code, laying the foundation for more advanced articles to come in a later series.

2. Reactive Streams Specification

Before we look at Reactor, we should look at the Reactive Streams Specification. This is what Reactor implements, and it lays the groundwork for the library.

Essentially, Reactive Streams is a specification for asynchronous stream processing.

In other words, it describes a system where lots of events are produced and consumed asynchronously. Think of a stream of thousands of stock updates per second coming into a financial application, which has to respond to those updates in a timely manner.

One of its main goals is to address the problem of backpressure. If a producer emits events to a consumer faster than the consumer can process them, the consumer will eventually be overwhelmed with events and run out of system resources.

Backpressure means that our consumer should be able to tell the producer how much data to send in order to prevent this, and this is what is laid out in the specification.
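
Since Java 9, the JDK ships java.util.concurrent.Flow, whose Publisher, Subscriber and Subscription interfaces mirror the Reactive Streams ones. The sketch below uses it, together with the JDK's SubmissionPublisher, purely to illustrate demand signalling without any Reactor dependency; it assumes Java 9 or later (Reactor itself only needs Java 8), and the class names are illustrative. The subscriber tells the producer how much it can take by calling request(1) for one element at a time:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowBackpressureDemo {

    // A subscriber that signals demand for exactly one item at a time.
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for one more item
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 4; i++) {
                publisher.submit(i); // buffered until the subscriber has requested it
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (subscriber.received) {
            return new ArrayList<>(subscriber.received);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4]
    }
}
```

Reactor's own types implement the org.reactivestreams interfaces rather than Flow, but the request/onNext contract is the same.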

3. Maven Dependencies

Before we get started, let’s add our Maven dependencies:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.16</version>
</dependency>

<dependency> 
    <groupId>ch.qos.logback</groupId> 
    <artifactId>logback-classic</artifactId> 
    <version>1.2.6</version> 
</dependency>

We’re also adding Logback as a dependency. This is because we’ll be logging the output of Reactor in order to better understand the flow of data.

4. Producing a Stream of Data

In order for an application to be reactive, the first thing it must be able to do is to produce a stream of data.

This could be something like the stock update example that we gave earlier. Without this data, we wouldn’t have anything to react to, which is why this is a logical first step.

Reactor Core gives us two data types that enable us to do this.

4.1. Flux

The first way of doing this is with Flux. It’s a stream that can emit 0..n elements. Let’s try creating a simple one:

Flux<Integer> just = Flux.just(1, 2, 3, 4);

In this case, we have a static stream of four elements.
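
Flux.just() is only one of several factory methods. The sketch below, assuming the reactor-core 3.4.x dependency from section 3, compares it with fromIterable(), range() and empty(). The class and method names are illustrative, and collectList().block() is used only so the result can be inspected in a demo; blocking should be avoided in real reactive code:

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxFactories {

    // Flux.just(): the elements are listed explicitly
    static List<Integer> fromJust() {
        return Flux.just(1, 2, 3, 4).collectList().block();
    }

    // Flux.fromIterable(): the elements come from an existing collection
    static List<Integer> fromIterable() {
        return Flux.fromIterable(Arrays.asList(1, 2, 3, 4)).collectList().block();
    }

    // Flux.range(start, count): 'count' consecutive integers beginning at 'start'
    static List<Integer> fromRange() {
        return Flux.range(1, 4).collectList().block();
    }

    // Flux.empty(): zero elements, followed immediately by onComplete()
    static List<Integer> fromEmpty() {
        return Flux.<Integer>empty().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(fromJust());     // [1, 2, 3, 4]
        System.out.println(fromIterable()); // [1, 2, 3, 4]
        System.out.println(fromRange());    // [1, 2, 3, 4]
        System.out.println(fromEmpty());    // []
    }
}
```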

4.2. Mono

The second way of doing this is with a Mono, which is a stream of 0..1 elements. Let’s try instantiating one:

Mono<Integer> just = Mono.just(1);

This looks and behaves almost exactly the same as the Flux, only this time we are limited to no more than one element.
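
The "0" in 0..1 matters as much as the "1": a Mono can also complete without emitting anything. The sketch below, assuming reactor-core 3.4.x and using illustrative names, contrasts Mono.just() with Mono.empty() and Mono.justOrEmpty(), which turns a possibly-null value into an empty Mono. Note that block() returns null when the Mono completes empty:

```java
import reactor.core.publisher.Mono;

public class MonoExamples {

    // A Mono that emits exactly one element.
    static Integer one() {
        return Mono.just(1).block();
    }

    // An empty Mono completes without emitting; block() then returns null.
    static Integer none() {
        return Mono.<Integer>empty().block();
    }

    // justOrEmpty() accepts a possibly-null value and yields an empty Mono for null.
    static Integer orDefault(Integer maybeValue) {
        return Mono.justOrEmpty(maybeValue)
          .defaultIfEmpty(-1) // substitute a value when nothing was emitted
          .block();
    }

    public static void main(String[] args) {
        System.out.println(one());           // 1
        System.out.println(none());          // null
        System.out.println(orDefault(5));    // 5
        System.out.println(orDefault(null)); // -1
    }
}
```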

4.3. Why Not Only Flux?

Before experimenting further, it’s worth highlighting why we have these two data types.

First, it should be noted that both Flux and Mono are implementations of the Reactive Streams Publisher interface. Both classes are compliant with the specification, and we could use this interface in their place:

Publisher<String> just = Mono.just("foo");

Still, knowing the cardinality is useful. A few operations only make sense for one of the two types, and the type itself is more expressive (imagine a findOne() method in a repository, which can return at most one result).
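
To make the findOne() idea concrete, here is a sketch with a hypothetical in-memory NameRepository (not a Reactor or Spring type): a method returning 0..n results uses Flux, while a lookup returning at most one result uses Mono. It assumes reactor-core 3.4.x; next() and count() are existing Flux operators that each produce a Mono:

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CardinalityExample {

    // Hypothetical repository, used only to show how return types document cardinality.
    static class NameRepository {
        private final List<String> names = Arrays.asList("alice", "bob", "carol");

        // 0..n results, so a Flux
        Flux<String> findAll() {
            return Flux.fromIterable(names);
        }

        // 0..1 result, so a Mono; next() keeps only the first match, or completes empty
        Mono<String> findOne(String name) {
            return findAll().filter(name::equals).next();
        }
    }

    public static void main(String[] args) {
        NameRepository repository = new NameRepository();
        System.out.println(repository.findOne("bob").block());    // bob
        System.out.println(repository.findOne("dave").block());   // null
        // count() reduces a Flux to a Mono<Long>
        System.out.println(repository.findAll().count().block()); // 3
    }
}
```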

5. Subscribing to a Stream

Now that we have a high-level overview of how to produce a stream of data, we need to subscribe to it in order for it to emit the elements.

5.1. Collecting Elements

Let’s use the subscribe() method to collect all the elements in a stream:

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(elements::add);

assertThat(elements).containsExactly(1, 2, 3, 4);

The data won’t start flowing until we subscribe. Notice that we have added some logging as well; this will be helpful when we look at what’s happening behind the scenes.
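
Both halves of that claim, that nothing flows before subscription and that a cold Flux replays its data for every new subscriber, can be checked with a counter. This sketch assumes reactor-core 3.4.x; doOnNext() is used only to observe each delivered element, and the class name is illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LazySubscription {

    // Returns {emitted before subscribing, after one subscription, after two subscriptions}.
    static int[] countEmissions() {
        AtomicInteger emitted = new AtomicInteger();
        Flux<Integer> flux = Flux.just(1, 2, 3, 4)
          .doOnNext(i -> emitted.incrementAndGet()); // side effect runs per delivered element

        int beforeSubscribe = emitted.get(); // assembling the pipeline emits nothing

        List<Integer> elements = new ArrayList<>();
        flux.subscribe(elements::add); // Flux.just() emits synchronously on this thread
        int afterFirst = emitted.get();

        flux.subscribe(elements::add); // a cold Flux replays its data for each subscriber
        int afterSecond = emitted.get();

        return new int[] {beforeSubscribe, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        int[] counts = countEmissions();
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]); // 0 4 8
    }
}
```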

5.2. The Flow of Elements

With logging in place, we can use it to visualize how the data is flowing through our stream:

20:25:19.550 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | request(unbounded)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
20:25:19.553 [main] INFO  reactor.Flux.Array.1 - | onComplete()

First of all, everything is running on the main thread. Let’s not go into any details about this, as we’ll be taking a further look at concurrency later on in this article. It does make things simple, though, as we can deal with everything in order.

Now let’s go through the sequence that we have logged one by one:

  1. onSubscribe() – This is called when we subscribe to our stream
  2. request(unbounded) – When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
  3. onNext() – This is called on every single element
  4. onComplete() – This is called last, after receiving the last element. There’s actually an onError() as well, which would be called if there is an exception, but in this case, there isn’t
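
The onError() signal from the last step can be observed with the three-argument subscribe() overload, which takes separate callbacks for elements, errors and completion. A minimal sketch, assuming reactor-core 3.4.x; the class and field names are illustrative only. Because an error is a terminal signal, onComplete() is never called afterwards:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

public class ErrorSignal {

    // Holds what each of the three callbacks observed.
    static final class Outcome {
        final List<Integer> elements = new ArrayList<>();
        final AtomicReference<String> errorMessage = new AtomicReference<>();
        final AtomicBoolean completed = new AtomicBoolean(false);
    }

    static Outcome run() {
        Outcome outcome = new Outcome();
        Flux.just(1, 2)
          .concatWith(Flux.error(new IllegalStateException("boom"))) // fail after two elements
          .subscribe(
            outcome.elements::add,                                 // onNext
            error -> outcome.errorMessage.set(error.getMessage()), // onError
            () -> outcome.completed.set(true));                    // onComplete
        return outcome;
    }

    public static void main(String[] args) {
        Outcome outcome = run();
        System.out.println(outcome.elements);           // [1, 2]
        System.out.println(outcome.errorMessage.get()); // boom
        System.out.println(outcome.completed.get());    // false
    }
}
```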

This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that’s what’s been instantiated behind the scenes in our call to subscribe(). It’s a useful method, but to better understand what’s happening, let’s provide a Subscriber implementation directly:

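import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
      // Request every element up front, equivalent to request(unbounded)
      s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
      elements.add(integer);
    }

    @Override
    public void onError(Throwable t) {
      // No error is expected from this source
    }

    @Override
    public void onComplete() {}
  });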

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. Flux simply provides us with a helper method to reduce this verbosity.

5.3. Comparison to Java 8 Streams

It might still seem that we have something equivalent to a Java 8 Stream doing a collect:

import static java.util.stream.Collectors.toList;

List<Integer> collected = Stream.of(1, 2, 3, 4)
  .collect(toList());

Only we don’t.

The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

The next thing to notice is that a Stream’s terminal operation is just that: terminal. It pulls all the data and returns a result. With Reactive, we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams, and apply backpressure, which we will cover next.
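
One consequence of this difference is easy to observe. In the sketch below (assuming reactor-core 3.4.x; class and method names are illustrative), a Java 8 Stream rejects a second terminal operation with an IllegalStateException, while the same cold Flux can be subscribed to twice and pushes its elements to each subscriber:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

public class StreamVersusFlux {

    // A Java 8 Stream is consumed by its terminal operation and cannot be reused.
    static boolean streamCanBeReused() {
        Stream<Integer> stream = Stream.of(1, 2, 3, 4);
        stream.collect(Collectors.toList());
        try {
            stream.collect(Collectors.toList());
            return true;
        } catch (IllegalStateException alreadyConsumed) {
            return false;
        }
    }

    // A Flux can have several subscribers, each receiving the pushed elements.
    static List<List<Integer>> fluxWithTwoSubscribers() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        flux.subscribe(first::add);
        flux.subscribe(second::add);
        List<List<Integer>> result = new ArrayList<>();
        result.add(first);
        result.add(second);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(streamCanBeReused());      // false
        System.out.println(fluxWithTwoSubscribers()); // [[1, 2, 3, 4], [1, 2, 3, 4]]
    }
}
```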

6. Backpressure

The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.

Backpressure is when a downstream can tell an upstream to send it less data in order to prevent it from being overwhelmed.

We can modify our Subscriber implementation to apply backpressure. Let’s tell the upstream to only send two elements at a time by using request():

List<Integer> elements = new ArrayList<>();

Flux.just(1, 2, 3, 4)
  .log()
  .subscribe(new Subscriber<Integer>() {
    private Subscription s;
    int onNextAmount;

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        s.request(2); // initial demand: two elements
    }

    @Override
    public void onNext(Integer integer) {
        elements.add(integer);
        onNextAmount++;
        if (onNextAmount % 2 == 0) {
            s.request(2); // this batch is processed, so ask for the next two
        }
    }

    @Override
    public void onError(Throwable t) {}

    @Override
    public void onComplete() {}
  });

Now if we run our code again, we’ll see that request(2) is called, followed by two onNext() calls, then request(2) again:

23:31:15.395 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.397 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | request(2)
23:31:15.398 [main] INFO  reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are asking the upstream to push only a certain number of elements, and only when we are ready.
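
Reactor also ships BaseSubscriber, a base class that implements Subscriber and exposes hooks such as hookOnSubscribe() and hookOnNext(). The sketch below, assuming reactor-core 3.4.x and an illustrative class name, rewrites the same request-two-at-a-time logic with it. BaseSubscriber's default hookOnSubscribe() requests an unbounded amount, which is why we override it:

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BatchingSubscriber {

    static List<Integer> run() {
        List<Integer> elements = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
          .subscribe(new BaseSubscriber<Integer>() {
              private int onNextAmount;

              @Override
              protected void hookOnSubscribe(Subscription subscription) {
                  request(2); // initial demand instead of the default unbounded request
              }

              @Override
              protected void hookOnNext(Integer value) {
                  elements.add(value);
                  onNextAmount++;
                  if (onNextAmount % 2 == 0) {
                      request(2); // ask for the next batch once this one is processed
                  }
              }
          });
        return elements;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, 4]
    }
}
```

For simple batching without a custom subscriber, Reactor’s limitRate(n) operator caps the demand that is propagated upstream.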

If we imagine we were receiving a stream of tweets from Twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there were no requests from the downstream, the upstream could drop items, store them in a buffer, or apply some other strategy.
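
Reactor exposes some of these upstream strategies as operators, for example onBackpressureDrop(), onBackpressureBuffer() and onBackpressureLatest(). The sketch below, assuming reactor-core 3.4.x, uses Flux.range() as a stand-in for a fast producer and onBackpressureDrop() with a subscriber that only ever requests two elements; anything arriving without outstanding demand is handed to the drop callback instead. The class name is illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class DropStrategy {

    // Returns two lists: the elements delivered downstream, then the elements dropped.
    static List<List<Integer>> run() {
        List<Integer> received = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();

        Flux.range(1, 10) // stand-in for a fast producer such as a tweet stream
          .onBackpressureDrop(dropped::add) // called for each element without downstream demand
          .subscribe(new BaseSubscriber<Integer>() {
              @Override
              protected void hookOnSubscribe(Subscription subscription) {
                  request(2); // this subscriber only ever asks for two elements
              }

              @Override
              protected void hookOnNext(Integer value) {
                  received.add(value);
              }
          });

        return Arrays.asList(received, dropped);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println("received: " + result.get(0));
        System.out.println("dropped:  " + result.get(1));
    }
}
```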

7. Operating on a Stream

We can also perform operations on the data in our stream, responding to events as we see fit.

7.1. Mapping Data in a Stream

A simple operation that we can perform is applying a transformation. In this case, let’s just double all the numbers in our stream:

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribe(elements::add);

map() is applied each time onNext() is called, so every element is transformed as it passes through.
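
Operators chain, so further steps run on the already-mapped values. The sketch below, assuming reactor-core 3.4.x and an illustrative class name, doubles each element, keeps only values greater than four with filter(), and gathers the result; block() is there only so the demo can return a plain List:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class MapAndFilter {

    static List<Integer> doubledAboveFour() {
        return Flux.just(1, 2, 3, 4)
          .map(i -> i * 2)    // applied to each element as it is delivered: 2, 4, 6, 8
          .filter(i -> i > 4) // only matching elements continue downstream
          .collectList()      // gathers the remaining elements into a Mono<List<Integer>>
          .block();           // waits for completion; acceptable in a demo, not in reactive code
    }

    public static void main(String[] args) {
        System.out.println(doubledAboveFour()); // [6, 8]
    }
}
```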

7.2. Combining Two Streams

We can then make things more interesting by combining another stream with this one. Let’s try this by using the zipWith() function:

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .zipWith(Flux.range(0, Integer.MAX_VALUE), 
    (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
  .subscribe(elements::add);

assertThat(elements).containsExactly(
  "First Flux: 2, Second Flux: 0",
  "First Flux: 4, Second Flux: 1",
  "First Flux: 6, Second Flux: 2",
  "First Flux: 8, Second Flux: 3");

Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:

20:04:38.064 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:04:38.065 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
20:04:38.066 [main] INFO  reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
20:04:38.066 [main] INFO  reactor.Flux.Range.2 - | onNext(0)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(1)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(3)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(2)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onNext(4)
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | onNext(3)
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | onComplete()
20:04:38.067 [main] INFO  reactor.Flux.Array.1 - | cancel()
20:04:38.067 [main] INFO  reactor.Flux.Range.2 - | cancel()

Note how we now have one subscription per Flux. The onNext() calls also alternate, so the index of each element in the stream matches when zipWith() combines them.
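
Reactor also offers a static Flux.zip() that takes both sources and a combinator function. The zipped Flux completes as soon as either source completes, which is why pairing with the effectively unbounded Flux.range() above is safe and why the log ends with cancel() on the range. A minimal sketch, assuming reactor-core 3.4.x and an illustrative class name:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class ZipExample {

    static List<String> zipLettersWithNumbers() {
        return Flux.zip(
            Flux.just("a", "b", "c"),
            Flux.range(1, 10),
            (letter, number) -> letter + number) // pairs the n-th element of each source
          .collectList() // stops after three pairs because the first source completes
          .block();
    }

    public static void main(String[] args) {
        System.out.println(zipLettersWithNumbers()); // [a1, b2, c3]
    }
}
```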

8. Hot Streams

So far, we’ve focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.

For example, we could have a stream of mouse movements that constantly needs to be reacted to, or a Twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, so a subscriber may miss the data emitted before it subscribed.

8.1. Creating a ConnectableFlux

One way to create a hot stream is by converting a cold stream into one. Let’s create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:

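ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    // Emit timestamps until cancelled; this loop runs on the thread that calls connect()
    while (!fluxSink.isCancelled()) {
        fluxSink.next(System.currentTimeMillis());
    }
})
  .publish();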

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won’t cause it to start emitting, allowing us to add multiple subscriptions:

publish.subscribe(System.out::println);        
publish.subscribe(System.out::println);

If we try running this code, nothing will happen. It’s not until we call connect() that the Flux will start emitting:

publish.connect();
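
Because the source above never terminates, a finite source makes the publish()/connect() behaviour easier to check. This sketch, assuming reactor-core 3.4.x and an illustrative class name, swaps in Flux.range() for the infinite Flux.create() loop and records what two subscribers have received before and after connect():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class ConnectDemo {

    // Returns: what had arrived before connect(), then what each subscriber received.
    static List<List<Integer>> run() {
        ConnectableFlux<Integer> published = Flux.range(1, 3).publish();

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        published.subscribe(first::add); // registers a subscriber; nothing is emitted yet
        published.subscribe(second::add);

        List<Integer> beforeConnect = new ArrayList<>(first);
        beforeConnect.addAll(second);    // still empty at this point

        published.connect();             // subscribes to the source; range() emits on this thread

        return Arrays.asList(beforeConnect, first, second);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println(result.get(0)); // []
        System.out.println(result.get(1)); // [1, 2, 3]
        System.out.println(result.get(2)); // [1, 2, 3]
    }
}
```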

8.2. Throttling

If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let’s try getting around this with throttling:

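import static java.time.Duration.ofSeconds;

ConnectableFlux<Object> publish = Flux.create(fluxSink -> {
    while (!fluxSink.isCancelled()) {
        fluxSink.next(System.currentTimeMillis());
    }
})
  .sample(ofSeconds(2)) // emit only the latest value seen in each two-second period
  .publish();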

Here, we’ve introduced the sample() operator with an interval of two seconds. Now only the most recent value seen in each two-second period is pushed to our subscribers, meaning the console will be a lot less hectic.

Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.
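
As a brief illustration of what buffering means here, buffer(n) groups consecutive elements into lists of at most n items and emits any remainder when the source completes. A minimal sketch on a finite Flux.range(), assuming reactor-core 3.4.x and an illustrative class name:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class BufferExample {

    static List<List<Integer>> batches() {
        return Flux.range(1, 5)
          .buffer(2)     // emits List<Integer> batches of up to two elements
          .collectList() // gathers the batches so the demo can inspect them
          .block();
    }

    public static void main(String[] args) {
        System.out.println(batches()); // [[1, 2], [3, 4], [5]]
    }
}
```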

9. Concurrency

All of our examples so far have run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, and Reactor provides many implementations of it. Let’s try subscribing on a thread other than main:

Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(elements::add); // returns immediately; elements is filled on a parallel thread

The parallel Scheduler will cause our subscription to run on a different thread, which we can prove by looking at the logs. We can see that the first entry comes from the main thread, while the Flux runs in another thread called parallel-1.

20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:03:27.529 [parallel-1] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | request(unbounded)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(1)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(2)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(3)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(4)
20:03:27.531 [parallel-1] INFO  reactor.Flux.Array.1 - | onComplete()
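
Because subscribeOn() moves the work to another thread, the subscribe() call in the snippet above can return before elements has been populated. The sketch below, assuming reactor-core 3.4.x and an illustrative class name, records the thread name inside map() and uses collectList().block() to wait for the result, which is acceptable in a demo or test but should be avoided inside reactive code:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnThreads {

    static List<String> threadNames() {
        return Flux.just(1, 2, 3, 4)
          .map(i -> Thread.currentThread().getName()) // records where each element is processed
          .subscribeOn(Schedulers.parallel())          // subscription and emission move to a parallel worker
          .collectList()
          .block();                                    // the calling thread waits for the list
    }

    public static void main(String[] args) {
        // each name is expected to look like "parallel-1", matching the log above
        System.out.println(threadNames());
    }
}
```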

Concurrency gets more interesting than this, and it will be worth exploring it in another article.

10. Conclusion

In this article, we’ve given a high-level, end-to-end overview of Reactor Core. We’ve explained how we can publish and subscribe to streams, apply backpressure, operate on streams, and handle data asynchronously. This should hopefully lay a foundation for us to write reactive applications.

Later articles in this series will cover more advanced concurrency and other reactive concepts. There’s also another article covering Reactor with Spring.

The source code for our application is available over on GitHub; this is a Maven project which should be able to run as-is.
