RxJava 2 – Flowable – RxJava 2 – Flowable

最后修改: 2018年 2月 23日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

RxJava is a Reactive Extensions Java implementation that allows us to write event-driven, and asynchronous applications. More information on how to use RxJava can be found in our intro article here.

RxJava是一个Reactive Extensions Java实现,它允许我们编写事件驱动和异步应用程序。关于如何使用RxJava的更多信息,可以在我们的intro文章中找到

RxJava 2 was rewritten from scratch, which brought multiple new features; some of which were created as a response for issues that existed in the previous version of the framework.

RxJava 2从头开始重写,带来了多个新功能;其中一些功能是为了应对前一版本框架中存在的问题而创建的。

One of such features is the io.reactivex.Flowable.

其中一个特性是io.reactivex.Flowable

2. Observable vs. Flowable

2.可观察vs 可流动的

In the previous version of RxJava, there was only one base class for dealing with backpressure-aware and non-backpressure-aware sources – Observable.

在以前的RxJava版本中,只有一个基类用于处理背压感知和非背压感知的源–Observable.

RxJava 2 introduced a clear distinction between these two kinds of sources – backpressure-aware sources are now represented using a dedicated class – Flowable.

RxJava 2对这两种来源进行了明确的区分–背压感知来源现在用一个专门的类来表示–Flowable.

Observable sources don’t support backpressure. Because of that, we should use it for sources that we merely consume and can’t influence.

可观察源不支持背压。正因为如此,我们应该将其用于我们只是消费而无法影响的源。

Also, if we’re dealing with a big number of elements, two possible scenarios connected with backpressure can occur depending on the type of the Observable.

另外,如果我们要处理大量的元素,根据Observable的类型,可能会出现两种与backpressure相关的情况。

In case of using a so-called cold Observable“, events are emitted lazily, so we’re safe from overflowing an observer.

在使用所谓的Observable“的情况下,事件被懒洋洋地发射出来,所以我们可以安全地避免观测器溢出。

When using a hot Observablehowever, this will continue to emit events, even if the consumer can’t keep up.

当使用一个热的Observable然而,这将继续发出事件,即使消费者无法跟上。

3. Creating a Flowable

3.创建一个Flowable

There are different ways to create a Flowable. Conveniently for us, those methods look similar to the methods in Observable in the first version of RxJava.

有不同的方法来创建一个Flowable。对我们来说很方便,这些方法看起来与RxJava第一个版本中Observable中的方法类似。

3.1. Simple Flowable

3.1.简单的可流动

We can create a Flowable using the just() method similarly as we could with Observable :

我们可以使用just()方法创建一个Flowable,就像我们可以使用Observable:一样。

Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);

Even though using the just() is quite simple, it isn’t very common to create a Flowable from static data, and it’s used for testing purposes.

尽管使用just()非常简单,但从静态数据创建Flowable并不常见,它被用于测试目的。

3.2. Flowable from Observable

3.2.Flowable from Observable

When we have an Observable we can easily transform it to Flowable using the toFlowable() method:

当我们有一个Observable时,我们可以使用toFlowable()方法轻松地将其转换为Flowable

Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
  .toFlowable(BackpressureStrategy.BUFFER);

Notice that to be able to perform the conversion, we need to enrich the Observable with a BackpressureStrategy. We’ll describe available strategies in the next section.

请注意,为了能够执行转换,我们需要用BackpressureStrategy来丰富Observable我们将在下一节中描述可用的策略。

3.3. Flowable from FlowableOnSubscribe

3.3.Flowable来自FlowableOnSubscribe

RxJava 2 introduced a functional interface FlowableOnSubscribe, which represents a Flowable that starts emitting events after the consumer subscribes to it.

RxJava 2引入了一个功能接口FlowableOnSubscribe,它表示一个Flowable,在消费者订阅了它之后开始发射事件。

Due to that, all clients will receive the same set of events, which makes FlowableOnSubscribe backpressure-safe.

由于这一点,所有的客户将收到相同的事件集,这使得FlowableOnSubscribe背压安全。

When we have the FlowableOnSubscribe we can use it to create the Flowable:

当我们有了FlowableOnSubscribe,我们就可以用它来创建Flowable

FlowableOnSubscribe<Integer> flowableOnSubscribe
 = flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
  .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

The documentation describes many more methods to create Flowable.

文档中描述了更多创建Flowable.的方法。

4. Flowable BackpressureStrategy

4.可流动的 背压策略

Some methods like toFlowable() or create() take a BackpressureStrategy as an argument.

一些方法,如toFlowable()create()需要一个BackpressureStrategy作为一个参数。

The BackpressureStrategy is an enumeration, which defines the backpressure behavior that we’ll apply to our Flowable.

BackpressureStrategy是一个枚举,它定义了我们将应用于Flowable的背压行为。

It can cache or drop events or not implement any behavior at all, in the last case, we will be responsible for defining it, using backpressure operators.

它可以缓存或丢弃事件,或者根本不实现任何行为,在最后一种情况下,我们将负责定义它,使用反压操作符。

BackpressureStrategy is similar to BackpressureMode present in the previous version of RxJava.

BackpressureStrategy类似于RxJava前一版本中的BackpressureMode

There are five different strategies available in RxJava 2.

在RxJava 2中,有五种不同的策略可用。

4.1. Buffer

4.1.缓冲区

If we use the BackpressureStrategy.BUFFER, the source will buffer all the events until the subscriber can consume them:

如果我们使用BackpressureStrategy.BUFFER源将缓冲所有事件,直到订阅者可以消费它们

