Dealing with Backpressure with RxJava

Last modified: February 6, 2017

1. Overview

In this article, we will look at the way the RxJava library helps us to handle backpressure.

Simply put, RxJava builds on the concept of reactive streams by introducing Observables, to which one or many Observers can subscribe. Dealing with possibly infinite streams is challenging because we have to face the problem of backpressure.

It’s not difficult to get into a situation in which an Observable emits items more rapidly than a subscriber can consume them. We will look at different solutions to the problem of a growing buffer of unconsumed items.

2. Hot Observables Versus Cold Observables

First, let’s create a simple consumer function that will consume elements from the Observables we define later:

public class ComputeFunction {
    public static void compute(Integer v) {
        try {
            System.out.println("compute integer v: " + v);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Our compute() function simply prints its argument. The important thing to notice is the call to Thread.sleep(1000): we use it to emulate a long-running task, so that the Observable produces items faster than the Observer can consume them.

There are two types of Observables, hot and cold, and they differ completely when it comes to backpressure handling.

2.1. Cold Observables

A cold Observable emits a particular sequence of items, but it can begin emitting this sequence whenever its Observer finds it convenient, and at whatever rate the Observer desires, without disrupting the integrity of the sequence. A cold Observable provides items lazily.

The Observer takes an element only when it is ready to process it, so items do not need to be buffered in the Observable: they are requested in a pull fashion.

For example, if you create an Observable based on a static range of elements from one to one million, that Observable would emit the same sequence of items no matter how frequently those items are observed:

Observable.range(1, 1_000_000)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute);

When we start our program, the Observer computes items lazily and requests them in a pull fashion. Passing Schedulers.computation() to observeOn() means that we want to run our Observer on RxJava’s computation thread pool.

The output of the program consists of the results of compute() invoked on the items from the Observable one by one:

compute integer v: 1
compute integer v: 2
compute integer v: 3
compute integer v: 4
...

Cold Observables do not need any form of backpressure handling because they work in a pull fashion. Examples of items emitted by a cold Observable include the results of a database query, a file retrieval, or a web request.
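
The pull model can be sketched in plain Java without RxJava. The LazyRange class below is a hypothetical illustration, not RxJava's implementation: it creates a value only when the consumer asks for the next one, so nothing accumulates between producer and consumer.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical illustration of a pull-based (cold) source; not an RxJava type.
final class LazyRange implements Iterator<Integer> {
    private final int endInclusive;
    private int next;
    private int produced = 0; // how many values have been created so far

    LazyRange(int start, int count) {
        this.next = start;
        this.endInclusive = start + count - 1;
    }

    @Override
    public boolean hasNext() {
        return next <= endInclusive;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        produced++;      // a value is created only because it was requested
        return next++;
    }

    int producedSoFar() {
        return produced;
    }

    public static void main(String[] args) {
        LazyRange source = new LazyRange(1, 1_000_000);
        // The consumer pulls three items at its own pace.
        for (int i = 0; i < 3; i++) {
            System.out.println("compute integer v: " + source.next());
        }
        // Only the three requested values were ever created.
        System.out.println("produced so far: " + source.producedSoFar());
    }
}
```

However slow the consumer is, the count of produced values never runs ahead of the count of consumed ones, which is why no buffer is needed in this model.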

2.2. Hot Observables

A hot Observable begins generating items and emits them immediately when they are created. This is the opposite of the cold Observable’s pull model of processing. A hot Observable emits items at its own pace, and it is up to its Observers to keep up.

When the Observer is not able to consume items as quickly as the Observable produces them, the items need to be buffered or handled in some other way; otherwise they fill up memory and eventually cause an OutOfMemoryError.

Let’s consider an example of a hot Observable that produces one million items for an end consumer that processes them. When the compute() method in the Observer takes some time to process every item, unconsumed items start to pile up, causing the program to fail:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

IntStream.rangeClosed(1, 1_000_000).forEach(source::onNext);

Running that program fails with a MissingBackpressureException because we didn’t define a way of handling the overproducing Observable.

Examples of items emitted by a hot Observable might include mouse & keyboard events, system events, or stock prices.

3. Buffering an Overproducing Observable

The first way to handle an overproducing Observable is to define some kind of buffer for elements that the Observer cannot process yet.

We can do it by calling a buffer() method:

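PublishSubject<Integer> source = PublishSubject.<Integer>create();

// buffer(1024) emits List<Integer> batches, so the subscriber receives a list
source.buffer(1024)
  .observeOn(Schedulers.computation())
  .subscribe(batch -> batch.forEach(ComputeFunction::compute), Throwable::printStackTrace);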

Calling buffer(1024) collects emitted items into lists of up to 1024 elements and passes each list downstream as a single emission. This gives the Observer some time to catch up with an overproducing source, because the buffer holds the items that have not been processed yet.
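
To make the grouping concrete, here is a minimal plain-Java sketch of count-based buffering. The Batches.ofSize helper is hypothetical and only models how a sequence is split into lists; it is not RxJava's buffer() implementation and ignores threading and timing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models count-based grouping; not part of RxJava.
final class Batches {
    // Splits items into consecutive lists of at most `size` elements.
    static <T> List<List<T>> ofSize(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            int to = Math.min(from + size, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 2500; i++) {
            items.add(i);
        }
        // 2500 source items become 3 downstream emissions: 1024 + 1024 + 452.
        for (List<Integer> batch : ofSize(items, 1024)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

In this model the downstream consumer sees one emission per batch instead of one per item, which is where the reduction in pressure comes from.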

We can increase a buffer size to have enough room for produced values.

Note, however, that this is generally only a temporary fix, as overflow can still happen if the source produces more than the predicted buffer size.

4. Batching Emitted Items

We can batch overproduced items in windows of N elements.

When the Observable produces elements faster than the Observer can process them, we can alleviate this by grouping the produced elements and sending them to the Observer in batches, provided the Observer can process a collection of elements instead of one element at a time:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

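// window(500) emits Observable<Integer> windows, so we subscribe to each window
source.window(500)
  .observeOn(Schedulers.computation())
  .subscribe(
    window -> window.subscribe(ComputeFunction::compute, Throwable::printStackTrace),
    Throwable::printStackTrace);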

Calling window() with the argument 500 tells the Observable to group elements into batches of 500. This technique can reduce the problem of an overproducing Observable when the Observer can process a batch of elements faster than it can process the same elements one by one.

5. Skipping Elements

If some of the values produced by the Observable can be safely ignored, we can use the sampling and throttling operators, which pick values within a specific time window.

The sample() and throttleFirst() methods take a duration as a parameter:

  • The sample() method periodically looks into the sequence of elements and emits the last item that was produced within the duration specified as a parameter
  • The throttleFirst() method emits the first item produced within each duration specified as a parameter and skips the rest until that duration has elapsed

The duration is the period after which one specific element is picked from the sequence of produced elements. We can specify a strategy for handling backpressure by skipping elements:

PublishSubject<Integer> source = PublishSubject.<Integer>create();

source.sample(100, TimeUnit.MILLISECONDS)
  .observeOn(Schedulers.computation())
  .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

Here, the strategy for skipping elements is the sample() method. Every 100 milliseconds it takes the most recent element from the sequence, and only that element is emitted to the Observer.

Remember, however, that these operators only reduce the rate at which the downstream Observer receives values, and thus they may still lead to a MissingBackpressureException.
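
The difference between the two skipping operators is easier to see on a fixed list of timestamped events. The following plain-Java model is a sketch under simplifying assumptions: the Skipping class and its methods are hypothetical, time is represented by explicit non-negative millisecond stamps instead of a scheduler, and every period that contains items contributes its last one, including the final period.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of time-based skipping; class and method names are hypothetical.
final class Skipping {
    // An item together with the time (in ms) at which the source produced it.
    static final class Timed {
        final long timeMs;
        final int value;

        Timed(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    // Models sample(): periods are [0, p), [p, 2p), ... and each period that
    // contains items contributes its last item. Events must be sorted by time.
    static List<Integer> sample(List<Timed> events, long periodMs) {
        List<Integer> out = new ArrayList<>();
        long currentPeriod = -1;
        Integer latest = null;
        for (Timed e : events) {
            long period = e.timeMs / periodMs;
            if (period != currentPeriod && latest != null) {
                out.add(latest); // the previous period ended, emit its last item
            }
            currentPeriod = period;
            latest = e.value;
        }
        if (latest != null) {
            out.add(latest);
        }
        return out;
    }

    // Models throttleFirst(): emit an item, then skip everything that arrives
    // less than windowMs after the emitted one.
    static List<Integer> throttleFirst(List<Timed> events, long windowMs) {
        List<Integer> out = new ArrayList<>();
        long lastEmitted = -1;
        for (Timed e : events) {
            if (lastEmitted < 0 || e.timeMs - lastEmitted >= windowMs) {
                out.add(e.value);
                lastEmitted = e.timeMs;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> events = new ArrayList<>();
        events.add(new Timed(10, 1));
        events.add(new Timed(20, 2));
        events.add(new Timed(110, 3));
        events.add(new Timed(120, 4));
        events.add(new Timed(250, 5));
        System.out.println(sample(events, 100));        // prints [2, 4, 5]
        System.out.println(throttleFirst(events, 100)); // prints [1, 3, 5]
    }
}
```

Both variants deliver three of the five items, but sampling favors the most recent value in each period while throttling favors the earliest one.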

6. Handling a Filling Observable Buffer

If our strategies of sampling or batching elements do not keep the buffer from filling up, we need to implement a strategy for handling the case when the buffer is full.

We can use the onBackpressureBuffer() method to decide what happens on overflow and prevent a BufferOverflowException.

The onBackpressureBuffer() method takes three arguments: the capacity of the Observable buffer, an action that is invoked when the buffer overflows, and a strategy for handling elements that need to be discarded from the buffer. The overflow strategies are defined in the BackpressureOverflow class.

There are four actions that can be executed when the buffer fills up:

  • ON_OVERFLOW_ERROR – this is the default behavior signaling a BufferOverflowException when the buffer is full
  • ON_OVERFLOW_DEFAULT – currently it is the same as ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST – if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream Observer requests
  • ON_OVERFLOW_DROP_OLDEST – drops the oldest element in the buffer and adds the current value to it

Let’s see how to specify that strategy:

Observable.range(1, 1_000_000)
  .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
  .observeOn(Schedulers.computation())
  .subscribe(e -> {}, Throwable::printStackTrace);

Here our strategy for handling the overflowing buffer is to drop the oldest element in the buffer and add the newest item produced by the Observable.

Note that the last two strategies cause a discontinuity in the stream because they drop elements. In addition, they won’t signal a BufferOverflowException.
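
The effect of each overflow strategy on the buffer contents can be modeled with a small bounded queue. This is a sketch in plain Java, assuming a single thread: BoundedBuffer and its Strategy enum are hypothetical names that only mirror the strategies listed above, and the model throws IllegalStateException where the real operator would signal an error downstream.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model of a bounded buffer with overflow strategies; not RxJava code.
final class BoundedBuffer<T> {
    enum Strategy { ERROR, DROP_LATEST, DROP_OLDEST }

    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final Runnable onOverflow;
    private final Strategy strategy;

    BoundedBuffer(int capacity, Runnable onOverflow, Strategy strategy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.onOverflow = onOverflow;
        this.strategy = strategy;
    }

    // Called by the producer for every new item.
    void offer(T item) {
        if (items.size() < capacity) {
            items.addLast(item);
            return;
        }
        onOverflow.run(); // the buffer is full: notify, then apply the strategy
        switch (strategy) {
            case DROP_LATEST:
                break; // ignore the incoming item and keep the old ones
            case DROP_OLDEST:
                items.pollFirst();   // discard the oldest buffered item
                items.addLast(item); // and keep the incoming one
                break;
            default:
                throw new IllegalStateException("buffer overflow");
        }
    }

    // Called by the consumer when it is ready; returns null if nothing is buffered.
    T poll() {
        return items.pollFirst();
    }

    public static void main(String[] args) {
        BoundedBuffer<Integer> buffer =
            new BoundedBuffer<>(2, () -> System.out.println("overflow"), Strategy.DROP_OLDEST);
        for (int i = 1; i <= 4; i++) {
            buffer.offer(i); // prints "overflow" for items 3 and 4
        }
        System.out.println(buffer.poll()); // prints 3
        System.out.println(buffer.poll()); // prints 4
    }
}
```

With DROP_LATEST and the same input, the consumer would instead receive 1 and 2, which shows the discontinuity each dropping strategy introduces.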

7. Dropping All Overproduced Elements

Whenever the downstream Observer is not ready to receive an element, we can use an onBackpressureDrop() method to drop that element from the sequence.

We can think of that method as onBackpressureBuffer() with a buffer capacity of zero and the ON_OVERFLOW_DROP_LATEST strategy.

This operator is useful when we can safely ignore values from a source Observable (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on:

Observable.range(1, 1_000_000)
  .onBackpressureDrop()
  .observeOn(Schedulers.computation())
  .doOnNext(ComputeFunction::compute)
  .subscribe(v -> {}, Throwable::printStackTrace);

The onBackpressureDrop() method eliminates the problem of an overproducing Observable, but it needs to be used with caution because dropped elements are lost.
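
The idea of dropping whenever the downstream is not ready can be sketched as a gate that tracks outstanding demand. The DropGate class below is a hypothetical, single-threaded plain-Java model, not RxJava's implementation: the consumer signals readiness with request(n), and any item that arrives while no demand is outstanding is discarded.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of dropping items when there is no downstream demand.
// DropGate and its methods are hypothetical names, not RxJava API.
final class DropGate<T> {
    private long requested = 0; // outstanding downstream demand
    private long dropped = 0;   // number of items discarded so far
    private final List<T> delivered = new ArrayList<>();

    // Consumer side: signal readiness for n more items.
    void request(long n) {
        requested += n;
    }

    // Producer side: deliver the item if there is demand, otherwise drop it.
    boolean onNext(T item) {
        if (requested > 0) {
            requested--;
            delivered.add(item);
            return true;
        }
        dropped++;
        return false;
    }

    List<T> delivered() {
        return delivered;
    }

    long droppedCount() {
        return dropped;
    }

    public static void main(String[] args) {
        DropGate<Integer> gate = new DropGate<>();
        gate.request(2);
        for (int i = 1; i <= 5; i++) {
            gate.onNext(i); // items 3, 4 and 5 arrive without demand
        }
        gate.request(1);
        gate.onNext(6);
        System.out.println(gate.delivered());    // prints [1, 2, 6]
        System.out.println(gate.droppedCount()); // prints 3
    }
}
```

Nothing is ever queued in this model, so memory use stays constant, at the cost of gaps in the delivered sequence.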

8. Conclusion

In this article, we looked at the problem of an overproducing Observable and ways of dealing with backpressure. We looked at strategies of buffering, batching, and skipping elements when the Observer is not able to consume elements as quickly as the Observable produces them.

The implementation of all these examples and code snippets can be found in the GitHub project. This is a Maven project, so it should be easy to import and run as it is.