RxJava Hooks – RxJava 钩子

最后修改: 2019年 3月 16日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’re going to learn about RxJava hooks. We’ll be creating short examples to demonstrate how the hooks work in different situations.

在本教程中,我们将学习RxJava钩子的知识。我们将创建简短的例子来演示钩子在不同情况下的工作原理。

2. What Are RxJava Hooks?

2.什么是RxJava Hooks?

As the name depicts, RxJava hooks allow us to hook into the lifecycle of Observable, Completable, Maybe, Flowable, and Single. In addition, RxJava allows us to add lifecycle hooks to the schedulers returned by Schedulers. Furthermore, we can specify a global error handler also using the hooks.

正如其名,RxJava钩子允许我们钩住Observable、CompletableMaybeFlowable、>Single的生命周期。此外,RxJava允许我们将生命周期钩子添加到由Schedulers.返回的调度器中。此外,我们也可以使用钩子指定一个全局错误处理程序。

In RxJava 1, the class RxJavaHooks is used to define the hooks. But, the hooking mechanism is completely re-written in RxJava 2. Now the class RxJavaHooks is no longer available to define the hooks. Instead, we should use RxJavaPlugins to implement the lifecycle hooks.

在RxJava 1中,RxJavaHooks类被用来定义钩子。但是,在RxJava 2中,钩子机制被完全重写了。现在,RxJavaHooks类已经不能用来定义钩子了。相反,我们应该使用RxJavaPlugins来实现生命周期挂钩

The RxJavaPlugins class has a number of setter methods to set the hooks. These hooks are global. Once they’re set, then we have to either call the reset() method of the RxJavaPlugins class, or call the setter method for the individual hook to remove it.

RxJavaPlugins类有许多setter方法来设置钩子。这些钩子是全局的。一旦它们被设置,那么我们就必须调用RxJavaPlugins类的reset()方法,或者调用单个钩子的setter方法来移除它。

3. Hook for Error Handling

3.错误处理的钩子

We can use the setErrorHandler( ) method to handle the errors that can’t be emitted because the downstream’s lifecycle already reached its terminal state. Let’s see how we can implement an error handler and test it:

我们可以使用setErrorHandler()方法来处理那些因为下游的生命周期已经达到终点状态而无法发出的错误。让我们看看如何实现一个错误处理程序并对其进行测试。

RxJavaPlugins.setErrorHandler(throwable -> {
    hookCalled = true;
});

Observable.error(new IllegalStateException()).subscribe();

assertTrue(hookCalled);

Not all the exceptions are thrown as-is. However, RxJava will check if the thrown error is one of the already named bug cases that should pass through as-is, otherwise it will be wrapped into an UndeliverableException. The exceptions that are named as bug cases are:

并非所有的异常都会被原封不动地抛出。然而,RxJava会检查所抛出的错误是否是已经命名的错误案例之一,这些错误应该原封不动地通过,否则它将被包装成一个UndeliverableException。被命名为bug case的异常有:。

  • OnErrorNotImplementedException – when user forgets to add the onError handler in subscribe() method
  • MissingBackpressureException – due to either an operator bug or concurrent onNext
  • IllegalStateException – when general protocol violations occur
  • NullPointerException – standard null pointer exception
  • IllegalArgumentException – due to invalid user input
  • CompositeException – due to a crash while handling an exception

4. Hooks for Completable

4.用于Completable的钩子

The RxJava Completable has two lifecycle hooks. Let’s take a look at them now.

RxJava Completable有两个生命周期挂钩。现在让我们来看看它们。

4.1. setOnCompletableAssembly

4.1.setOnCompletableAssembly

RxJava will call this hook when it instantiates operators and sources on Completable. We can use the current Completable object, provided as an argument to the hook function, for any operation on it:

RxJava在Completable上实例化操作者和源时,会调用这个钩子。我们可以使用当前的Completable对象,作为参数提供给钩子函数,对其进行任何操作。

RxJavaPlugins.setOnCompletableAssembly(completable -> {
    hookCalled = true;
    return completable;
});

Completable.fromSingle(Single.just(1));

assertTrue(hookCalled);

4.2. setOnCompletableSubscribe

4.2.setOnCompletableSubscribe

RxJava calls this hook before a subscriber subscribes to a Completable:

RxJava在订阅者订阅Completable之前调用这个钩子。

RxJavaPlugins.setOnCompletableSubscribe((completable, observer) -> {
    hookCalled = true;
    return observer;
});

Completable.fromSingle(Single.just(1)).test();

assertTrue(hookCalled);

5. Hooks for Observable

5.用于Observable的钩子

Next, let’s take a look at the RxJava’s three lifecycle hooks for Observable.

接下来,让我们看看RxJava为Observable设置的三个生命周期钩子。

5.1. setOnObservableAssembly

5.1.setOnObservableAssembly

RxJava calls this hook when it instantiates operators and sources on Observable:

