Introduction to RxRelay for RxJava – RxJava的RxRelay简介

最后修改: 2018年 3月 25日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

The popularity of RxJava has led to the creation of multiple third-party libraries that extend its functionality.

RxJava的普及导致了多个扩展其功能的第三方库的诞生。

Many of those libraries were an answer to typical problems that developers were dealing with when using RxJava. RxRelay is one of these solutions.

这些库中有许多是对开发人员在使用RxJava时遇到的典型问题的回答。RxRelay就是这些解决方案之一。

2. Dealing With a Subject

2.处理一个主题

Simply put, a Subject acts as a bridge between Observable and Observer. Since it’s an Observer, it can subscribe to one or more Observables and receive events from them.

简单地说,Subject充当了ObservableObserver之间的桥梁。由于它是一个Observer,它可以订阅一个或多个Observable并从它们那里接收事件。

Also, given it’s at the same time an Observable, it can reemit events or emit new events to its subscribers. More information about the Subject can be found in this article.

此外,鉴于它同时也是一个Observable,它可以重新发送事件或向其订阅者发出新的事件。关于Subject的更多信息可在此文章中找到。

One of the issues with Subject is that after it receives onComplete() or onError() – it’s no longer able to move data. Sometimes it’s the desired behavior, but sometimes it’s not.

Subject 的一个问题是,在它收到onComplete() onError()之后–它不再能够移动数据。有时这是所期望的行为,但有时却不是。

In cases when such behavior isn’t desired, we should consider using RxRelay.

在不需要这种行为的情况下,我们应该考虑使用RxRelay

3. Relay

3.Relay

A Relay is basically a Subject, but without the ability to call onComplete() and onError(), thus it’s constantly able to emit data.

一个Relay基本上是一个Subject,但是没有调用onComplete()onError()的能力,因此它能够不断地发射数据。

This allows us to create bridges between different types of API without worrying about accidentally triggering the terminal state.

这使我们能够在不同类型的API之间建立桥梁,而不必担心意外地触发终端状态。

To use RxRelay we need to add the following dependency to our project:

为了使用RxRelay,我们需要在我们的项目中添加以下依赖性。

<dependency>
  <groupId>com.jakewharton.rxrelay2</groupId>
  <artifactId>rxrelay</artifactId>
  <version>1.2.0</version>
</dependency>

4. Types of Relay

4、Relay的类型

There’re three different types of Relay available in the library. We’ll quickly explore all three here.

库中有三种不同类型的Relay可用。我们将在这里快速探索这三种类型。

4.1. PublishRelay

4.1.PublishRelay

This type of Relay will reemit all events once the Observer has subscribed to it.

这种类型的Relay一旦Observer订阅了它,就会重新发送所有事件。

The events will be emitted to all subscribers:

事件将被排放给所有订阅者。

public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
    PublishRelay<Integer> publishRelay = PublishRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    
    publishRelay.subscribe(firstObserver);
    firstObserver.assertSubscribed();
    publishRelay.accept(5);
    publishRelay.accept(10);
    publishRelay.subscribe(secondObserver);
    secondObserver.assertSubscribed();
    publishRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    
    // second receives only the last event
    secondObserver.assertValue(15);
}

There’s no buffering of events in this case, so this behavior is similar to a cold Observable.

在这种情况下,没有对事件进行缓冲,所以这种行为类似于冷的Observable.

4.2. BehaviorRelay

4.2.BehaviorRelay

This type of Relay will reemit the most recent observed event and all subsequent events once the Observer has subscribed:

一旦Observer订阅,这种类型的Relay将重新发送最近观察到的事件和所有后续事件。

public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    behaviorRelay.accept(5);     
    behaviorRelay.subscribe(firstObserver);
    behaviorRelay.accept(10);
    behaviorRelay.subscribe(secondObserver);
    behaviorRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(10, 15);
}

When we’re creating the BehaviorRelay we can specify the default value, which will be emitted, if there’re no other events to emit.

