1. Overview
1.概述
In this article, we’ll discover some utility operators for working with Observables in RxJava and how to implement custom ones.
在这篇文章中,我们将发现一些用于在RxJava中处理Observables的实用操作符,以及如何实现自定义操作符。
An operator is a function that takes and alters the behavior of an upstream Observable<T> and returns a downstream Observable<R> or Subscriber, where types T and R might or might not be the same.
操作者是一个函数,它接收并改变上游的Observable<T>的行为,并返回下游的Observable<R>或Subscriber,其中T和R的类型可能相同也可能不同。
Operators wrap existing Observables and enhance them typically by intercepting subscription. This might sound complicated, but it is actually quite flexible and not that difficult to grasp.
操作符包裹着现有的Observables,并且通常通过拦截订阅来增强它们。这听起来可能很复杂,但实际上相当灵活,并不难掌握。
2. Do
2.做
There are multiple actions that could alter the Observable lifecycle events.
有多种行动可以改变Observable生命周期事件。
The doOnNext operator modifies the Observable source so that it invokes an action when the onNext is called.
doOnNext操作符修改了Observablesource,以便当onNext被调用时调用一个动作。。
The doOnCompleted operator registers an action which is called if the resulting Observable terminates normally, calling Observer‘s onCompleted method:
doOnCompleted操作符注册了一个动作,如果产生的Observable正常终止,就会调用Observer的onCompleted方法:。
Observable.range(1, 10)
.doOnNext(r -> receivedTotal += r)
.doOnCompleted(() -> result = "Completed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Completed"));
The doOnEach operator modifies the Observable source so that it notifies an Observer for each item and establishes a callback that will be called each time an item is emitted.
doOnEach操作符修改了Observable源,以便它为每个项目通知一个Observer,并建立一个回调,该回调将在每次项目被发出时被调用。
The doOnSubscribe operator registers an action which is called whenever an Observer subscribes to the resulting Observable.
doOnSubscribe操作符注册了一个动作,每当Observer订阅到产生的Observable时,该动作就会被调用。
There’s also the doOnUnsubscribe operator which does the opposite of doOnSubscribe:
还有一个doOnUnsubscribe操作符,它的作用与doOnSubscribe相反:。
Observable.range(1, 10)
.doOnEach(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer value) {
receivedTotal += value;
}
})
.doOnSubscribe(() -> result = "Subscribed")
.subscribe();
assertTrue(receivedTotal == 55);
assertTrue(result.equals("Subscribed"));
When an Observable completes with an error, we can use the doOnError operator to perform an action.
当Observable以错误完成时,我们可以使用doOnError操作器来执行一个动作。
DoOnTerminate operator registers an action that will be invoked when an Observable completes, either successfully or with an error:
DoOnTerminate操作者注册了一个动作,当一个Observable成功或出错完成时,这个动作将被调用。
thrown.expect(OnErrorNotImplementedException.class);
Observable.empty()
.single()
.doOnError(throwable -> { throw new RuntimeException("error");})
.doOnTerminate(() -> result += "doOnTerminate")
.doAfterTerminate(() -> result += "_doAfterTerminate")
.subscribe();
assertTrue(result.equals("doOnTerminate_doAfterTerminate"));
There’s also a FinallyDo operator – which was deprecated in favor of doAfterTerminate. It registers an action when an Observable completes.
还有一个FinallyDo操作符–它已被弃用,转而使用doAfterTerminate。当Observable完成时,它注册了一个动作。
3. ObserveOn vs SubscribeOn
3.ObserveOn与SubscribeOn
By default, an Observable along with the operator chain will operate on the same thread on which its Subscribe method is called.
默认情况下,Observable和操作者链将在其Subscribe方法被调用的同一线程上操作。
The ObserveOn operator specifies a different Scheduler that the Observable will use for sending notifications to Observers:
ObserveOn operator指定了一个不同的Scheduler,Observable将用于向Observers发送通知。
Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.observeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
We see that elements were produced in the main thread and were pushed all the way to the first map call.
我们看到,元素是在主线程中产生的,并被一直推送到第一个map调用。
But after that, the observeOn redirected the processing to a computation thread, which was used when processing map and the final Subscriber.
但在那之后,observeOn将处理重定向到computation thread,在处理map和最后的Subscriber.时使用。
One problem that may arise with observeOn is the bottom stream can produce emissions faster than the top stream can process them. This can cause issues with backpressure that we may have to consider.
observeOn可能出现的一个问题是,底层流产生排放的速度比上层流处理的速度快。这可能导致背压的问题,我们可能不得不考虑。
To specify on which Scheduler the Observable should operate, we can use the subscribeOn operator:
为了指定Scheduler的Observable应该在哪个上面操作,我们可以使用subscribeOn操作符。
Observable.range(1, 5)
.map(i -> i * 100)
.doOnNext(i -> {
emittedTotal += i;
System.out.println("Emitting " + i
+ " on thread " + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.computation())
.map(i -> i * 10)
.subscribe(i -> {
receivedTotal += i;
System.out.println("Received " + i + " on thread "
+ Thread.currentThread().getName());
});
Thread.sleep(2000);
assertTrue(emittedTotal == 1500);
assertTrue(receivedTotal == 15000);
SubscribeOn instructs the source Observable which thread to use for emitting items – only this thread will push items to the Subscriber. It can be placed in any place in the stream because it affects the subscription only.
SubscribeOn指示源Observable使用哪一个线程来发射项目–只有这个线程会将项目推送到Subscriber。它可以被放置在流中的任何地方,因为它只影响订阅。
Effectively, we can only use one subscribeOn, but we can have any number of observeOn operators. We can switch emissions from one thread to another with ease by using observeOn.
实际上,我们只能使用一个subscribeOn,但我们可以有任意数量的observeOn操作符。我们可以通过使用observeOn.,轻松地将排放从一个线程切换到另一个线程。
4. Single and SingleOrDefault
4、Single和SingleOrDefault
The operator Single returns an Observable that emits the single item emitted by the source Observable:
操作符Single返回一个Observable,该操作符发射源Observable所发射的单一项目:。
Observable.range(1, 1)
.single()
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 1);
If the source Observable produces zero or more than one element, an exception will be thrown:
如果源Observable产生0个或1个以上的元素,将抛出一个异常。
Observable.empty()
.single()
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
On the other hand, the operator SingleOrDefault is very similar to Single, meaning that it also returns an Observable that emits the single item from the source, but additionally, we can specify a default value:
另一方面,操作符SingleOrDefault与Single非常相似,意思是它也会返回一个Observable,从源头发出单一项目,但另外,我们可以指定一个默认值。
Observable.empty()
.singleOrDefault("Default")
.subscribe(i -> result +=i);
assertTrue(result.equals("Default"));
But if the Observable source emits more than one item, it still throws an IllegalArgumentExeption:
但是,如果Observable源发出了一个以上的项目,它仍然会抛出一个IllegalArgumentExeption:。
Observable.range(1, 3)
.singleOrDefault(5)
.onErrorReturn(e -> receivedTotal += 10)
.subscribe();
assertTrue(receivedTotal == 10);
S simple conclusion:
S简单的结论。
- If it is expected that the source Observable may have none or one element, then SingleOrDefault should be used
- If we’re dealing with potentially more than one item emitted in our Observable and we only want to emit either the first or the last value, we can use other operators like first or last
5. Timestamp
5.时间戳
The Timestamp operator attaches a timestamp to each item emitted by the source Observable before reemitting that item in its own sequence. The timestamp indicates at what time the item was emitted:
Timestampoperator为源Observable发出的每个项目附加一个时间戳,然后再在自己的序列中重新发出该项目。该时间戳表明该项目是在什么时候发出的。
Observable.range(1, 10)
.timestamp()
.map(o -> result = o.getClass().toString() )
.last()
.subscribe();
assertTrue(result.equals("class rx.schedulers.Timestamped"));
6. Delay
6.延迟
This operator modifies its source Observable by pausing for a particular increment of time before emitting each of the source Observable’s items.
该操作符修改其源Observable,在发出源Observable的项之前,暂停一个特定的时间增量。
It offsets the entire sequence using the provided value:
它使用提供的值对整个序列进行偏移。
Observable source = Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.timestamp();
Observable delayedObservable
= source.delay(2, TimeUnit.SECONDS);
source.subscribe(
value -> System.out.println("source :" + value),
t -> System.out.println("source error"),
() -> System.out.println("source completed"));
delayedObservable.subscribe(
value -> System.out.println("delay : " + value),
t -> System.out.println("delay error"),
() -> System.out.println("delay completed"));
Thread.sleep(8000);
There is an alternative operator, with which we can delay the subscription to the source Observable called delaySubscription.
有一个替代的操作者,我们可以用它来延迟对源Observable的订阅,叫做delaySubscription。
The Delay operator runs on the computation Scheduler by default, but we can choose a different Scheduler by passing it in as an optional third parameter to delaySubscription.
延迟操作符在计算调度器上默认运行,但是我们可以选择不同的调度器,将其作为一个可选的第三个参数传递给delaySubscription。
7. Repeat
7.重复
Repeat simply intercepts completion notification from upstream and rather than passing it downstream it resubscribes.
Repeat只是从上游拦截完成通知,而不是将其传递给下游,而是重新订阅。
Therefore, it is not guaranteed that repeat will keep cycling through the same sequence of events, but it happens to be the case when upstream is a fixed stream:
因此,不能保证repeat会一直循环播放相同的事件序列,但当上游是一个固定的流时,恰好是这样。
Observable.range(1, 3)
.repeat(3)
.subscribe(i -> receivedTotal += i);
assertTrue(receivedTotal == 18);
8. Cache
8.缓存
The cache operator stands between the subscribe and our custom Observable.
cache操作器位于subscribe和我们的自定义Observable之间。
When the first subscriber appears, cache delegates subscription to the underlying Observable and forwards all notifications (events, completions, or errors) downstream.
当第一个订阅者出现时,cache将订阅委托给底层的Observable,并将所有通知(事件、完成或错误)转发给下游。
However, at the same time, it keeps a copy of all notifications internally. When a subsequent subscriber wants to receive pushed notifications, cache no longer delegates to the underlying Observable but instead feeds cached values:
然而,与此同时,它在内部保留了一份所有通知的副本。当随后的订阅者想要接收推送的通知时,cache不再委托给底层的Observable,而是提供缓存的值。
Observable<Integer> source =
Observable.<Integer>create(subscriber -> {
System.out.println("Create");
subscriber.onNext(receivedTotal += 5);
subscriber.onCompleted();
}).cache();
source.subscribe(i -> {
System.out.println("element 1");
receivedTotal += 1;
});
source.subscribe(i -> {
System.out.println("element 2");
receivedTotal += 2;
});
assertTrue(receivedTotal == 8);
9. Using
9.使用
When an observer subscribes to the Observable returned from the using(), it’ll use the Observable factory function to create the Observable the observer will… observe, while at the same time using the resource factory function to create whichever resource we’ve designed it to make.
当observer订阅从using()返回的Observable时,它将使用Observable工厂函数来创建Observable,observer将…观察,同时使用资源工厂函数来创建任何我们设计的资源。
When the observer unsubscribes from the Observable, or when the Observable terminates, using will call the third function to dispose of the created resource:
当observer从Observable退订时,或者当Observable终止时,using将调用第三个函数来处理创建的资源。
Observable<Character> values = Observable.using(
() -> "resource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result += v,
e -> result += e
);
assertTrue(result.equals("resource"));
10. Conclusion
10.结论
In this article, we talked how to use RxJava utility operators and also how to explore their most important features.
在这篇文章中,我们谈到了如何使用RxJava实用操作符,以及如何探索其最重要的功能。
The true power of RxJava lies in its operators. Declarative transformations of streams of data are safe yet expressive and flexible.
RxJava的真正力量在于它的运算符。数据流的声明性转换是安全的,但却具有表达性和灵活性。
With a strong foundation in functional programming, operators play deciding role in RxJava adoption. Mastering built-in operators is a key to success in this library.
在函数式编程的坚实基础上,运算符在RxJava的采用中起着决定性作用。掌握内置操作符是这个库成功的关键。
The full source code for the project including all the code samples used here can be found over on GitHub.
该项目的全部源代码,包括这里使用的所有代码样本,可以在GitHub上找到over。