RxJava在Observable上实例化操作者和源时,会调用这个钩子。

RxJavaPlugins.setOnObservableAssembly(observable -> {
    hookCalled = true;
    return observable;
});

Observable.range(1, 10);

assertTrue(hookCalled);

5.2. setOnObservableSubscribe

5.2.setOnObservableSubscribe

RxJava calls this hook before a subscriber subscribes to an Observable:

RxJava在订阅者订阅Observable之前调用这个钩子。

RxJavaPlugins.setOnObservableSubscribe((observable, observer) -> {
    hookCalled = true;
    return observer;
});

Observable.range(1, 10).test();

assertTrue(hookCalled);

5.3. setOnConnectableObservableAssembly

5.3.setOnConnectableObservableAssembly

This hook is intended for the ConnectableObservable. A ConnectableObservable is a variant of an Observable itself. The only difference is that it does not begin emitting items when it is subscribed to, but only when its connect() method is called:

这个钩子是为 ConnectableObservable 准备的。ConnectableObservableObservable本身的一个变体。唯一的区别是,它不会在被订阅时开始发射项目,而只是在其 connect() 方法被调用时才开始发射。

RxJavaPlugins.setOnConnectableObservableAssembly(connectableObservable -> {
    hookCalled = true;
    return connectableObservable;
});

ConnectableObservable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6. Hooks for Flowable

6.用于Flowable的钩子

Now, let’s take a look at the lifecycle hooks defined for Flowable.

现在,让我们看看为Flowable定义的生命周期挂钩。

6.1. setOnFlowableAssembly

6.1.setOnFlowableAssembly

RxJava calls this hook when it instantiates operators and sources on Flowable:

RxJava在Flowable上实例化操作符和源时,会调用这个钩子。

RxJavaPlugins.setOnFlowableAssembly(flowable -> {
    hookCalled = true;
    return flowable;
});

Flowable.range(1, 10);

assertTrue(hookCalled);

6.2. setOnFlowableSubscribe

6.2.setOnFlowableSubscribe

RxJava calls this hook before a subscriber subscribes to Flowable:

RxJava在订阅者订阅Flowable之前调用这个钩子。

RxJavaPlugins.setOnFlowableSubscribe((flowable, observer) -> {
    hookCalled = true;
    return observer;
});

Flowable.range(1, 10).test();

assertTrue(hookCalled);

6.3. setOnConnectableFlowableAssembly

6.3.setOnConnectableFlowableAssembly

RxJava calls this hook when it instantiates operators and sources on ConnectableFlowable. Like the ConnectableObservable, ConnectableFlowable also begins emitting items only when we call its connect() method:

RxJava在ConnectableFlowable上实例化操作符和源时,会调用这个钩。像ConnectableObservable一样,ConnectableFlowable也只有在我们调用其connect()方法时才开始发射项目。

RxJavaPlugins.setOnConnectableFlowableAssembly(connectableFlowable -> {
    hookCalled = true;
    return connectableFlowable;
});

ConnectableFlowable.range(1, 10).publish().connect();

assertTrue(hookCalled);

6.4. setOnParallelAssembly

6.4.setOnParallelAssembly

A ParallelFlowable is for achieving parallelism among multiple publishers. RxJava calls the setOnParallelAssembly() hook when it  instantiates operators and sources on ParallelFlowable:

ParallelFlowable是为了实现多个发布者之间的并行性。RxJava在ParallelFlowable上实例化操作符和源时,会调用setOnParallelAssembly()钩。

RxJavaPlugins.setOnParallelAssembly(parallelFlowable -> {
    hookCalled = true;
    return parallelFlowable;
});

Flowable.range(1, 10).parallel();

assertTrue(hookCalled);

7. Hooks for Maybe

7、Maybe的钩子

The Maybe emitter has two hooks defined to control its lifecycle.

Maybe发射器定义了两个钩子来控制其生命周期。

7.1. setOnMaybeAssembly

7.1.setOnMaybeAssembly

RxJava calls this hook when it instantiates operators and sources on Maybe:

RxJava在Maybe上实例化操作者和源时,会调用这个钩子。

RxJavaPlugins.setOnMaybeAssembly(maybe -> {
    hookCalled = true;
    return maybe;
});

Maybe.just(1);

assertTrue(hookCalled);

7.2. setOnMaybeSubscribe

7.2.setOnMaybeSubscribe

RxJava calls this hook before a subscriber subscribes to Maybe:

RxJava在订阅者订阅Maybe之前调用这个钩子。

RxJavaPlugins.setOnMaybeSubscribe((maybe, observer) -> {
    hookCalled = true;
    return observer;
});

Maybe.just(1).test();

assertTrue(hookCalled);

8. Hooks for Single

8.用于单一的钩子

RxJava defines the basic two hooks for Single emitter as well.

RxJava也为Single发射器定义了两个基本钩子。

8.1. setOnSingleAssembly

