Combining RxJava Completables – 结合RxJava补全表

最后修改: 2018年 10月 2日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll play with RxJava’s Completable type, which represents a computation result without an actual value.

在本教程中,我们将使用RxJava的Completable类型,它表示一个没有实际值的计算结果。

2. RxJava Dependency

2.RxJava的依赖性

Let’s include the RxJava 2 dependency into our Maven project:

让我们把RxJava 2的依赖性纳入我们的Maven项目。

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.2</version>
</dependency>

We can usually find the latest version on Maven Central.

我们通常可以在Maven Central上找到最新版本。

3. Completable Type

3.可完成的类型

Completable is similar to Observable with the only exception that the former emits either completion or error signals but no items. The Completable class contains several convenience methods for creating or obtaining it from different reactive sources.

CompletableObservable相似,唯一的例外是前者会发出完成或错误信号,但没有项目。Completable类包含几个方便的方法,用于创建或从不同反应式源获取它。

We can spawn an instance that completes immediately by using Completable.complete()

我们可以通过使用Completable.complete()催生一个立即完成的实例。

Then, we can observe its state by using DisposableCompletableObserver:

然后,我们可以通过使用DisposableCompletableObserver观察其状态。

Completable
  .complete()
  .subscribe(new DisposableCompletableObserver() {
    @Override
    public void onComplete() {
        System.out.println("Completed!");
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
});

Additionally, we can construct a Completable instance from Callable, Action, and Runnable:

此外,我们可以从Callable、Action和Runnable构建一个Completable实例。

Completable.fromRunnable(() -> {});

Also, we can get Completable instances from other types using either Completable.from() or calling ignoreElement() on MaybeSingleFlowable, and Observable sources themselves:

另外,我们可以使用Completable.from()或者在MaybeSingleFlowableObservable源本身上调用ignoreElement(),从其他类型中获得Completable实例。

Flowable<String> flowable = Flowable
  .just("request received", "user logged in");
Completable flowableCompletable = Completable
  .fromPublisher(flowable);
Completable singleCompletable = Single.just(1)
  .ignoreElement();

4. Chaining Completables

4.串联完成表

We can employ chaining of Completables in many real-world use cases when we only care about the success of operation:

当我们只关心操作的成功与否时,我们可以在许多现实世界的用例中采用Completables的链式。

  • All-or-nothing actions like doing a PUT request to update a remote object followed by a local database update upon the success
  • Post-factum logging and journaling
  • Orchestration of several actions, e.g. running an analytics job after an ingestion action gets completed

We’ll keep examples simple and problem-agnostic. Consider we have several Completable instances:

我们将保持例子的简单性和问题的无关性。考虑到我们有几个Completable实例。

Completable first = Completable
  .fromSingle(Single.just(1));
Completable second = Completable
  .fromRunnable(() -> {});
Throwable throwable = new RuntimeException();
Completable error = Single.error(throwable)
  .ignoreElement();

To combine two Completables into a single one, we can use the andThen() operator:

为了将两个Completables合并成一个,我们可以使用andThen()操作符

first
  .andThen(second)
  .test()
  .assertComplete();

We can chain as many Completables as needed. At the same time, if at least one of the sources fails to complete, resulting Completable won’t fire onComplete() as well:

我们可以根据需要将多个Completables链接起来。同时,如果至少有一个源没有完成,产生的Completable也不会触发onComplete()

first
  .andThen(second)
  .andThen(error)
  .test()
  .assertError(throwable);

Furthermore, if one of the sources is infinite or doesn’t reach onComplete for some reason, the resulting Completable will never fire onComplete() nor onError() as well.

此外,如果其中一个源是无限的或者由于某种原因没有达到onComplete,那么产生的Completable将永远不会触发onComplete() 或者onError()。

A good thing that we can still test this scenario:

好在我们仍然可以测试这种情况。

...
  .andThen(Completable.never())
  .test()
  .assertNotComplete();

5. Composing Series of Completables

5.组成系列的补足表

Imagine we have a bunch of Completables. As of practical use case, suppose we need to register a user within several separate subsystems.

想象一下,我们有一堆Completables。作为实际用例,假设我们需要在几个独立的子系统中注册一个用户。

To join all Completables into a single one, we can use the merge() family of methods. The merge() operator allows subscribing to all sources.

为了将所有的Completables合并成一个,我们可以使用merge()系列的方法。merge()操作符允许订阅所有来源。

The resulting instance completes once all of the sources are completed. Additionally, it terminates with onError when any of the sources emits an error:

一旦所有的源都完成,产生的实例就会完成。此外,当任何一个源发出错误时,它将以onError终止。

Completable.mergeArray(first, second)
  .test()
  .assertComplete();

Completable.mergeArray(first, second, error)
  .test()
  .assertError(throwable);

Let’s move on to a slightly different use case. Let’s say we need to execute an action for every element obtained from a Flowable.

让我们继续讨论一个稍微不同的用例。假设我们需要为从Flowable中获得的每个元素执行一个动作,

Then, we want a single Completable for both the completion of the upstream and all the element-level actions. The flatMapCompletable() operator comes to help in this case:

然后,我们想要一个单一的Completable来完成上游和所有元素级的操作。在这种情况下,flatMapCompletable()操作符就来帮忙。

Completable allElementsCompletable = Flowable
  .just("request received", "user logged in")
  .flatMapCompletable(message -> Completable
      .fromRunnable(() -> System.out.println(message))
  );
allElementsCompletable
  .test()
  .assertComplete();

Similarly, the above method is available for the rest of the base reactive classes like ObservableMaybe, or Single.

类似地,上述方法也可用于其他基础反应式类,如ObservableMaybeSingle。

As a practical context for flatMapCompletable(), we could think about decorating every item with some side effect. We can write a log entry per completed element or make a storage snapshot upon each successful action.

作为flatMapCompletable()的实际背景,我们可以考虑用一些副作用来装饰每个项目。我们可以为每个完成的元素写一条日志,或者在每个成功的动作后制作一个存储快照。

Finally, we may need to construct a Completable from a couple of other sources and get it terminated as soon as either of them completes. That’s where amb operators can help.

最后,我们可能需要从其他几个来源构建一个Completable,并在其中任何一个来源完成后立即让它终止。这就是amb操作符可以提供帮助的地方。

The amb prefix is a short-hand for “ambiguous”, implying the uncertainty about which Completable exactly gets completed. For example, ambArray():

amb前缀是 “模棱两可 “的简称,意味着不确定哪一个Completable到底被完成。例如,ambArray()

Completable.ambArray(first, Completable.never(), second)
  .test()
  .assertComplete();

Note, that the above Completable may also terminate with onError() instead of onComplete() depending on which source completable terminates first:

注意,上述Completable也可能以onError()而不是onComplete()来终止,这取决于哪个源的completable先终止。

Completable.ambArray(error, first, second)
  .test()
  .assertError(throwable);

Also, once the first source terminates, the remaining sources are guaranteed to be disposed of.

另外,一旦第一个来源终止,其余的来源就会被保证处理掉。

That means all remaining running Completables are stopped via Disposable.dispose() and corresponding CompletableObservers will be unsubscribed.

这意味着所有剩余的正在运行的Completables将通过Disposable.dispose()停止,相应的CompletableObservers将被取消订阅。

Concerning a practical example, we can use amb() when we stream a backup file to a several equivalents remote storages. And we complete the process once the first-best backup finishes or repeat the process upon error.

关于一个实际的例子,我们可以使用amb(),当我们把一个备份文件流向几个相等的远程存储时。一旦第一个最好的备份完成,我们就完成这个过程,或者在出错时重复这个过程。

6. Conclusion

6.结论

In this article, we briefly reviewed the Completable type of RxJava.

在这篇文章中,我们简要地回顾了RxJava的Completable类型。

We started with different options for obtaining Completable instances and then chained and composed Completables by using the andThen(), merge(), flatMapCompletable(), and amb…() operators.

我们从获得Completable实例的不同选项开始,然后通过使用andThen()、merge()、flatMapCompletable()amb…()操作符来链结和组成Completables

We can find the source for all code samples over on GitHub.

我们可以在GitHub上找到所有代码样本的源头over