How to Test RxJava? – 如何测试RxJava?

最后修改: 2017年 2月 12日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’ll be looking at ways of testing code written using RxJava.

在这篇文章中,我们将探讨测试使用RxJava编写的代码的方法。

The typical flow we are creating with RxJava consists of an Observable and an Observer. The observable is a source of a data that is a sequence of elements. One or more observers subscribe to it to receive emitted events.

我们用RxJava创建的典型流程包括一个Observable和一个Observer。观察者是一个数据源,是一个元素的序列。一个或多个观察者订阅它以接收发射的事件。

Typically, the observer and observables are executed in separate threads in an asynchronous fashion – that makes the code hard to test in a traditional way.

通常情况下,观察者和可观察变量是以异步方式在不同的线程中执行的–这使得代码很难以传统方式进行测试。

Fortunately, RxJava provides a TestSubscriber class which gives us the ability to test asynchronous, event-driven flow.

幸运的是,RxJava提供了一个TestSubscriber类,使我们能够测试异步的、事件驱动的流程。

2. Testing RxJava – the Traditional Way

2.测试RxJava–传统的方式

Let’s start with an example – we have a sequence of letters that we want to zip with a sequence of integers from 1 inclusive.

让我们从一个例子开始–我们有一个字母序列,我们想用从1开始的整数序列来压缩。

Our test should assert that a subscriber that listens to events emitted by zipped observable receives letters zipped with integers.

我们的测试应该断言,监听由被压缩的观察者发出的事件的订阅者会收到被压缩为整数的信件。

Writing such a test in a traditional way means that we need to keep a list of results and update that list from an observer. Adding elements to a list of integers means that our observable and observers need to work in the same thread – they cannot work asynchronously.

以传统的方式编写这样的测试意味着我们需要保留一个结果列表,并从一个观察者那里更新这个列表。向整数列表中添加元素意味着我们的观察者和观察者需要在同一个线程中工作–他们不能异步工作。

And so we would be missing one of the biggest advantages of RxJava – processing of events in separate threads.

因此,我们将错过RxJava的最大优势之一–在独立的线程中处理事件。

Here’s what that limited version of the test would look like:

下面是那个有限版本的测试会是什么样子。

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
List<String> results = new ArrayList<>();
Observable<String> observable = Observable
  .from(letters)
  .zipWith(
     Observable.range(1, Integer.MAX_VALUE), 
     (string, index) -> index + "-" + string);

observable.subscribe(results::add);

assertThat(results, notNullValue());
assertThat(results, hasSize(5));
assertThat(results, hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));

We’re aggregating results from an observer by adding elements to a results list. The observer and the observable work in the same thread so our assertion blocks properly and waits for a subscribe() method to finish.

我们通过向results列表添加元素来聚合来自观察者的结果。观察者和可观察者在同一个线程中工作,所以我们的断言正确地阻塞并等待subscribe() 方法的完成。

3. Testing RxJava Using a TestSubscriber

3.使用TestSubscriber测试RxJava

RxJava comes with a TestSubsriber class that allows us to write tests that work with an asynchronous processing of events. This is a normal observer that subscribes to the observable.

RxJava带有一个TestSubsriber类,它允许我们编写以异步处理事件的方式工作的测试。这是一个普通的观察者,它订阅了可观察的事件。

In a test, we can examine the state of a TestSubscriber and make assertions on that state:

在测试中,我们可以检查TestSubscriber的状态,并对该状态做出断言。

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<String> observable = Observable
  .from(letters)
  .zipWith(
    Observable.range(1, Integer.MAX_VALUE), 
    ((string, index) -> index + "-" + string));

observable.subscribe(subscriber);

subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(
  subscriber.getOnNextEvents(),
  hasItems("1-A", "2-B", "3-C", "4-D", "5-E"));

We are passing a TestSubscriber instance to a subscribe() method on the observable. Then we can examine the state of this subscriber.

我们将一个TestSubscriber实例传递给观察器上的subscribe()方法。然后我们可以检查这个订阅者的状态。

TestSubscriber has some very useful assertion methods that we’ll use to validate our expectations. The subscriber should receive 5 emitted elements by an observer and we assert that by calling the assertValueCount() method.

TestSubscriber有一些非常有用的断言方法,我们将用它来验证我们的期望。订阅者应该收到由观察者发出的5个元素,我们通过调用assertValueCount()方法来断言。

We can examine all events that a subscriber received by calling the getOnNextEvents() method.

我们可以通过调用getOnNextEvents()方法来检查一个订阅者收到的所有事件。

Calling the assertCompleted() method checks if a stream to which the observer is subscribed to is completed. The assertNoErrors() method asserts that there were no errors while subscribing to a stream.