当我们创建BehaviorRelay时,我们可以指定默认值,如果没有其他事件要发射,就会发射。

To specify the default value we can use createDefault() method:

为了指定默认值,我们可以使用createDefault()方法。

public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertValue(1);
}

If we don’t want to specify the default value, we can use the create() method:

如果我们不想指定默认值,我们可以使用create() 方法。

public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

4.3. ReplayRelay

4.3.ReplayRelay

This type of Relay buffers all events it has received and then reemits it to all subscribers that subscribe to it:

这种类型的Relay缓冲了它所收到的所有事件,然后将其重新发送至订阅它的所有订阅者。

 public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    replayRelay.subscribe(firstObserver);
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.subscribe(secondObserver);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(5, 10, 15);
}

All elements are buffered and all subscribers will receive the same events, so this behavior is similar to the cold Observable.

所有的元素都被缓冲,所有的订阅者都将收到相同的事件,所以这种行为类似于冰冷的Observable.

When we’re creating the ReplayRelay we can provide maximal buffer size and time to live for events.

当我们创建ReplayRelay时,我们可以为事件提供最大的缓冲区大小和生存时间。

To create the Relay with limited buffer size we can use the createWithSize() method. When there’re more events to be buffered than the set buffer size, previous elements will be discarded:

要创建缓冲区大小有限的Relay,我们可以使用createWithSize()方法。当需要缓冲的事件多于设定的缓冲区大小时,之前的元素将被丢弃。

public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertValues(15, 20);
}

We can also create ReplayRelay with max time to leave for buffered events using the createWithTime() method:

我们还可以使用createWithTime()方法创建ReplayRelay,为缓冲事件留出最大时间。

public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
    SingleScheduler scheduler = new SingleScheduler();
    ReplayRelay<Integer> replayRelay =
      ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
    long current =  scheduler.now(TimeUnit.MILLISECONDS);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    Thread.sleep(3000);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

5. Custom Relay

5.自定义继电器

All types described above extend the common abstract class Relay, this gives us an ability to write our own custom Relay class.

上面描述的所有类型都扩展了通用的抽象类Relay,这让我们有能力编写我们自己的自定义Relay类。

To create a custom Relay we need to implement three methods: accept(), hasObservers() and subscribeActual().

要创建一个自定义的Relay,我们需要实现三个方法。accept(), hasObservers() subscribeActual()。

Let’s write a simple Relay that will reemit event to one of the subscribers chosen at random:

让我们写一个简单的Relay,它将向随机选择的订阅者之一重新发送事件。

public class RandomRelay extends Relay<Integer> {
    Random random = new Random();

    List<Observer<? super Integer>> observers = new ArrayList<>();

    @Override
    public void accept(Integer integer) {
        int observerIndex = random.nextInt() % observers.size();
        observers.get(observerIndex).onNext(integer);
    }

    @Override
    public boolean hasObservers() {
        return observers.isEmpty();
    }

    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observers.add(observer);
        observer.onSubscribe(Disposables.fromRunnable(
          () -> System.out.println("Disposed")));
    }
}

We can now test that only one subscriber will receive the event:

我们现在可以测试一下,只有一个订阅者会收到该事件。

public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
    RandomRelay randomRelay = new RandomRelay();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    randomRelay.subscribe(firstObserver);
    randomRelay.subscribe(secondObserver);
    randomRelay.accept(5);
    if(firstObserver.values().isEmpty()) {
        secondObserver.assertValue(5);
    } else {
        firstObserver.assertValue(5);
        secondObserver.assertEmpty();
    }
}

6. Conclusion

6.结论

In this tutorial, we had a look at RxRelay, a type similar to Subject but without the ability to trigger the terminal state.

在本教程中,我们看了一下RxRelay,这是一个类似于Subject的类型,但没有触发终端状态的能力。

More information can be found in the documentation. And, as always all the code samples can be found over on GitHub.

更多信息可以在文档中找到。而且,像往常一样,所有的代码样本都可以在GitHub上找到over