Filtering Observables in RxJava – 在RxJava中过滤观察变量

最后修改: 2018年 4月 7日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

Following the Introduction to RxJava, we’re going to look at the filtering operators.

RxJava入门之后,我们要看一下过滤运算符。

In particular, we’re going to focus on filtering, skipping, time-filtering, and some more advanced filtering operations.

特别是,我们将专注于过滤、跳过、时间过滤和一些更高级的过滤操作。

2. Filtering

2.过滤

When working with Observable, sometimes it’s useful to select only a subset of emitted items. For this purpose, RxJava offers various filtering capabilities.

在使用Observable时,有时只选择发射项目的一个子集是很有用的。为此,RxJava提供了各种过滤功能

Let’s start looking at the filter method.

让我们开始看一下filter方法。

2.1. The filter Operator

2.1.过滤器操作符

Simply put, the filter operator filters an Observable making sure that emitted items match specified condition, which comes in the form of a Predicate.

简单地说,filter操作符对Observable进行过滤,确保发射的项目符合指定的条件,该条件以Predicate的形式存在。

Let’s see how we can filter only the odd values from those emitted:

让我们看看我们如何从发出的数值中只过滤出奇数的数值。

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .filter(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 3, 5, 7, 9);

2.2. The take Operator

2.2.take操作符

When filtering with take, the logic results in the emission of the first n items while ignoring the remaining items.

当用take进行过滤时,其逻辑结果是排放前n个项目,而忽略其余项目。

Let’s see how we can filter the sourceObservable and emit only the first two items:

让我们看看我们如何过滤sourceObservable并只发射前两个项目。

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.take(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.3. The takeWhile Operator

2.3.takeWhile操作符

When using takeWhile, the filtered Observable will keep emitting items until it encounters a first element that doesn’t match the Predicate.

当使用takeWhile时,被过滤的Observable将继续发射项目,直到它遇到不符合Predicate的第一个元素。

Let’s see how we can use the takeWhile – with a filtering Predicate:

让我们看看我们如何使用takeWhile–与过滤Predicate:

Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .takeWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.4. The takeFirst Operator

2.4.takeFirst操作符

Whenever we want to emit only the first item matching a given condition, we can use takeFirst().

当我们想只发射符合给定条件的第一个项目时,我们可以使用takeFirst()。

Let’s have a quick look at how we can emit the first item that is greater than 5:

让我们快速看一下,我们如何能发出第一个大于5的项目。

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 4, 5, 7, 6);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .takeFirst(x -> x > 5);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

2.5. first and firstOrDefault Operators

2.5.firstfirstOrDefault操作符

A similar behavior can be achieved using the first API:

使用first API可以实现类似的行为。

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.first();

filteredObservable.subscribe(subscriber);

subscriber.assertValue(1);

However, in case we want to specify a default value, if no items are emitted, we can use firstOrDefault:

然而,如果我们想指定一个默认值,如果没有项目被发射出来,我们可以使用firstOrDefault

Observable<Integer> sourceObservable = Observable.empty();

Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.6. The takeLast Operator

2.6.takeLast操作符

Next, if we want to emit only the last n items emitted by an Observable, we can use takeLast.

接下来,如果我们想只发射一个Observable所发射的最后n项,我们可以使用takeLast

Let’s see how it’s possible to emit only the last three items:

让我们看看如何可能只发出最后三项。

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.takeLast(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(8, 9, 10);

We have to remember that this delays the emission of any item from the source Observable until it completes.

我们必须记住,这将推迟源Observable的任何项目的排放,直到它完成。

2.7. last and lastOrDefault

2.7.lastlastOrDefault

If we want to emit only the last element, other then using takeLast(1), we can use last.

如果我们想只发射最后一个元素,除了使用takeLast(1),我们还可以使用last

This filters the Observable, emitting only the last element, which optionally verifies a filtering Predicate:

这将过滤Observable,只发出最后一个元素,它可以选择验证过滤的Predicate

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .last(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(9);

In case the Observable is empty, we can use lastOrDefault, that filters the Observable emitting the default value.

如果Observable是空的,我们可以使用lastOrDefault,它可以过滤Observable,发出默认值。

The default value is also emitted if the lastOrDefault operator is used and there aren’t any items that verify the filtering condition:

如果使用lastOrDefault操作符,并且没有任何项目可以验证过滤条件,那么也会发出默认值。

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = 
  sourceObservable.lastOrDefault(-1, i -> i > 10);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.8. elementAt and elementAtOrDefault Operators

2.8.elementAtelementAtOrDefault操作符

With the elementAt operator, we can pick a single item emitted by the source Observable, specifying its index:

通过elementAt操作符,我们可以挑选一个由源Observable发出的单项,指定其索引。

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.elementAt(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

However, elementAt will throw an IndexOutOfBoundException if the specified index exceeds the number of items emitted.

然而,elementAt将抛出一个IndexOutOfBoundException,如果指定的索引超过了发射的项目数量。

To avoid this situation, it’s possible to use elementAtOrDefault – which will return a default value in case the index is out of range:

为了避免这种情况,可以使用elementAtOrDefault –,它将在索引超出范围时返回一个默认值。

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable
 = sourceObservable.elementAtOrDefault(7, -1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.9. The ofType Operator

2.9.ofType操作符

Whenever the Observable emits Object items, it’s possible to filter them based on their type.

Observable发出Object项目时,可以根据它们的类型来过滤它们。

Let’s see how we can only filter the String type items emitted:

让我们看看我们如何只过滤发出的String类型项目。

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
TestSubscriber subscriber = new TestSubscriber();

Observable filteredObservable = sourceObservable.ofType(String.class);

filteredObservable.subscribe(subscriber);

subscriber.assertValues("two", "five");

3. Skipping

3.跳楼

On the other hand, when we want to filter out or skip some of the items emitted by an Observable, RxJava offers a few operators as a counterpart of the filtering ones, that we’ve previously discussed.

另一方面,当我们想过滤掉或跳过由Observable发出的一些项目时,RxJava提供了一些操作符,作为我们之前讨论的过滤操作符的对应物

Let’s start looking at the skip operator, the counterpart of take.

让我们开始看看skip操作符,即take的对应部分。

3.1. The skip Operator

3.1.skip操作符

When an Observable emits a sequence of items, it’s possible to filter out or skip some of the firsts emitted items using skip.

当一个Observable发出一连串的项目时,可以使用skip来过滤掉或跳过一些最先发出的项目。

For example. let’s see how it’s possible to skip the first four elements:

例如,让我们看看如何能够跳过前四个元素。

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skip(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2. The skipWhile Operator

3.2.skipWhile操作符

Whenever we want to filter out all the first values emitted by an Observable that fail a filtering predicate, we can use the skipWhile operator:

当我们想过滤掉由Observable发出的、未能通过过滤谓词的所有第一个值时,我们可以使用skipWhile操作符:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 4, 5, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .skipWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3. The skipLast Operator

3.3.skipLast操作符

The skipLast operator allows us to skip the final items emitted by the Observable accepting only those emitted before them.

skipLast操作符允许我们跳过Observable发出的最后项目,只接受之前发出的项目。

With this, we can, for example, skip the last five items:

有了这个,我们就可以,比如说,跳过最后五项。

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skipLast(5);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3, 4, 5);

3.4. distinct and distinctUntilChanged Operators

3.4.distinctdistinctUntilChanged操作符

The distinct operator returns an Observable that emits all the items emitted by the sourceObservable that are distinct:

distinct操作符返回一个Observable,该操作符发出由sourceObservable发出的所有项目,这些项目是不同的:

Observable<Integer> sourceObservable = Observable
  .just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinct();

distinctObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

However, if we want to obtain an Observable that emits all the items emitted by the sourceObservable that are distinct from their immediate predecessor, we can use the distinctUntilChanged operator:

然而,如果我们想获得一个Observable,它发射由sourceObservable发射的所有项目,这些项目与它们的直接前身不同,我们可以使用distinctUntilChanged操作符。

Observable<Integer> sourceObservable = Observable
  .just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();

distinctObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 1, 3, 1);

3.5. The ignoreElements Operator

3.5.ignoreElements 操作符

Whenever we want to ignore all the elements emitted by the sourceObservable, we can simply use the ignoreElements:

每当我们想忽略由sourceObservable发出的所有元素时,我们可以简单地使用ignoreElements:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();

ignoredObservable.subscribe(subscriber);

subscriber.assertNoValues();

4. Time Filtering Operators

4.时间过滤操作符

When working with observable sequence, the time axis is unknown but sometimes getting timely data from a sequence could be useful.

在处理可观察的序列时,时间轴是未知的,但有时从序列中获得及时的数据可能是有用的。

With this purpose, RxJava offers a few methods that allow us to work with Observable using also the time axis.

为此,RxJava提供了一些方法,使我们能够使用时间轴来处理Observable

Before moving on to the first one, let’s define a timed Observable that will emit an item every second:

在继续讨论第一个问题之前,让我们定义一个定时的Observable,它将每秒钟发射一个项目。

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable = Observable
  .just(1, 2, 3, 4, 5, 6)
  .zipWith(Observable.interval(
    0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

The TestScheduler is a special scheduler that allows advancing the clock manually at whatever pace we prefer.

TestScheduler是一个特殊的调度器,允许以我们喜欢的任何速度手动推进时钟

4.1. sample and throttleLast Operators

4.1.sample throttleLast操作符

The sample operator filters the timedObservable, returning an Observable that emits the most recent items emitted by this API within period time intervals.

sample操作符过滤timedObservable,返回一个Observable,该Observable发射该API在期间时间间隔内发射的最新项目。

Let’s see how we can sample the timedObservable, filtering only the last emitted item every 2.5 seconds:

让我们看看我们如何对timedObservable进行采样,每2.5秒只过滤最后发出的项目。

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> sampledObservable = timedObservable
  .sample(2500L, TimeUnit.MILLISECONDS, testScheduler);

sampledObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(3, 5, 6);

This kind of behavior can be achieved also using the throttleLast operator.

这种行为也可以用throttleLast操作符来实现。

4.2. The throttleFirst Operator

4.2.节流优先操作器

The throttleFirst operator differs from throttleLast/sample since it emits the first item emitted by the timedObservable in each sampling period instead of the most recently emitted one.

throttleFirst操作符与throttleLast/sample不同,因为它发射的是timedObservable在每个采样期间发射的第一个项目,而不是最近发射的项目。

Let’s see how we can emit the first items, using a sampling period of 4 seconds:

让我们看看我们如何使用4秒的采样周期来发射第一批项目。

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .throttleFirst(4100L, TimeUnit.SECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 6);

4.3. debounce and throttleWithTimeout Operators

4.3.debouncethrottleWithTimeout操作器

With the debounce operator, it’s possible to emit only an item if a particular timespan has passed without emitting another item.

通过debounce操作符,有可能只发射一个项目,如果一个特定的时间段过去了而没有发射另一个项目。

Therefore, if we select a timespan that is greater than the time interval between the emitted items of the timedObservable, it will only emit the last one. On the other hand, if it’s smaller, it will emit all the items emitted by the timedObservable.

因此,如果我们选择的时间范围大于timedObservable的发射项目之间的时间间隔,它将只发射最后一个另一方面,如果它较小,它将发射由timedObservable发射的所有项目。

Let’s see what happens in the first scenario:

让我们看看在第一种情况下会发生什么。

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValue(6);

This kind of behavior can also be achieved using throttleWithTimeout.

这种行为也可以用throttleWithTimeout实现。

4.4. The timeout Operator

4.4.timeout操作符

The timeout operator mirrors the source Observable, but issue a notification error, aborting the emission of items, if the source Observable fails to emit any items during a specified time interval.

timeout操作符反映了源Observable,但是如果源Observable在指定的时间间隔内未能发射任何项目,就会发出一个通知错误,中止项目的发射。

Let’s see what happens if we specify a timeout of 500 milliseconds to our timedObservable:

让我们看看如果我们给我们的timedObservable指定一个500毫秒的超时会发生什么。

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .timeout(500L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);

5. Multiple Observable Filtering

5.多重观察过滤

When working with Observable, it’s definitely possible to decide if filtering or skipping items based on a second Observable.

在使用Observable时,肯定可以根据第二个Observable来决定是否过滤或跳过项目。

Before moving on, let’s define a delayedObservable, that will emit only 1 item after 3 seconds:

在继续之前,让我们定义一个delayedObservable,它将在3秒后只发出一个项目。

Observable<Integer> delayedObservable = Observable.just(1)
  .delay(3, TimeUnit.SECONDS, testScheduler);

Let’s start with takeUntil operator.

让我们从takeUntil操作器开始。

5.1. The takeUntil Operator

5.1.takeUntil操作符

The takeUntil operator discards any item emitted by the source Observable (timedObservable) after a second Observable (delayedObservable) emits an item or terminates:

takeUntil操作符在第二个ObservabletimedObservable)发出一个项目或终止后,丢弃源ObservabledelayedObservable)所发出的任何项目。

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .skipUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(4, 5, 6);

5.2. The skipUntil Operator

5.2.skipUntil操作符

On the other hand, skipUntil discards any item emitted by the source Observable (timedObservable) until a second Observable (delayedObservable) emits an item:

另一方面,skipUntil丢弃由源ObservabletimedObservable)发出的任何项目,直到第二个ObservabledelayedObservable)发出一个项目。

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .takeUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 2, 3);

6. Conclusion

6.结论

In this extensive tutorial, we explored the different filtering operators available within RxJava, providing a simple example of each one.

在这个广泛的教程中,我们探讨了RxJava中可用的不同过滤操作符,为每个操作符提供了一个简单的例子。

As always, all the code examples in this article can be found over on GitHub.

一如既往,本文中的所有代码示例都可以在GitHub上找到over