Programmatically Creating Sequences with Project Reactor – 用Project Reactor程序化地创建序列

最后修改: 2019年 3月 13日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll use Project Reactor basics to learn a few techniques for creating Fluxes.

在本教程中,我们将使用Project Reactor基础知识来学习创建Fluxes的一些技术。

2. Maven Dependencies

2.Maven的依赖性

Let’s get started with a couple of dependencies. We’ll need reactor-core and reactor-test:

让我们从几个依赖项开始。我们需要reactor-corereactor-test

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

3. Synchronous Emission

3.同步发射

The simplest way to create a Flux is Flux#generate. This method relies on a generator function to produce a sequence of items.

创建Flux的最简单方法是Flux#generate这种方法依赖于一个生成器函数来产生一个项目的序列。

But first, let’s define a class to hold our methods illustrating the generate method:

但首先,让我们定义一个类来容纳我们的方法,说明generate方法。

public class SequenceGenerator {
    // methods that will follow
}

3.1. Generator With New States

3.1.有新国家的发电机

Let’s see how we can generate the Fibonacci sequence with Reactor:

让我们看看如何用Reactor生成Fibonacci序列

public Flux<Integer> generateFibonacciWithTuples() {
    return Flux.generate(
            () -> Tuples.of(0, 1),
            (state, sink) -> {
                sink.next(state.getT1());
                return Tuples.of(state.getT2(), state.getT1() + state.getT2());
            }
    );
}

It’s not hard to see this generate method takes two functions as its arguments – a Callable and a BiFunction:

不难看出,这个generate方法需要两个函数作为参数 – 一个Callable和一个BiFunction

  • The Callable function sets up the initial state for the generator – in this case, it’s a Tuples with elements 0 and 1
  • The BiFuntion function is a generator, consuming a SynchronousSink, then emitting an item in each round with the sink’s next method and the current state

As its name suggests, a SynchronousSink object works synchronously. However, notice that we cannot call this object’s next method more than once per generator calling.

顾名思义,SynchronousSink对象可以同步工作。然而,请注意,我们不能在每个生成器调用中多次调用该对象的next方法。

Let’s verify the generated sequence with StepVerifier:

让我们用StepVerifier验证生成的序列。

@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);

    StepVerifier.create(fibonacciFlux)
      .expectNext(0, 1, 1, 2, 3)
      .expectComplete()
      .verify();
}

In this example, the subscriber requests just five items, hence the generated sequence ends with number 3.

在这个例子中,用户只要求五个项目,因此生成的序列以数字3结束。

As we can see, the generator returns a new state object to be used in the next pass. It’s not necessary to do so, though. We can reuse a state instance for all the invocations of the generator instead.

正如我们所看到的,生成器返回一个新的状态对象,以便在下一次传递中使用。虽然没有必要这样做。我们可以为生成器的所有调用重复使用一个状态实例。

3.2. Generator With Mutable State

3.2.具有可改变状态的生成器

Suppose we want to generate the Fibonacci sequence with a recycled state. To demonstrate this use case, let’s first define a class:

假设我们想生成具有循环状态的斐波那契数列。为了演示这个用例,我们首先定义一个类。

public class FibonacciState {
    private int former;
    private int latter;

    // constructor, getters and setters
}

We’ll use an instance of this class to hold the generator’s state. The two properties of this instance, former and latter, store two consecutive numbers in the sequence.

我们将使用这个类的一个实例来保持发生器的状态。这个实例的两个属性,formerlatter,存储了序列中的两个连续数字。

If we modify our initial example, we’ll now be using mutable state with generate:

如果我们修改我们最初的例子,我们现在将用generate来使用可变的状态。

public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
    return Flux.generate(
      () -> new FibonacciState(0, 1),
      (state, sink) -> {
        sink.next(state.getFormer());
        if (state.getLatter() > limit) {
            sink.complete();
        }
        int temp = state.getFormer();
        state.setFormer(state.getLatter());
        state.setLatter(temp + state.getLatter());
        return state;
    });
}

Similar to the previous example, this generate variant has state supplier and generator parameters.

与前面的例子类似,这个generate变量有状态供应商和生成器参数。

The state supplier of type Callable simply creates a FibonacciState object with the initial properties of 0 and 1. This state object will be reused throughout the lifecycle of the generator.

