RxJava and Error Handling – RxJava和错误处理

最后修改: 2017年 9月 12日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.引言

In this article, we’ll take a look at how to handle exceptions and errors using RxJava.

在这篇文章中,我们将看看如何使用RxJava来处理异常和错误。

First, keep in mind that the Observable typically does not throw exceptions. Instead, by default, Observable invokes its Observer’s onError() method, notifying the observer that an unrecoverable error just occurred, and then quits without invoking any more of its Observer’s methods.

首先,请记住,Observable通常不会抛出异常。相反,默认情况下,Observable调用其Observer的onError()方法,通知观察者刚刚发生了不可恢复的错误,然后退出,不再调用任何其Observer的方法。

The error handling operators we are about to introduce change the default behavior by resuming or retrying the Observable sequence.

我们即将引入的错误处理操作符通过恢复或重试Observable序列来改变默认行为。

2. Maven Dependencies

2.Maven的依赖性

First, let’s add the RxJava in the pom.xml:

首先,让我们在pom.xml中添加RxJava。

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.3</version>
</dependency>

The latest version of the artifact can be found here.

最新版本的工件可以在这里找到。

3. Error Handling

3.错误处理

When an error occurs, we usually need to handle it in some way. For example, alter related external states, resuming the sequence with default results, or simply leave it be so that the error could propagate.

当错误发生时,我们通常需要以某种方式处理它。例如,改变相关的外部状态,用默认的结果恢复序列,或者干脆不去管它,这样错误就可以传播了。

3.1. Action on Error

3.1.对错误的行动

With doOnError, we can invoke whatever action that is needed when there is an error:

通过doOnError,我们可以在出现错误时调用任何需要的操作。

@Test
public void whenChangeStateOnError_thenErrorThrown() {
    TestObserver testObserver = new TestObserver();
    AtomicBoolean state = new AtomicBoolean(false);
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> state.set(true))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
 
    assertTrue("state should be changed", state.get());
}

In case of an exception being thrown while performing the action, RxJava wraps the exception in a CompositeException:

如果在执行动作时抛出了一个异常,RxJava会将该异常包裹在一个CompositeException中。

@Test
public void whenExceptionOccurOnError_thenCompositeExceptionThrown() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> {
          throw new RuntimeException("unexcepted");
      })
      .subscribe(testObserver);

    testObserver.assertError(CompositeException.class);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

3.2. Resume With Default Items

3.2.使用默认项目恢复工作

Though we can invoke actions with doOnError, but the error still breaks the standard sequence flow. Sometimes we want to resume the sequence with a default option, that’s what onErrorReturnItem does:

虽然我们可以用doOnError来调用动作,但错误仍然会破坏标准的序列流程。有时我们想用一个默认选项来恢复序列,这就是onErrorReturnItem的作用。

@Test
public void whenHandleOnErrorResumeItem_thenResumed(){
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturnItem("singleValue")
      .subscribe(testObserver);
 
    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("singleValue");
}

If a dynamic default item supplier is preferred, we can use the onErrorReturn:

如果喜欢一个动态的默认项目供应商,我们可以使用onErrorReturn

@Test
public void whenHandleOnErrorReturn_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturn(Throwable::getMessage)
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("unknown error");
}

3.3. Resume with Another Sequence

3.3.用另一个序列恢复工作

Instead of falling back to a single item, we may supply fallback data sequence using onErrorResumeNext when encountering errors. This would help prevent error propagation:

在遇到错误时,我们可以使用onErrorResumeNext来提供回退数据序列,而不是回退到一个单项。这将有助于防止错误的传播。

@Test
public void whenHandleOnErrorResume_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(Observable.just("one", "two"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("one", "two");
}

If the fallback sequence differs according to the specific exception types, or the sequence needs to be generated by a function, we can pass the function to the onErrorResumeNext:

如果回退序列根据具体的异常类型而不同,或者该序列需要由一个函数生成,我们可以将该函数传递给onErrorResumeNext:

@Test
public void whenHandleOnErrorResumeFunc_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(throwable -> Observable
        .just(throwable.getMessage(), "nextValue"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("unknown error", "nextValue");
}

3.4. Handle Exception Only

3.4.处理异常 Only

RxJava also provides a fallback method that allows continuing the sequence with a provided Observable when an exception (but no error) is raised:

RxJava还提供了一个回退方法,当出现异常(但没有错误)时,允许用所提供的Observable继续序列。

@Test
public void whenHandleOnException_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_EXCEPTION)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("exceptionResumed");
}

