1. Overview
1.概述
In this tutorial, we’re going to learn how to transform traditional synchronous and asynchronous APIs into Observables using RxJava2 operators.
在本教程中,我们将学习如何使用RxJava2操作符将传统的同步和异步API转化为Observables。
We’ll create a few simple functions that will help us discuss these operators in detail.
我们将创建几个简单的函数,以帮助我们详细讨论这些运算符。
2. Maven Dependencies
2.Maven的依赖性
First, we have to add RxJava2 and RxJava2Extensions as Maven dependencies:
首先,我们必须将RxJava2和RxJava2Extensions作为Maven依赖项。
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
<artifactId>rxjava2-extensions</artifactId>
<version>0.20.4</version>
</dependency>
3. The Operators
3.操作者
RxJava2 defines a whole lot of operators for various use cases of reactive programming.
RxJava2为反应式编程的各种使用情况定义了一大堆操作符。
But we’ll be discussing only a few operators that are commonly used for converting synchronous or asynchronous methods into Observables based on their nature. These operators take functions as arguments and emit the value returned from that function.
但我们将只讨论几个常用的操作符,这些操作符根据其性质将同步或异步方法转换为Observables。这些操作符以函数为参数,并发射从该函数返回的值。
Along with the normal operators, RxJava2 defines a few more operators for extended functionalities.
除了正常的运算符外,RxJava2还定义了一些扩展功能的运算符。
Let’s explore how we can make use of these operators to convert synchronous and asynchronous methods.
让我们来探讨一下我们如何利用这些操作符来转换同步和异步方法。
4. Synchronous Method Conversion
4.同步方法转换
4.1. Using fromCallable()
4.1.使用fromCallable()
This operator returns an Observable that, when a subscriber subscribes to it, invokes the function passed as the argument and then emits the value returned from that function. Let’s create a function that returns an integer and transform it:
这个操作符返回一个Observable,当订阅者订阅它时,调用作为参数传递的函数,然后发射从该函数返回的值。让我们创建一个返回整数的函数并对其进行转换。
AtomicInteger counter = new AtomicInteger();
Callable<Integer> callable = () -> counter.incrementAndGet();
Now, let’s transform it into an Observable and test it by subscribing to it:
现在,让我们把它转化为一个Observable,并通过订阅它来测试它。
Observable<Integer> source = Observable.fromCallable(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
The fromCallable() operator executes the specified function lazily each time when the wrapped Observable gets subscribed. To test this behavior, we’ve created multiple subscribers using a loop.
当被包装的Observable被订阅时,fromCallable()操作符每次都会懒散地执行指定的函数。为了测试这种行为,我们使用一个循环创建了多个订阅者。
Since reactive streams are asynchronous by default, the subscriber will return immediately. In most of the practical scenarios, the callable function will have some kind of delay to complete its execution. So, we’ve added a maximum wait time of five seconds before testing the result of our callable function.
由于反应式流在默认情况下是异步的,订阅者将立即返回。在大多数实际场景中,可调用函数会有某种延迟来完成其执行。所以,我们在测试可调用函数的结果之前,增加了5秒的最大等待时间。
Note also that we’ve used Observable‘s test() method. This method is handy when testing Observables. It creates a TestObserver and subscribes to our Observable.
还请注意,我们使用了Observable的test()方法。在测试Observables时,这个方法很方便。它创建了一个TestObserver并订阅了我们的Observable。TestObserver并订阅我们的Observable。
4.2. Using start()
4.2.使用start()
The start() operator is part of the RxJava2Extension module. It will invoke the specified function asynchronously and returns an Observable that emits the result:
start()操作符是RxJava2Extension模块的一部分。它将异步地调用指定的函数,并返回一个发出结果的Observable。
Observable<Integer> source = AsyncObservable.start(callable);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
The function is called immediately, not whenever a subscriber subscribes to the resulting Observable. Multiple subscriptions to this observable observe the same return value.
该函数被立即调用,而不是每当订阅者订阅了所产生的Observable时就被调用。对该观测器的多个订阅者观察到相同的返回值。
5. Asynchronous Method Conversion
5.异步方法转换
5.1. Using fromFuture()
5.1.使用fromFuture()
As we know, the most common way of creating an asynchronous method in Java is using the Future implementation. The fromFuture method takes a Future as its argument and emits the value obtained from the Future.get() method.
正如我们所知,在Java中创建异步方法的最常见方式是使用Future实现。fromFuture方法将一个Future作为其参数,并将从Future.get()方法获得的值发射出去。
First, let’s make the function which we’ve created earlier asynchronous:
首先,让我们把之前创建的函数变成异步的。
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(callable);
Next, let’s do the testing by transforming it:
接下来,让我们通过转换来做测试。
Observable<Integer> source = Observable.fromFuture(future);
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
assertEquals(1, counter.get());
}
executor.shutdown();
And notice once again that each subscription observes the same return value.
并再次注意到,每个订阅都观察到了相同的返回值。
Now, the dispose() method of Observable is really helpful when it comes to memory leak prevention. But in this case, it will not cancel the future because of the blocking nature of Future.get().
现在,Observable的dispose()方法在防止内存泄漏方面确实很有帮助。但是在这种情况下,由于Future.get()的阻塞性,它将不会取消未来。
So, we can make sure to cancel the future by combining the doOnDispose() function of the source observable and the cancel method on future:
因此,我们可以通过结合source观察器的doOnDispose()函数和future上的cancel方法来确保取消未来。
source.doOnDispose(() -> future.cancel(true));
5.2. Using startFuture()
5.2.使用startFuture()
As the name depicts, this operator will start the specified Future immediately and emits the return value when a subscriber subscribes it. Unlike the fromFuture operator which caches the result for next use, this operator will execute the asynchronous method each time when it gets subscribed:
正如名字所描述的那样,该操作符将立即启动指定的Future,并在订阅者订阅它时发出返回值。与fromFuture操作符不同的是,该操作符将在每次被订阅时执行异步方法,以备下次使用。
ExecutorService executor = Executors.newSingleThreadExecutor();
Observable<Integer> source = AsyncObservable.startFuture(() -> executor.submit(callable));
for (int i = 1; i < 5; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(i);
assertEquals(i, counter.get());
}
executor.shutdown();
5.3. Using deferFuture()
5.3.使用deferFuture()
This operator aggregates multiple Observables returned from a Future method and returns a stream of return values obtained from each Observable. This will start the passed asynchronous factory function whenever a new subscriber subscribes.
该操作符聚合了从Future方法返回的多个Observables,并返回从每个Observable获得的返回值流。只要有新的订阅者订阅,这将启动传递的异步工厂函数。
So let’s first create the asynchronous factory function:
所以我们首先创建异步工厂函数。
List<Integer> list = Arrays.asList(new Integer[] { counter.incrementAndGet(),
counter.incrementAndGet(), counter.incrementAndGet() });
ExecutorService exec = Executors.newSingleThreadExecutor();
Callable<Observable<Integer>> callable = () -> Observable.fromIterable(list);
And then we can do a quick test:
然后我们可以做一个快速测试。
Observable<Integer> source = AsyncObservable.deferFuture(() -> exec.submit(callable));
for (int i = 1; i < 4; i++) {
source.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1,2,3);
}
exec.shutdown();
6. Conclusion
6.结论
In this tutorial, we’ve learned how to transform synchronous and the asynchronous methods to RxJava2 observables.
在本教程中,我们已经学会了如何将同步方法和异步方法转化为RxJava2的观察变量。
Of course, the examples we’ve shown here are the basic implementations. But we can use RxJava2 for more complex applications like video streaming and applications where we need to send large amounts of data in portions.
当然,我们在这里展示的例子是基本实现。但我们可以将RxJava2用于更复杂的应用,如视频流和需要分批发送大量数据的应用。
As usual, all the short examples we’ve discussed here can be found over on the Github project.
像往常一样,我们在这里讨论的所有简短例子都可以在Github项目上找到。