1. Overview
1.概述
In this article, we’ll take a look at various ways of combining Publishers in Project Reactor.
在这篇文章中,我们将看看在Project Reactor中结合Publishers的各种方法。
2. Maven Dependencies
2.Maven的依赖性
Let’s set up our example with the Project Reactor dependencies:
让我们用Project Reactor依赖项来设置我们的例子。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.4.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.1.4.RELEASE</version>
<scope>test</scope>
</dependency>
3. Combining Publishers
3.合并出版商
Given a scenario when one has to work with Flux<T> or Mono<T>, there are different ways to combine streams.
当人们不得不使用Flux<T>或Mono<T>工作时,有不同的方法来组合流。
Let’s create a few examples to illustrate the usage of static methods in the Flux<T> class such as concat, concatWith, merge, zip and combineLatest.
让我们创建几个例子来说明Flux<T>类中静态方法的用法,如concat, concatWith, merge, zip和combineLatest.。
Our examples will make use of two publishers of type Flux<Integer>, namely evenNumbers, which is a Flux of Integer and holds a sequence of even numbers starting with 1 (min variable) and limited by 5 (max variable).
我们的例子将使用两个Flux<Integer>类型的发布者,即evenNumbers,它是Integer的Flux,持有一个从1(min变量)开始,以5(max变量)为限的偶数序列。
We’ll create oddNumbers, also a Flux of type Integer of odd numbers:
我们将创建oddNumbers,也是一个Flux类型的Integer的奇数。
Flux<Integer> evenNumbers = Flux
.range(min, max)
.filter(x -> x % 2 == 0); // i.e. 2, 4
Flux<Integer> oddNumbers = Flux
.range(min, max)
.filter(x -> x % 2 > 0); // ie. 1, 3, 5
3.1. concat()
3.1. concat()
The concat method executes a concatenation of the inputs, forwarding elements emitted by the sources downstream.
concat方法执行输入的串联,将来源发出的元素转发到下游。
The concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.
串联是通过依次订阅第一个源,然后等待它完成后再订阅下一个源,以此类推直到最后一个源完成。任何错误都会立即中断该序列,并被转发到下游。
Here is a quick example:
下面是一个快速的例子。
@Test
public void givenFluxes_whenConcatIsInvoked_thenConcat() {
Flux<Integer> fluxOfIntegers = Flux.concat(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.2. concatWith()
3.2.concatWith()
Using the static method concatWith, we’ll produce a concatenation of two sources of type Flux<T> as a result:
使用静态方法concatWith,我们将产生两个类型为Flux<T>的源的连接,作为结果。
@Test
public void givenFluxes_whenConcatWithIsInvoked_thenConcatWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.concatWith(oddNumbers);
// same stepVerifier as in the concat example above
}
3.3. combineLatest()
3.3. combineLatest()
The Flux static method combineLatest will generate data provided by the combination of the most recently published value from each of the Publisher sources.
Flux静态方法combineLatest将生成由每个Publisher来源的最新发布值组合提供的数据。
Here’s an example of the usage of this method with two Publisher sources and a BiFunction as parameters:
下面是一个使用这个方法的例子,有两个Publisher源和一个BiFunction作为参数。
@Test
public void givenFluxes_whenCombineLatestIsInvoked_thenCombineLatest() {
Flux<Integer> fluxOfIntegers = Flux.combineLatest(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(5) // 4 + 1
.expectNext(7) // 4 + 3
.expectNext(9) // 4 + 5
.expectComplete()
.verify();
}
We can see here that the function combineLatest applied the function “a + b” using the latest element of evenNumbers (4) and the elements of oddNumbers (1,3,5), thus generating the sequence 5,7,9.
我们在这里可以看到,函数combineLatest使用evenNumbers(4)的最新元素和oddNumbers(1,3,5)的元素应用了函数 “a+b”,从而生成了序列5,7,9。
3.4. merge()
3.4.merge()
The merge function executes a merging of the data from Publisher sequences contained in an array into an interleaved merged sequence:
merge函数执行一个数组中包含的Publisher序列的数据合并到一个交错的合并序列。
@Test
public void givenFluxes_whenMergeIsInvoked_thenMerge() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
An interesting thing to note is that, opposed to concat (lazy subscription), the sources are subscribed eagerly.
值得注意的是,与concat(lazy subscription)相反,来源被急切地订阅。
Here, we can see a different outcome of the merge function if we insert a delay between the elements of the Publishers:
在这里,如果我们在发布者的元素之间插入一个延迟,我们可以看到merge函数的不同结果。
@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
3.5. mergeSequential()
3.5.mergeSequential()
The mergeSequential method merges data from Publisher sequences provided in an array into an ordered merged sequence.
mergeSequential方法将数组中提供的Publisher序列的数据合并成一个有序的合并序列。
Unlike concat, sources are subscribed to eagerly.
与concat不同的是,来源被急切地订阅。
Also, unlike merge, their emitted values are merged into the final sequence in subscription order:
另外,与merge不同的是,它们的发射值会以订阅顺序合并到最终的序列中。
@Test
public void testMergeSequential() {
Flux<Integer> fluxOfIntegers = Flux.mergeSequential(
evenNumbers,
oddNumbers);
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.6. mergeDelayError()
3.6.mergeDelayError()
The mergeDelayError merges data from Publisher sequences contained in an array into an interleaved merged sequence.
mergeDelayError将一个数组中包含的Publisher序列的数据合并成一个交错的合并序列。
Unlike concat, sources are subscribed to eagerly.
与concat不同的是,来源被急切地订阅。
This variant of the static merge method will delay any error until after the rest of the merge backlog has been processed.
这个静态merge方法的变体将延迟任何错误,直到其余的合并积压文件被处理之后。
Here is an example of mergeDelayError:
下面是一个mergeDelayError的例子:。
@Test
public void givenFluxes_whenMergeWithDelayedElementsIsInvoked_thenMergeWithDelayedElements() {
Flux<Integer> fluxOfIntegers = Flux.mergeDelayError(1,
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
}
3.7. mergeWith()
3.7. mergeWith()
The static method mergeWith merges data from this Flux and a Publisher into an interleaved merged sequence.
静态方法mergeWith将来自这个Flux和一个Publisher的数据合并成一个交错的合并序列。
Again, unlike concat, inner sources are subscribed to eagerly:
同样,与concat不同的是,内部源被急切地订阅。
@Test
public void givenFluxes_whenMergeWithIsInvoked_thenMergeWith() {
Flux<Integer> fluxOfIntegers = evenNumbers.mergeWith(oddNumbers);
// same StepVerifier as in "3.4. Merge"
StepVerifier.create(fluxOfIntegers)
.expectNext(2)
.expectNext(4)
.expectNext(1)
.expectNext(3)
.expectNext(5)
.expectComplete()
.verify();
}
3.8. zip()
3.8. zip()
The static method zip agglutinates multiple sources together, i.e., waits for all the sources to emit one element and combines these elements into an output value (constructed by the provided combinator function).
静态方法zip将多个源聚集在一起,即等待所有源发出一个元素,并将这些元素组合成一个输出值(由提供的组合器函数构建)。
The operator will continue doing so until any of the sources completes:
操作员将继续这样做,直到任何一个来源完成。
@Test
public void givenFluxes_whenZipIsInvoked_thenZip() {
Flux<Integer> fluxOfIntegers = Flux.zip(
evenNumbers,
oddNumbers,
(a, b) -> a + b);
StepVerifier.create(fluxOfIntegers)
.expectNext(3) // 2 + 1
.expectNext(7) // 4 + 3
.expectComplete()
.verify();
}
As there is no element left from evenNumbers to pair up, the element 5 from oddNumbers Publisher is ignored.
由于evenNumbers中没有剩余的元素可以配对,oddNumbers Publisher中的元素5被忽略了。
3.9. zipWith()
3.9. zipWith()
The zipWith executes the same method that zip does, but only with two Publishers:
zipWith执行与zip相同的方法,但只有两个发布者。
@Test
public void givenFluxes_whenZipWithIsInvoked_thenZipWith() {
Flux<Integer> fluxOfIntegers = evenNumbers
.zipWith(oddNumbers, (a, b) -> a * b);
StepVerifier.create(fluxOfIntegers)
.expectNext(2) // 2 * 1
.expectNext(12) // 4 * 3
.expectComplete()
.verify();
}
4. Conclusion
4.结论
In this quick tutorial, we’ve discovered multiple ways of combining Publishers.
在这个快速教程中,我们发现了结合Publishers.的多种方法。
As always, the examples are available in over on GitHub.
一如既往,这些例子可在GitHub上找到。