@Test
public void whenHandleOnException_thenNotResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
}

As the code above shows, when an error does occur, the onExceptionResumeNext won’t kick in to resume the sequence.

正如上面的代码所示,当错误发生时,onExceptionResumeNext不会启动以恢复序列。

4. Retry on Error

4.错误时重试

The normal sequence may be broken by a temporary system failure or backend error. In these situations, we want to retry and wait until the sequence is fixed.

正常的序列可能被临时的系统故障或后台错误所打破。在这些情况下,我们要重试并等待,直到序列被修复。

Luckily, RxJava gives us options to perform exactly that.

幸运的是,RxJava为我们提供了一些选项来执行这一任务。

4.1. Retry

4.1.重试

By using retry, the Observable will be re-subscribed infinite times until when there’s no error. But most of the time we would prefer a fixed amount of retries:

通过使用retryObservable将被无限次地重新订阅,直到没有错误时。但大多数时候,我们希望有一个固定的重试次数。

@Test
public void whenRetryOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry(1)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should try twice", atomicCounter.get() == 2);
}

4.2. Retry on Condition

4.2.根据条件重试

Conditional retry is also feasible in RxJava, using retry with predicates or using retryUntil:

在RxJava中,使用带有谓词的重试或使用retryUntil,有条件重试也是可行的。

@Test
public void whenRetryConditionallyOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry((integer, throwable) -> integer < 4)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

@Test
public void whenRetryUntilOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(UNKNOWN_ERROR)
      .retryUntil(() -> atomicCounter.incrementAndGet() > 3)
      .subscribe(testObserver);
    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

4.3. RetryWhen

4.3.RetryWhen

Beyond these basics options, there’s also an interesting retry method: retryWhen.

除了这些基本的选项,还有一个有趣的重试方法。retryWhen

This returns an Observable, say “NewO”, that emits the same values as the source ObservableSource, say “OldO”, but if the returned Observable “NewO” calls onComplete or onError, the subscriber’s onComplete or onError will be invoked.

这将返回一个Observable,例如“NewO”,,它发出的值与源ObservableSource,例如“OldO”,但是如果返回的Observable“NewO “调用onCompleteonError,订阅者的onCompleteonError将被调用。

And if “NewO” emits any item, a re-subscription to the source ObservableSource “OldO” will be triggered.

而如果“NewO”发出任何项目,将触发对源ObservableSource “OldO”的重新订阅。

The tests below shows how this works:

下面的测试显示了这是如何工作的。

@Test
public void whenRetryWhenOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    Exception noretryException = new Exception("don't retry");
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> Observable.error(noretryException))
      .subscribe(testObserver);

    testObserver.assertError(noretryException);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

@Test
public void whenRetryWhenOnError_thenCompleted() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.empty())
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should not retry", atomicCounter.get()==0);
}

@Test
public void whenRetryWhenOnError_thenResubscribed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.just("anything"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should retry once", atomicCounter.get()==1);
}

A typical usage of retryWhen is limited retries with variable delays:

retryWhen的一个典型用法是有限的重试和可变的延迟。

@Test
public void whenRetryWhenForMultipleTimesOnError_thenResumed() {
    TestObserver testObserver = new TestObserver();
    long before = System.currentTimeMillis();
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> throwableObservable
        .zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
        .flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS)))
      .blockingSubscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    long secondsElapsed = (System.currentTimeMillis() - before)/1000;
    assertTrue("6 seconds should elapse",secondsElapsed == 6 );
}

Notice how this logic retries three times and incrementally delays each retry.

注意这个逻辑是如何重试三次,并且每次重试的时间都是递增的。

5. Summary

5.总结

In this article, we introduced a number of ways of handling errors and exceptions in RxJava.

在这篇文章中,我们介绍了一些在RxJava中处理错误和异常的方法。

There are also several RxJava-specific exceptions relating to error handling – have a look at the official wiki for more details.

还有几个与错误处理有关的RxJava特定的异常 – 请看官方维基以了解更多细节。

As always, the full implementation can be found over on Github.

一如既往,完整的实现可以在Github上找到