Callable类型的状态供应商简单地创建一个FibonacciState对象,其初始属性为01。这个状态对象将在生成器的整个生命周期中被重复使用。

Just like the SynchronousSink in the Fibonacci-with-Tuples example, the sink over here produces items one by one. However, unlike that example, the generator returns the same state object each time it’s called.

就像Fibonacci-with-Tuples例子中的SynchronousSink一样,这里的sink是一个一个地产生项目。然而,与那个例子不同的是,生成器每次被调用时都返回相同的状态对象。

Also notice this time, to avoid an infinite sequence, we instruct the sink to complete when the produced value reaches a limit.

同时注意到这一次,为了避免无限序列,我们指示水槽在产生的值达到一个极限时完成。

And, let’s again do a quick test to confirm that it works:

而且,让我们再次做一个快速测试,以确认它是否有效。

@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();

    StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
      .expectNext(0, 1, 1, 2, 3, 5, 8)
      .expectComplete()
      .verify();
}

3.3. Stateless Variant

3.3.无状态变体

The generate method has another variant with only one parameter of type Consumer<SynchronousSink>. This variant is only suitable to produce a pre-determined sequence, hence not as powerful. We won’t cover it in detail, then.

generate方法有另一个变体,只有一个Consumer<SynchronousSink>类型参数。这个变体只适合于产生一个预先确定的序列,因此没有那么强大。那我们就不详细介绍它了。

4. Asynchronous Emission

4.异步发射

Synchronous emission isn’t the only solution to the programmatic creation of a Flux.

同步排放并不是程序化创建Flux的唯一解决方案。

Instead, we can use the create and push operators to produce multiple items in a round of emission in an asynchronous manner.

相反,我们可以使用createpush操作符,以异步的方式在一轮排放中产生多个项目。

4.1. The create Method

4.1.创建方法

Using the create method, we can produce items from multiple threads. In this example, we’ll collect elements from two different sources into a sequence.

使用create方法,我们可以从多个线程产生项目。在这个例子中,我们将从两个不同的来源收集元素到一个序列。

First, let’s see how create is a little different from generate:

首先,让我们看看creategenerate有什么不同。

public class SequenceCreator {
    public Consumer<List<Integer>> consumer;

    public Flux<Integer> createNumberSequence() {
        return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
    }
}

Unlike the generate operator, the create method doesn’t maintain a state. And rather than generating items by itself, the emitter passed to this method receives elements from an external source.

generate操作符不同,create方法不保持状态。而且,传递给该方法的发射器不是自己生成项目,而是从外部来源接收元素。

Also, we can see that the create operator asks us for a FluxSink instead of a SynchronousSink. With a FluxSink, we can call next() as many times as we need to.

另外,我们可以看到create操作符要求我们提供FluxSink而不是SynchronousSink。有了FluxSink我们可以根据需要多次调用next()

In our case, we’ll call next() for every item we have in the list of items, emitting each one by one. We’ll see how to populate items in just a moment.

在我们的例子中,我们将为items列表中的每一个项目调用next(),逐一发射。我们将在稍后看到如何填充items

Our external source, in this case, is an imaginary consumer field, though this instead could be some observable API.

在这种情况下,我们的外部来源是一个假想的消费者领域,尽管这可能是一些可观察的API。

Let’s put the create operator into action, starting with two sequences of numbers:

让我们把create操作符付诸行动,从两个数字序列开始。

@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
    List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();

    // other statements described below
}

These sequences, sequence1 and sequence2, will be serving as the sources of items for the generated sequence.

这些序列,sequence1sequence2,将作为生成序列的项目来源。

Next, comes two Thread objects that will pour elements into the publisher:

接下来,是两个Thread对象,它们将把元素倒入发布器。

SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence2)
);

When the accept operator is called, elements start flowing into the sequence source.

accept操作符被调用时,元素开始流向序列源。

And then, we can listen, or subscribe, to our new, consolidated sequence:

然后,我们可以监听或订阅到我们新的、合并的序列。

List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);

By subscribing to our sequence, we indicate what should happen with each item emitted by the sequence. Here, it’s to add each item from disparate sources to a consolidated list.

通过订阅我们的序列,我们指出序列发出的每个项目应该发生什么。在这里,它是将来自不同来源的每个项目添加到一个合并的列表中。

Now, we trigger the whole process that sees items moving on two different threads:

现在,我们触发了整个过程,看到项目在两个不同的线程上移动。

producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();

As usual, the last step is to verify the operation’s result:

像往常一样,最后一步是验证操作的结果。

assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);

The first three numbers in the received sequence come from sequence1, while the last four from sequence2. Due to the nature of asynchronous operations, the order of elements from those sequences isn’t guaranteed.

收到的序列中的前三个数字来自sequence1,而后四个来自sequence2。由于异步操作的性质,来自这些序列的元素的顺序不被保证。

The create method has another variant, taking an argument of type OverflowStrategy. As its name implies, this argument manages back-pressure when the downstream can’t keep up with the publisher. By default, the publisher buffers all elements in such a case.

create方法有另一个变体,接受一个OverflowStrategy类型的参数。顾名思义,当下游无法跟上发布者的速度时,这个参数可以管理反向压力。默认情况下,发布者会在这种情况下缓冲所有的元素。

4.2. The push Method

4.2.push方法

In addition to the create operator, the Flux class has another static method to emit a sequence asynchronously, namely push. This method works just like create, except that it allows only one producing thread to emit signals at a time.

除了create操作符之外,Flux类还有另一个静态方法来异步发射序列,即push。这个方法的工作原理与create一样,但它一次只允许一个生产线程发射信号。

We could replace the create method in the example we’ve just gone through with push, and the code will still compile

我们可以用push替换刚才例子中的create方法,代码仍然可以编译。

However, sometimes we would see an assertion error, as the push operator keeps FluxSink#next from being called concurrently on different threads. As a result, we should use push only if we don’t intend to use multiple threads.

然而,有时我们会看到一个断言错误,因为push操作符使FluxSink#next不会在不同的线程上被并发调用。因此,我们应该只在不打算使用多线程的情况下使用push

5. Handling Sequences

5.处理序列

All the methods we’ve seen so far are static and allow the creation of a sequence from a given source. The Flux API also provides an instance method, named handle, for handling a sequence produced by a publisher.

到目前为止,我们看到的所有方法都是静态的,并允许从一个给定的源创建一个序列。Flux API还提供了一个实例方法,名为handle,用于处理由出版商产生的序列。

This handle operator takes on a sequence, doing some processing and possibly removing some elements. In this regard, we can say the handle operator works just like a map and a filter.

这个handle操作符接收一个序列,做一些处理,并可能删除一些元素。在这方面,我们可以说handle操作符的工作方式就像mapfilter

Let’s take a look at a simple illustration of the handle method:

让我们看一下handle方法的一个简单说明。

public class SequenceHandler {
    public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
        return sequence.handle((number, sink) -> {
            if (number % 2 == 0) {
                sink.next(number / 2);
            }
        });
    }
}

In this example, the handle operator takes a sequence of numbers, dividing the value by 2 if even. In case the value is an odd number, the operator doesn’t do anything, meaning that such a number is ignored.

在这个例子中,handle操作符接收一串数字,如果是偶数,则用2除以该值。如果值是一个奇数,操作者不做任何事情,也就是说,这样的数字被忽略了。

Another thing to notice is that, as with the generate method, handle employs a SynchronousSink and enables one-by-one emissions only.

另一件需要注意的事情是,与generate方法一样,handle采用了SynchronousSink,并且只允许逐一排放。

And finally, we need to test things. Let’s use StepVerifier one last time to confirm that our handler works:

最后,我们需要对事情进行测试。让我们最后一次使用StepVerifier来确认我们的处理程序是否有效。

@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
    SequenceHandler sequenceHandler = new SequenceHandler();
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);

    StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
      .expectNext(0, 1, 4, 17)
      .expectComplete()
      .verify();
}

There are four even numbers among the first 10 items in the Fibonacci sequence: 0, 2, 8, and 34, hence the arguments we pass to the expectNext method.

在斐波那契数列的前10项中,有四个偶数。0, 2, 8, 和34,因此我们传递给expectNext方法的参数。

6. Conclusion

6.结论

In this article, we walked through various methods of the Flux API that can be used to produce a sequence in a programmatic way, notably the generate and create operators.

在这篇文章中,我们浏览了Flux API的各种方法,这些方法可用于以编程的方式产生一个序列,特别是generatecreate操作符。

The source code for this tutorial is available over on GitHub. This is a Maven project and should be able to run as-is.

本教程的源代码可在GitHub上找到。这是一个Maven项目,应该可以按原样运行。