Mathematical and Aggregate Operators in RxJava – RxJava中的数学和聚合运算符

最后修改: 2017年 10月 23日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

Following the introduction to RxJava article, we’re going to look at aggregate and mathematical operators.

RxJava介绍一文之后,我们要看一下聚合和数学运算符。

These operations must wait for the source Observable to emit all items. Because of this, these operators are dangerous to use on Observables that may represent very long or infinite sequences.

这些操作必须等待源Observable发出所有项目。正因为如此,这些操作在可能代表很长或无限序列的Observables上使用是危险的。

Secondly, all the examples use an instance of the TestSubscriber, a particular variety of Subscriber that can be used for unit testing, to perform assertions, inspect received events or wrap a mocked Subscriber.

其次,所有的例子都使用了TestSubscriber的实例,Subscriber的一个特殊品种,可用于单元测试,执行断言,检查接收的事件或包裹一个模拟的Subscriber。

Now, let’s start looking at the Mathematical operators.

现在,让我们开始看一下数学运算符。

2. Setup

2.设置

To use additional operators, we’ll need to add the additional dependency to the pom.xml:

要使用额外的运算符,我们需要将添加额外的依赖性pom.xml:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava-math</artifactId>
    <version>1.0.0</version>
</dependency>

Or, for a Gradle project:

或者,对于一个Gradle项目。

compile 'io.reactivex:rxjava-math:1.0.0'

3. Mathematical Operators

3.数学运算法则

The MathObservable is dedicated to performing mathematical operations and its operators use another Observable that emits items that can be evaluated as numbers.

MathObservable专门用于执行数学运算,它的运算符使用另一个Observable,它发出的项目可以被评估为数字。

3.1. Average

3.1.平均

The average operator emits a single value – the average of all values emitted by the source.

average操作符发出一个单一的值–由源发出的所有值的平均值。

Let’s see that in action:

让我们看看这一点的行动。

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.averageInteger(sourceObservable).subscribe(subscriber);

subscriber.assertValue(10);

There are four similar operators for dealing with primitive values: averageInteger, averageLong, averageFloat, and averageDouble.

有四个类似的运算符用于处理原始值。averageInteger, averageLong, averageFloat, 和averageDouble。

3.2. Max

3.2. Max

The max operator emits the largest encountered number.

maxoperator发出的是遇到的最大数字。

Let’s see that in action:

让我们看看这一点的行动。

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.max(sourceObservable).subscribe(subscriber);

subscriber.assertValue(9);

It’s important to note that the max operator has an overloaded method that takes a comparison function.

值得注意的是,max运算符有一个重载方法,它需要一个比较函数。

Considering the fact that the mathematical operators can also work on objects that can be managed as numbers, the max overloaded operator allows for comparing custom types or custom sorting of standard types.

考虑到数学运算符也可以在可以作为数字管理的对象上工作,max重载运算符允许比较自定义类型或对标准类型进行自定义排序。

Let’s define the Item class:

让我们来定义Item类。

class Item {
    private Integer id;

    // standard constructors, getter, and setter
}

We can now define the itemObservable and then use the max operator in order emit the Item with the highest id:

我们现在可以定义itemObservable,然后使用max操作符,以发射具有最高idItem

Item five = new Item(5);
List<Item> list = Arrays.asList(
  new Item(1), 
  new Item(2), 
  new Item(3), 
  new Item(4), 
  five);
Observable<Item> itemObservable = Observable.from(list);

TestSubscriber<Item> subscriber = TestSubscriber.create();

MathObservable.from(itemObservable)
  .max(Comparator.comparing(Item::getId))
  .subscribe(subscriber);

subscriber.assertValue(five);

3.3. Min

3.3.Min

The min operator emits a single item containing the smallest element from the source:

min操作符发出一个包含源文件中最小元素的单项。

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.min(sourceObservable).subscribe(subscriber);

subscriber.assertValue(1);

The min operator has an overloaded method that accepts a comparator instance:

min操作符有一个重载方法,接受一个比较器实例。

Item one = new Item(1);
List<Item> list = Arrays.asList(
  one, 
  new Item(2), 
  new Item(3), 
  new Item(4), 
  new Item(5));
TestSubscriber<Item> subscriber = TestSubscriber.create();
Observable<Item> itemObservable = Observable.from(list);

MathObservable.from(itemObservable)
  .min(Comparator.comparing(Item::getId))
  .subscribe(subscriber);

subscriber.assertValue(one);

3.4. Sum

3.4. Sum

The sum operator emits a single value that represents the sum of all of the numbers emitted by the source Observable:

sum操作符发出一个单一的值,代表由源Observable>发出的所有数字的总和:

Observable<Integer> sourceObservable = Observable.range(1, 20);
TestSubscriber<Integer> subscriber = TestSubscriber.create();

MathObservable.sumInteger(sourceObservable).subscribe(subscriber);

subscriber.assertValue(210);

There are also primitive-specialized similar operators: sumInteger, sumLong, sumFloat, and sumDouble.

还有一些原始的专门化的类似运算符。sumInteger, sumLong, sumFloat, 和sumDouble。

4. Aggregate Operators

4.聚合运算器

4.1. Concat

4.1. Concat

The concat operator joins items emitted by the source together.

concat操作符将源发出的项目连接在一起.

Let’s now define two Observables and concatenate them:

现在让我们定义两个Observables并将它们连接起来。

List<Integer> listOne = Arrays.asList(1, 2, 3, 4);
Observable<Integer> observableOne = Observable.from(listOne);

List<Integer> listTwo = Arrays.asList(5, 6, 7, 8);
Observable<Integer> observableTwo = Observable.from(listTwo);

TestSubscriber<Integer> subscriber = TestSubscriber.create();

Observable<Integer> concatObservable = observableOne
  .concatWith(observableTwo);

concatObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8);

Going into details, the concat operator waits with subscribing to each additional Observable that are passed to it until the previous one completes.

详细来说,concat操作者通过订阅传递给它的每个额外的Observable来等待,直到前一个操作完成。

For this reason, concatenating a “hot” Observable, that begins emitting items immediately, will lead to the loss of any items that the “hot” Observable emits before all previous ones are completed.

由于这个原因,连接一个立即开始发射项目的 “热”Observable,将导致 “热”Observable在所有先前的项目完成之前发射的任何项目的丢失。

4.2. Count

4.2.计数

The count operator emits the count of all items emitted by the source:

count操作符发出源头发出的所有项目的计数。

Let’s count the numbers of items emitted by an Observable:

让我们来计算一个Observable发出的项目的数量。

List<String> lettersList = Arrays.asList(
  "A", "B", "C", "D", "E", "F", "G");
TestSubscriber<Integer> subscriber = TestSubscriber.create();

Observable<Integer> sourceObservable = Observable
  .from(lettersList).count();
sourceObservable.subscribe(subscriber);

subscriber.assertValue(7);

If the source Observable terminates with an error, the count will pass a notification error without emitting an item. However, if it doesn’t terminate at all, the count will neither emit an item nor terminate.

如果源Observable出错终止,count将通过一个通知错误而不发出一个项目。然而,如果它根本没有终止,count将既不发射一个项目也不终止。

For the count operation, there is also the countLong operator, that in the end emits a Long value, for those sequences that may exceed the capacity of an Integer.

对于count操作,还有countLong操作符,它最终会发出一个Long值,用于那些可能超过Integer容量的序列。

4.3. Reduce

4.3. Reduce

The reduce operator reduces all emitted elements into a single element by applying the accumulator function.

reduce操作符通过应用累加器函数将所有发射的元素减少为一个元素。

This process continues until the all the items are emitted and then the Observable, from the reduce, emits the final value returned from the function.

这个过程一直持续到所有的项目都被发射出来,然后来自reduce的Observable,发射出从函数返回的最终值。

Now, let’s see how it’s possible to perform reduction of a list of String, concatenating them in the reverse order:

现在,让我们看看如何对String列表进行缩减,将它们按相反的顺序连接起来。

List<String> list = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
TestSubscriber<String> subscriber = TestSubscriber.create();

Observable<String> reduceObservable = Observable.from(list)
  .reduce((letter1, letter2) -> letter2 + letter1);
reduceObservable.subscribe(subscriber);

subscriber.assertValue("GFEDCBA");

4.4. Collect

4.4.收集

The collect operator is similar to the reduce operator, but it’s dedicated to collecting elements into a single mutable data structure.

collect操作符与reduce操作符相似,但它专门用于将元素收集到一个单一的可变数据结构中。

It requires two parameters:

它需要两个参数。

  • a function that returns the empty mutable data structure
  • a function that, when given the data structure and an emitted item, modifies the data structure appropriately

Let’s see how it can be possible to return a set of items from an Observable:

让我们看看如何能够从Observable返回一个set项。

List<String> list = Arrays.asList("A", "B", "C", "B", "B", "A", "D");
TestSubscriber<HashSet> subscriber = TestSubscriber.create();

Observable<HashSet<String>> reduceListObservable = Observable
  .from(list)
  .collect(HashSet::new, HashSet::add);
reduceListObservable.subscribe(subscriber);

subscriber.assertValues(new HashSet(list));

4.5. ToList

4.5.ToList