8.1.setOnSingleAssembly

RxJava calls this hook when it instantiates operators and sources on Single:

RxJava在Single上实例化操作者和源时,会调用这个钩子。

RxJavaPlugins.setOnSingleAssembly(single -> {
    hookCalled = true;
    return single;
});

Single.just(1);

assertTrue(hookCalled);

8.2. setOnSingleSubscribe

8.2.setOnSingleSubscribe

RxJava calls this hook before a subscriber subscribes to Single:

RxJava在订阅者订阅Single之前调用这个钩子。

RxJavaPlugins.setOnSingleSubscribe((single, observer) -> {
    hookCalled = true;
    return observer;
});

Single.just(1).test();

assertTrue(hookCalled);

9. Hooks for Schedulers

9.钩子为调度员

Like the RxJava emitters, Schedulers also has a bunch of hooks to control their lifecycle. RxJava defines a common hook that gets called when we use any type of Schedulers. In addition, it’s possible to implement hooks that are specific to various Schedulers.

像RxJava发射器一样,Schedulers也有一堆钩子来控制其生命周期。RxJava定义了一个通用的钩子,当我们使用任何类型的Schedulers时都会被调用。此外,还可以实现针对各种Schedulers的钩子。

9.1. setScheduleHandler

9.1.setScheduleHandler

RxJava calls this hook when we use any of the schedulers for an operation:

当我们使用任何一个调度器进行操作时,RxJava会调用这个钩子。

RxJavaPlugins.setScheduleHandler((runnable) -> {
    hookCalled = true;
    return runnable;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

hookCalled = false;

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled);

Since we’ve repeated the operation with both single() and computation() schedulers, when we run this, the test case will print the message twice in the console.

由于我们用single()computation()调度器重复操作,当我们运行时,测试用例将在控制台中打印两次信息。

9.2. Hooks for Computation Scheduler

9.2.用于计算调度器的钩子

The computation scheduler has two hooks – namely, setInitComputationSchedulerHandler and setComputationSchedulerHandler.

计算调度器有两个钩子,即setInitComputationSchedulerHandlersetComputationSchedulerHandler。

When RxJava initializes a computation scheduler, it calls the hook we set using the setInitComputationSchedulerHandler function. And, furthermore, it calls the hook we set using the setComputationSchedulerHandler function when we schedule a task with Schedulers.computation():

当RxJava初始化一个计算调度器时,它会调用我们用setInitComputationSchedulerHandler函数设置的钩子。此外,当我们用Schedulers.computation()调度一个任务时,它也会调用我们用setComputationSchedulerHandler函数设置的钩子。

RxJavaPlugins.setInitComputationSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setComputationSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.computation())
  .test();

assertTrue(hookCalled && initHookCalled);

9.3. Hooks for IO Scheduler

9.3.用于IO Scheduler的钩子

The IO scheduler also has two hooks – namely, setInitIoSchedulerHandler and setIoSchedulerHandler:

IO调度器也有两个钩子 – 即setInitIoSchedulerHandlersetIoSchedulerHandler

RxJavaPlugins.setInitIoSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setIoSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.io())
  .test();

assertTrue(hookCalled && initHookCalled);

9.4. Hooks for Single Scheduler

9.4.用于Single Scheduler的钩子

Now, let’s see the hooks for Single scheduler:

现在,让我们看看Single调度器的钩子。

RxJavaPlugins.setInitSingleSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setSingleSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 10)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.single())
  .test();

assertTrue(hookCalled && initHookCalled);

9.5. Hooks for NewThread Scheduler

9.5.用于NewThread Scheduler的钩子

Like other schedulers, NewThread scheduler also defines two hooks:

和其他调度器一样,NewThread调度器也定义了两个钩子。

RxJavaPlugins.setInitNewThreadSchedulerHandler((scheduler) -> {
    initHookCalled = true;
    return scheduler.call();
});

RxJavaPlugins.setNewThreadSchedulerHandler((scheduler) -> {
    hookCalled = true;
    return scheduler;
});

Observable.range(1, 15)
  .map(v -> v * 2)
  .subscribeOn(Schedulers.newThread())
  .test();

assertTrue(hookCalled && initHookCalled);

10. Conclusion

10.结论

In this tutorial, we’ve learned what are the various RxJava lifecycle hooks and how can we implement them. Among these hooks, the error handling hook is the most noteworthy one. But, we can use others for auditing purposes, like logging the number of subscribers and other specific use cases.

在本教程中,我们已经了解了什么是各种RxJava生命周期钩子以及如何实现它们。在这些钩子中,错误处理钩子是最值得注意的一个。但是,我们可以将其他钩子用于审计目的,如记录订阅者的数量和其他特定的用例。

And, as usual, all of the short examples we’ve discussed here can be found over on GitHub.

而且,像往常一样,我们在这里讨论的所有简短的例子都可以在GitHub上找到over