1. Introduction
1.介绍
In this quick tutorial, we’ll discuss different ways of combining Observables in RxJava.
在这个快速教程中,我们将讨论在RxJava中组合Observables的不同方法。
If you’re new to RxJava, definitely check out this intro tutorial first.
如果你是RxJava的新手,一定要先看看这个intro tutorial。
Now, let’s jump right in.
现在,让我们直接跳进去。
2. Observables
2.可观察到的
Observable sequences, or simply Observables, are representations of asynchronous data streams.
Observable序列,或简称Observables,是异步数据流的代表。
These’re based on the Observer pattern wherein an object called an Observer, subscribes to items emitted by an Observable.
这些基于Observer 模式,其中称为Observer的对象订阅由Observable发出的项目。
The subscription is non-blocking as the Observer stands to react to whatever the Observable will emit in the future. This, in turn, facilitates concurrency.
由于Observer可以对Observable在未来发出的任何信息做出反应,因此该订阅是非阻塞的。这反过来促进了并发性。
Here’s a simple demonstration in RxJava:
这里有一个RxJava的简单演示。
Observable
.from(new String[] { "John", "Doe" })
.subscribe(name -> System.out.println("Hello " + name))
3. Combining Observables
3.结合观测数据
When programming using a reactive framework, it’s a common use-case to combine various Observables.
在使用反应式框架编程时,将各种Observables结合起来是一个常见的使用情况。
In a web application, for example, we may need to get two sets of asynchronous data streams that are independent of each other.
例如,在一个网络应用中,我们可能需要获得两组相互独立的异步数据流。
Instead of waiting for the previous stream to complete before requesting the next stream, we can call both at the same time and subscribe to the combined streams.
在请求下一个流之前,我们不需要等待前一个流的完成,而是可以同时调用这两个流,并订阅合并后的流。
In this section, we’ll discuss some of the different ways we can combine multiple Observables in RxJava and the different use-cases to which each method applies.
在本节中,我们将讨论在RxJava中组合多个Observables的一些不同方法,以及每种方法所适用的不同用例。
3.1. Merge
3.1.合并
We can use the merge operator to combine the output of multiple Observables so that they act like one:
我们可以使用merge操作符来合并多个Observables的输出,使它们像一个人一样行动。
@Test
public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.merge(
Observable.from(new String[] {"Hello", "World"}),
Observable.from(new String[] {"I love", "RxJava"})
).subscribe(testSubscriber);
testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
}
3.2. MergeDelayError
3.2.MergeDelayError
The mergeDelayError method is the same as merge in that it combines multiple Observables into one, but if errors occur during the merge, it allows error-free items to continue before propagating the errors:
mergeDelayError方法与merge相同,它将多个Observables合并为一个,但是如果在合并过程中发生错误,它允许在传播错误之前继续无错误的项目。
@Test
public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable.mergeDelayError(
Observable.from(new String[] { "hello", "world" }),
Observable.error(new RuntimeException("Some exception")),
Observable.from(new String[] { "rxjava" })
).subscribe(testSubscriber);
testSubscriber.assertValues("hello", "world", "rxjava");
testSubscriber.assertError(RuntimeException.class);
}
The above example emits all the error-free values:
上面的例子排放了所有无错误的值。
hello
world
rxjava
Note that if we use merge instead of mergeDelayError, the String “rxjava” won’t be emitted because merge immediately stops the flow of data from Observables when an error occurs.
注意,如果我们使用merge而不是mergeDelayError,String “rxjava” 将不会被排放出来,因为merge在错误发生时立即停止来自Observables的数据流。
3.3. Zip
3.3.Zip
The zip extension method brings together two sequences of values as pairs:
zip扩展方法将两个序列的值作为对组合起来。
@Test
public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
List<String> zippedStrings = new ArrayList<>();
Observable.zip(
Observable.from(new String[] { "Simple", "Moderate", "Complex" }),
Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
assertThat(zippedStrings).isNotEmpty();
assertThat(zippedStrings.size()).isEqualTo(3);
assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
}
3.4. Zip With Interval
3.4.缩小时间间隔
In this example, we will zip a stream with interval which in effect will delay the emission of elements of the first stream:
在这个例子中,我们将用interval来压缩一个流,这实际上将延迟第一个流的元素的排放。
@Test
public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS);
Observable
.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
.toBlocking().subscribe(testSubscriber);
testSubscriber.assertCompleted();
testSubscriber.assertValueCount(5);
testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
}
4. Summary
4.总结
In this article, we’ve seen a few of the methods for combining Observables with RxJava. You can learn about other methods like combineLatest, join, groupJoin, switchOnNext, in the official RxJava documentation.
在这篇文章中,我们已经看到了将Observables与RxJava相结合的一些方法。您可以了解其他方法,如combineLatest, join, groupJoin,switchOnNext,在official RxJava documentation。
As always, the source code for this article is available in our GitHub repo.
一如既往,本文的源代码可在我们的GitHub repo中找到。