public void thenAllValuesAreBufferedAndReceived() {
    List testList = IntStream.range(0, 100000)
      .boxed()
      .collect(Collectors.toList());
 
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.BUFFER)
      .observeOn(Schedulers.computation()).test();

    testSubscriber.awaitTerminalEvent();

    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertEquals(testList, receivedInts);
}

It’s similar to invoking onBackpressureBuffer() method on Flowable, but it doesn’t allow to define a buffer size or the onOverflow action explicitly.

它类似于在Flowable上调用onBackpressureBuffer()方法,但它不允许明确定义缓冲区大小或onOverflow动作。

4.2. Drop

4.2.跌落

We can use the BackpressureStrategy.DROP to discard the events that cannot be consumed instead of buffering them.

我们可以使用BackpressureStrategy.DROP来丢弃不能被消耗的事件,而不是缓冲它们。

Again this is similar to using onBackpressureDrop() on Flowable:

同样,这与在Flowable上使用onBackpressureDrop()相似。

public void whenDropStrategyUsed_thenOnBackpressureDropped() {
   
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.DROP)
      .observeOn(Schedulers.computation())
      .test();
    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(!receivedInts.contains(100000));
 }

4.3. Latest

4.3.最新的

Using the BackpressureStrategy.LATEST will force the source to keep only the latest events, thus overwriting any previous values if the consumer can’t keep up:

使用BackpressureStrategy.LATEST将强制源头只保留最新的事件,从而在消费者跟不上的情况下覆盖任何先前的值:

public void whenLatestStrategyUsed_thenTheLastElementReceived() {
  
    Observable observable = Observable.fromIterable(testList);
    TestSubscriber<Integer> testSubscriber = observable
      .toFlowable(BackpressureStrategy.LATEST)
      .observeOn(Schedulers.computation())
      .test();

    testSubscriber.awaitTerminalEvent();
    List<Integer> receivedInts = testSubscriber.getEvents()
      .get(0)
      .stream()
      .mapToInt(object -> (int) object)
      .boxed()
      .collect(Collectors.toList());

    assertThat(receivedInts.size() < testList.size());
    assertThat(receivedInts.contains(100000));
 }

BackpressureStrategy.LATEST and BackpressureStrategy.DROP look very similar when we look at the code.

BackpressureStrategy.LATEST和BackpressureStrategy.DROP在我们看代码时看起来非常相似。

However, BackpressureStrategy.LATEST will overwrite elements that our subscriber can’t handle and keep only the latest ones, hence the name.

然而,BackpressureStrategy.LATEST将覆盖我们的订阅者无法处理的元素,只保留最新的元素,因此被称为

BackpressureStrategy.DROP, on the other hand, will discard elements that can’t be handled. This means that newest elements won’t necessarily be emitted.

BackpressureStrategy.DROP,另一方面,将丢弃无法处理的元素。这意味着最新的元素不一定会被释放出来。

4.4. Error

4.4.错误

When we’re using the BackpressureStrategy.ERROR, we’re simply saying that we don’t expect backpressure to occur. Consequently, a MissingBackpressureException should be thrown if the consumer can’t keep up with the source:

当我们使用BackpressureStrategy.ERROR时,我们只是说我们不期望发生背压。因此,如果消费者无法跟上源的速度,就应该抛出一个MissingBackpressureException

public void whenErrorStrategyUsed_thenExceptionIsThrown() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.ERROR)
      .observeOn(Schedulers.computation())
      .test();

    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

4.5. Missing

4.5.缺少

If we use the BackpressureStrategy.MISSING, the source will push elements without discarding or buffering.

如果我们使用BackpressureStrategy.MISSING,源头将推送元素而不丢弃或缓冲。

The downstream will have to deal with overflows in this case:

下游将不得不处理这种情况下的溢出问题。

public void whenMissingStrategyUsed_thenException() {
    Observable observable = Observable.range(1, 100000);
    TestSubscriber subscriber = observable
      .toFlowable(BackpressureStrategy.MISSING)
      .observeOn(Schedulers.computation())
      .test();
    subscriber.awaitTerminalEvent();
    subscriber.assertError(MissingBackpressureException.class);
}

In our tests, we’re excepting MissingbackpressureException for both ERROR and MISSING strategies. As both of them will throw such exception when the source’s internal buffer is overflown.

在我们的测试中,我们将MissingbackpressureException排除在ERRORMISSING策略之外。因为它们都会在源的内部缓冲区溢出时抛出这样的异常。

However, it’s worth to note that both of them have a different purpose.

然而,值得注意的是,这两个人都有不同的目的。

We should use the former one when we don’t expect backpressure at all, and we want the source to throw an exception in case if it occurs.

当我们完全不期待背压时,我们应该使用前者,并且我们希望源码在发生背压时抛出一个异常。

The latter one could be used if we don’t want to specify a default behavior on the creation of the Flowable. And we’re going to use backpressure operators to define it later on.

如果我们不想在创建Flowable时指定一个默认行为,可以使用后一个。而我们稍后将使用背压操作符来定义它。

5. Summary

5.总结

In this tutorial, we’ve presented the new class introduced in RxJava 2 called Flowable.

在本教程中,我们介绍了RxJava 2中引入的名为Flowable.的新类。

To find more information about the Flowable itself and it’s API we can refer to the documentation.

要找到关于Flowable本身和它的API的更多信息,我们可以参考文档

As always all the code samples can be found over on GitHub.

像往常一样,所有的代码样本都可以在GitHub上找到over