调用assertCompleted()方法可以检查观察者订阅的流是否已经完成。assertNoErrors()方法断言在订阅一个流时没有错误。

4. Testing Expected Exceptions

4.测试预期的异常情况

Sometimes in our processing, when an observable is emitting events or an observer is processing events, an error occurs. The TestSubscriber has a special method for examining error state – the assertError() method that takes the type of an exception as an argument:

在我们的处理过程中,有时当一个观察者正在发射事件或观察者正在处理事件时,会发生一个错误。TestSubscriber有一个特殊的方法来检查错误状态–assertError() 方法,它把异常的类型作为参数。

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<String> observable = Observable
  .from(letters)
  .zipWith(Observable.range(1, Integer.MAX_VALUE), ((string, index) -> index + "-" + string))
  .concatWith(Observable.error(new RuntimeException("error in Observable")));

observable.subscribe(subscriber);

subscriber.assertError(RuntimeException.class);
subscriber.assertNotCompleted();

We are creating the observable that is joined with another observable using the concatWith() method. The second observable throws a RuntimeException while emitting next event. We can examine a type of that exception on a TestSubsciber by calling the assertError() method.

我们正在创建一个观测器,该观测器使用concatWith()方法与另一个观测器连接。第二个观察变量在发出下一个事件时抛出了一个RuntimeException。我们可以通过调用assertError()方法在TestSubsciber上检查该异常的类型。

The observer that receives an error ceases processing and ends up in a non-completed state. That state can be checked by the assertNotCompleted() method.

收到错误的观察者会停止处理,最终处于非完成状态。这个状态可以通过assertNotCompleted()方法来检查。

5. Testing Time-Based Observable

5.测试基于时间的可观测

Let’s say that we have an Observable that emits one event per second and we want to test that behavior with a TestSubsciber.

假设我们有一个Observable,每秒发出一个事件,我们想用TestSubsciber来测试这个行为。

We can define a time-based Observable using the Observable.interval() method and pass a TimeUnit as an argument:

我们可以使用Observable.interval() 方法定义一个基于时间的Observable,并传递一个TimeUnit作为参数。

List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
TestScheduler scheduler = new TestScheduler();
TestSubscriber<String> subscriber = new TestSubscriber<>();
Observable<Long> tick = Observable.interval(1, TimeUnit.SECONDS, scheduler);

Observable<String> observable = Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string);

observable.subscribeOn(scheduler)
  .subscribe(subscriber);

The tick observable will emit a new value every one second.

tick 观察器将每隔一秒钟发出一个新的值。

At the beginning of a test we are at time zero, so our TestSubscriber will not be completed:

在测试开始时,我们处于时间0,所以我们的TestSubscriber不会被完成。

subscriber.assertNoValues();
subscriber.assertNotCompleted();

To emulate time passing in our test we need to use a TestScheduler class. We can simulate that one-second pass by calling the advanceTimeBy() method on a TestScheduler:

为了在我们的测试中模拟时间传递,我们需要使用TestScheduler类。我们可以通过在TestScheduler上调用advanceTimeBy()方法来模拟一秒钟的传递。

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

The advanceTimeBy() method will make an observable produce one event. We can assert that one event was produced by calling an assertValueCount() method:

advanceTimeBy() 方法将使一个观察者产生一个事件。我们可以通过调用assertValueCount()方法来断定产生了一个事件。

subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

Our list of letters has 5 elements in it so when we want to cause an observable to emit all events, 6 seconds of processing needs to pass. To emulate that 6 seconds, we use the advanceTimeTo() method:

我们的字母列表中有5个元素,所以当我们想让一个观察者发出所有事件时,需要经过6秒的处理。为了模拟这6秒,我们使用advanceTimeTo()方法。

scheduler.advanceTimeTo(6, TimeUnit.SECONDS);
 
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(5);
assertThat(subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C", "3-D", "4-E"));

After emulating passed time, we can execute assertions on a TestSubscriber. We can assert that all events were produced by calling the assertValueCount() method.

在模拟通过时间后,我们可以在TestSubscriber上执行断言。我们可以通过调用assertValueCount()方法来断言所有的事件都产生了。

6. Conclusion

6.结论

In this article, we examined ways of testing observers and observables in RxJava. We looked at a way of testing emitted events, errors and time-based observables.

在这篇文章中,我们研究了在RxJava中测试观察者和观察变量的方法。我们研究了测试发射的事件、错误和基于时间的观测器的方法。

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.

所有这些例子和代码片段的实现都可以在GitHub项目中找到–这是一个Maven项目,所以应该很容易导入并按原样运行。