Implementing Custom Operators in RxJava – 在RxJava中实现自定义操作符

最后修改: 2017年 8月 2日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this quick tutorial, we’ll show how to write a custom operator using RxJava.

在这个快速教程中,我们将展示如何使用RxJava编写一个自定义操作符。

We’ll discuss how to build this simple operator, as well as a transformer – both as a class or as a simple function.

我们将讨论如何构建这个简单的操作符,以及一个转化器–既可以作为一个类,也可以作为一个简单的函数。

2. Maven Configuration

2.Maven配置

First, we need to make sure we have the rxjava dependency in pom.xml:

首先,我们需要确保我们在pom.xml中拥有rxjava依赖性。

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.0</version>
</dependency>

We can check the latest version of rxjava on Maven Central.

我们可以在Maven Central上查看rxjava的最新版本。

3. Custom Operator

3.自定义操作员

We can create our custom operator by implementing Operator interface, in the following example we implemented a simple operator for removing non-alphanumeric characters from a String:

我们可以通过实现Operator接口来创建我们的自定义操作符,在下面的例子中,我们实现了一个简单的操作符,用于从String中删除非字母数字字符。

public class ToCleanString implements Operator<String, String> {

    public static ToCleanString toCleanString() {
        return new ToCleanString();
    }

    private ToCleanString() {
        super();
    }

    @Override
    public Subscriber<? super String> call(final Subscriber<? super String> subscriber) {
        return new Subscriber<String>(subscriber) {
            @Override
            public void onCompleted() {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }

            @Override
            public void onError(Throwable t) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }

            @Override
            public void onNext(String item) {
                if (!subscriber.isUnsubscribed()) {
                    final String result = item.replaceAll("[^A-Za-z0-9]", "");
                    subscriber.onNext(result);
                }
            }
        };
    }
}

In the above example, we need to check if the subscriber is subscribed before applying our operation and emitting the item to it as it will be unnecessary.

在上面的例子中,我们需要在应用我们的操作和向它发射项目之前检查订阅者是否被订阅,因为这将是不必要的。

We are also restricting instance creation only to static factory methods to achieve a more user-friendly readability when chaining methods and using the static import.

我们还将实例创建限制在静态工厂方法中,以便在连锁方法和使用静态导入时实现更友好的可读性

And now, we can use lift operator to chain our custom operator easily with other operators:

而现在,我们可以使用lift操作符来将我们的自定义操作符与其他操作符轻松地连锁起来。

observable.lift(toCleanString())....

Here is a simple test of our custom operator:

下面是对我们的自定义运算符的一个简单测试。

@Test
public void whenUseCleanStringOperator_thenSuccess() {
    List<String> list = Arrays.asList("john_1", "tom-3");
    List<String> results = new ArrayList<>();
    Observable<String> observable = Observable
      .from(list)
      .lift(toCleanString());
    observable.subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems("john1", "tom3"));
}

4. Transformer

4.变压器

We can also create our operator by implementing Transformer interface:

我们也可以通过实现Transformer接口来创建我们的操作员。

public class ToLength implements Transformer<String, Integer> {

    public static ToLength toLength() {
        return new ToLength();
    }

    private ToLength() {
        super();
    }

    @Override
    public Observable<Integer> call(Observable<String> source) {
        return source.map(String::length);
    }
}

Note that we use the transformer toLength to transform our observable from String to its length in Integer.

请注意,我们使用转换器toLength来将我们的可观察对象从String转换为其长度Integer

We will need a compose operator to use our transformer:

我们将需要一个compose操作符来使用我们的转化器。

observable.compose(toLength())...

Here is a simple test:

这里有一个简单的测试。

@Test
public void whenUseToLengthOperator_thenSuccess() {
    List<String> list = Arrays.asList("john", "tom");
    List<Integer> results = new ArrayList<>();
    Observable<Integer> observable = Observable
      .from(list)
      .compose(toLength());
    observable.subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems(4, 3));
}

The lift(Operator) operates on observable’s subscribers but compose(Transformer) work on the observable itself.

lift(Operator)对观察者的订阅者进行操作,但是compose(Transformer)对观察者本身进行操作。

When we create our custom operator, we should pick Transformer if we want to operate on the observable as a whole and choose Operator if we want to operate on the items emitted by the observable

当我们创建自定义操作符时,如果我们想对整个可观察对象进行操作,我们应该选择Transformer,如果我们想对由可观察对象发出的项目进行操作,则选择Operator

5. Custom Operator as a Function

5.作为函数的自定义操作符

We can implement our custom operator as a function instead of public class :

我们可以将我们的自定义操作符作为一个函数来实现,而不是public class

Operator<String, String> cleanStringFn = subscriber -> {
    return new Subscriber<String>(subscriber) {
        @Override
        public void onCompleted() {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onError(t);
            }
        }

        @Override
        public void onNext(String str) {
            if (!subscriber.isUnsubscribed()) {
                String result = str.replaceAll("[^A-Za-z0-9]", "");
                subscriber.onNext(result);
            }
        }
    };
};

And here’s the simple test:

而这里有一个简单的测试。

List<String> results = new ArrayList<>();
Observable.from(Arrays.asList("ap_p-l@e", "or-an?ge"))
  .lift(cleanStringFn)
  .subscribe(results::add);

assertThat(results, notNullValue());
assertThat(results, hasSize(2));
assertThat(results, hasItems("apple", "orange"));

Similarly for Transformer example:

类似的还有变换器的例子。

@Test
public void whenUseFunctionTransformer_thenSuccess() {
    Transformer<String, Integer> toLengthFn = s -> s.map(String::length);

    List<Integer> results = new ArrayList<>();
    Observable.from(Arrays.asList("apple", "orange"))
      .compose(toLengthFn)
      .subscribe(results::add);

    assertThat(results, notNullValue());
    assertThat(results, hasSize(2));
    assertThat(results, hasItems(5, 6));
}

6. Conclusion

6.结论

In this article, we showed how to write our RxJava operators.

在这篇文章中,我们展示了如何编写我们的RxJava操作符。

And, as always, the full source code can be found over on GitHub.

而且,像往常一样,完整的源代码可以在GitHub上找到