The toList operator works just like the collect operation, but collects all elements into a single list – think about Collectors.toList() from the Stream API:
Observable<Integer> sourceObservable = Observable.range(1, 5);
TestSubscriber<List> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable = sourceObservable
  .toList();
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(1, 2, 3, 4, 5));

4.6. ToSortedList

4.6.ToSortedList

Just like in the previous example but the emitted list is sorted:

就像前面的例子一样,但发出的列表是排序的。

Observable<Integer> sourceObservable = Observable.range(10, 5);
TestSubscriber<List> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable = sourceObservable
  .toSortedList();
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(10, 11, 12, 13, 14));

As we can see, the toSortedList uses the default comparison, but it’s possible to provide a custom comparator function. We can now see how it’s possible to sort the integers in a reverse order using a custom sort function:

我们可以看到,toSortedList使用了默认的比较,但也可以提供一个自定义的比较器函数。我们现在可以看到,使用自定义排序函数对整数进行反向排序是可能的。

Observable<Integer> sourceObservable = Observable.range(10, 5);
TestSubscriber<List> subscriber = TestSubscriber.create();

Observable<List<Integer>> listObservable 
  = sourceObservable.toSortedList((int1, int2) -> int2 - int1);
listObservable.subscribe(subscriber);

subscriber.assertValue(Arrays.asList(14, 13, 12, 11, 10));

4.7. ToMap

4.7ToMap

The toMap operator converts the sequence of items emitted by an Observable into a map keyed by a specified key function.

toMap操作符将由Observable发出的项目序列转换为由指定的键函数键入的地图。

In particular, the toMap operator has different overloaded methods that require one, two or three of the following parameters:

特别是,toMap操作符有不同的重载方法,需要下列一个、两个或三个参数。

  1. the keySelector that produces a key from the item
  2. the valueSelector that produces from the emitted item the actual value that will be stored in the map
  3. the mapFactory that creates the collection that will hold the items

Let’s start defining a simple class Book:

让我们开始定义一个简单的类Book

class Book {
    private String title;
    private Integer year;

    // standard constructors, getters, and setters
}

We can now see how it’s possible to convert a series of emitted Book items to a Map, having the book title as key and the year as the value:

我们现在可以看到如何将一系列发出的项转换为Map,以书名为键,以年份为值

Observable<Book> bookObservable = Observable.just(
  new Book("The North Water", 2016), 
  new Book("Origin", 2017), 
  new Book("Sleeping Beauties", 2017)
);
TestSubscriber<Map> subscriber = TestSubscriber.create();

Observable<Map<String, Integer>> mapObservable = bookObservable
  .toMap(Book::getTitle, Book::getYear, HashMap::new);
mapObservable.subscribe(subscriber);

subscriber.assertValue(new HashMap() {{
  put("The North Water", 2016);
  put("Origin", 2017);
  put("Sleeping Beauties", 2017);
}});

4.8. ToMultiMap

4.8.ToMultiMap

When mapping, it is very common that many values share the same key. The data structure that maps one key to multiple values is called a multimap.

在映射时,许多值共享同一个键是很常见的。将一个键映射到多个值的数据结构被称为多映射。

This can be achieved with the toMultiMap operator that converts the sequence of items emitted by an Observable into a List that is also a map keyed by a specified key function.

这可以通过toMultiMap操作符实现,该操作符将Observable发出的项目序列转换为List,该List也是由指定键函数键入的地图。

This operator adds another parameter to those of the toMap operator, the collectionFactory. This parameter permits to specify in which collection type the value should be stored. Let’s see how this can be done:

这个操作符在toMap操作符的参数中又增加了一个参数,即collectionFactory。这个参数允许指定值应该被存储在哪个集合类型中。让我们看看如何做到这一点。

Observable<Book> bookObservable = Observable.just(
  new Book("The North Water", 2016), 
  new Book("Origin", 2017), 
  new Book("Sleeping Beauties", 2017)
);
TestSubscriber<Map> subscriber = TestSubscriber.create();

Observable multiMapObservable = bookObservable.toMultimap(
  Book::getYear, 
  Book::getTitle, 
  () -> new HashMap<>(), 
  (key) -> new ArrayList<>()
);
multiMapObservable.subscribe(subscriber);

subscriber.assertValue(new HashMap() {{
    put(2016, Arrays.asList("The North Water"));
    put(2017, Arrays.asList("Origin", "Sleeping Beauties"));
}});

5. Conclusion

5.总结

In this article, we explored the mathematical and aggregate operators available within RxJava – and, of course, simple example of how to use each.

在这篇文章中,我们探讨了RxJava中可用的数学和聚合运算符–当然还有如何使用每个运算符的简单例子。

As always, all code examples in this article can be found over on Github.

一如既往,本文中的所有代码示例都可以在Github上找到over