RxJava One Observable, Multiple Subscribers – RxJava 一个观察者,多个订阅者

最后修改: 2018年 7月 12日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

The default behavior of multiple subscribers isn’t always desirable. In this article, we’ll cover how to change this behavior and handle multiple subscribers in a proper way.

多个订阅者的默认行为并不总是理想的。在这篇文章中,我们将介绍如何改变这种行为,以适当的方式处理多个订阅者。

But first, let’s have a look at the default behavior of multiple subscribers.

但首先,让我们看一下多个订阅者的默认行为。

2. Default Behaviour

2.默认行为

Let’s say we have the following Observable:

假设我们有以下Observable

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        subscriber.onNext(gettingValue(1));
        subscriber.onNext(gettingValue(2));

        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
        }));
    });
}

This emits two elements as soon as the Subscribers subscribes.

一旦Subscribers订阅,就会发出两个元素。

In our example we have two Subscribers:

在我们的例子中,我们有两个Subscribers。

LOGGER.info("Subscribing");

Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));

s1.unsubscribe();
s2.unsubscribe();

Imagine that getting each element is a costly operation – it may include, for example, an intensive computation or opening an URL-connection.

想象一下,获得每个元素是一个昂贵的操作–它可能包括,例如,密集的计算或打开一个URL-连接。

To keep things simple we’ll just return a number:

为了简单起见,我们将只返回一个数字。

private static Integer gettingValue(int i) {
    LOGGER.info("Getting " + i);
    return i;
}

Here is the output:

下面是输出结果。

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources

As we can see getting each element as well as clearing the resources is performed twice by default – once for each Subscriber. This isn’t what we want. The ConnectableObservable class helps to fix the problem.

我们可以看到获取每个元素以及清除资源默认执行两次–对每个Subscriber一次。这并不是我们想要的。ConnectableObservable类有助于解决这个问题。

3. ConnectableObservable

3.ConnectableObservable

The ConnectableObservable class allows to share the subscription with multiple subscribers and not to perform the underlying operations several times.

ConnectableObservable类允许与多个订阅者共享订阅,并且不需要多次执行基础操作。

But first, let’s create a ConnectableObservable.

但首先,让我们创建一个ConnectableObservable

3.1. publish()

3.1.publish()

publish() method is what creates a ConnectableObservable from an Observable:

publish()方法是从Observable中创建ConnectableObservable

ConnectableObservable obs = Observable.create(subscriber -> {
    subscriber.onNext(gettingValue(1));
    subscriber.onNext(gettingValue(2));
    subscriber.add(Subscriptions.create(() -> {
        LOGGER.info("Clear resources");
    }));
}).publish();

But for now, it does nothing. What makes it work is the connect() method.

但现在,它什么也没做。让它发挥作用的是connect()方法。

3.2. connect()

3.2 连接()

Until ConnectableObservable‘s connect() method isn’t called Observable‘s onSubcribe() callback isn’t triggered even if there are some subscribers.

直到ConnectableObservableconnect()方法没有被调用ObservableonSubcribe()回调没有被触发即使存在一些订阅者。

Let’s demonstrate this:

让我们来证明这一点。

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();

We subscribe and then wait for a second before connecting. The output is:

我们订阅,然后在连接前等待一秒钟。输出结果是。

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources

As we can see:

正如我们所看到的。

    • Getting elements occurs only once as we wanted
    • Clearing resources occur only once as well
    • Getting elements starts a second after the subscribing.
    • Subscribing doesn’t trigger emitting of elements anymore. Only connect() does this

This delay can be beneficial – sometimes we need to give all the subscribers the same sequence of elements even if one of them subscribes earlier than another.

这种延迟可能是有益的–有时我们需要给所有的订阅者相同的元素序列,即使其中一个订阅者比另一个订阅者早。

3.3. The Consistent View of the Observables – connect() After subscribe()

3.3.观察者的一致视图 – connect()之后subscribe()

This use case can’t be demonstrated on our previous Observable as it runs cold and both subscribers get the whole sequence of elements anyway.

这个用例不能在我们之前的Observable上演示,因为它的运行是冷的,无论如何两个订阅者都会得到整个元素序列。

Imagine, instead, that an element emitting doesn’t depend on the moment of the subscription, events emitted on mouse clicks, for example. Now also imagine that a second Subscriber subscribes a second after the first.

想象一下,相反,一个元素的发射并不取决于订阅的时刻,比如说,鼠标点击时发射的事件。现在还可以想象,第二个Subscriber在第一个订阅者之后的一秒钟订阅。

The first Subscriber will get all the elements emitted during this example, whereas the second Subscriber will only receive some elements.

第一个Subscriber将得到这个例子中发出的所有元素,而第二个Subscriber将只收到一些元素。

On the other hand, using the connect() method in the right place can give both subscribers the same view on the Observable sequence.

另一方面,在正确的地方使用connect()方法可以给两个订阅者提供关于Observable序列的相同视图。

Example of Hot Observable

热门可观察的例子

Let’s create a hot Observable. It will be emitting elements on mouse clicks on JFrame.

让我们创建一个热的Observable。它将在鼠标点击JFrame时发射出元素。

Each element will be the x-coordinate of the click:

每个元素将是点击的X坐标。

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        frame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                subscriber.onNext(e.getX());
            }
        });
        subscriber.add(Subscriptions.create(() {
            LOGGER.info("Clear resources");
            for (MouseListener listener : frame.getListeners(MouseListener.class)) {
                frame.removeMouseListener(listener);
            }
        }));
    });
}

The Default Behavior of Hot Observable

热的默认行为Observable

Now if we subscribe two Subscribers one after another with a second interval, run the program and start clicking, we’ll see that the first Subscriber will get more elements:

现在,如果我们一个接一个地订阅两个Subscriber,间隔时间为一秒,运行程序并开始点击,我们会看到第一个Subscriber将获得更多的元素。

public static void defaultBehaviour() throws InterruptedException {
    Observable obs = getObservable();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources

connect() After subscribe()

connect()subscribe()之后

To make both subscribers get the same sequence we’ll convert this Observable to the ConnectableObservable and call connect() after the subscription both Subscribers:

为了使两个订阅者得到相同的序列,我们将把这个Observable转换为ConnectableObservable,并在订阅两个Subscriber后调用connect()

public static void subscribeBeforeConnect() throws InterruptedException {

    ConnectableObservable obs = getObservable().publish();

    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i ->  LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe connected");
    s.unsubscribe();
}

Now they’ll get the same sequence:

现在他们会得到同样的序列。

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources

So the point is to wait for the moment when all subscribers are ready and then call connect().

因此,重点是等待所有订阅者准备好的时刻,然后调用 connect()

In a Spring application, we may subscribe all of the components during the application startup for example and call connect() in onApplicationEvent().

在一个Spring应用程序中,我们可以在应用程序启动时订阅所有的组件,例如在onApplicationEvent()中调用connect()

But let’s return to our example; note that all the clicks before the connect() method are missed. If we don’t want to miss elements but on the contrary process them we can put connect() earlier in the code and force the Observable to produce events in the absence of any Subscriber.

但是让我们回到我们的例子;注意在connect()方法之前的所有点击都被错过了。如果我们不想错过元素,相反,我们可以将connect()放在代码的前面,强迫Observable在没有任何Subscriber的情况下产生事件。

3.4. Forcing Subscription in the Absence of Any Subscriberconnect() Before subscribe()

3.4.在没有任何用户的情况下强制订阅 – connect()subscribe()之前

To demonstrate this let’s correct our example:

为了证明这一点,让我们纠正一下我们的例子。

public static void connectBeforeSubscribe() throws InterruptedException {
    ConnectableObservable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish();
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    s.unsubscribe();
}

The steps are relatively simple:

步骤相对简单。

  • First, we connect
  • Then we wait for one second and subscribe the first Subscriber
  • Finally, we wait for another second and subscribe the second Subscriber

Note that we’ve added doOnNext() operator. Here we could store elements in the database for example but in our code, we just print “saving… “.

请注意,我们已经添加了doOnNext()操作符。在这里,我们可以在数据库中存储元素,例如,但在我们的代码中,我们只是打印 “保存…”。

If we launch the code and begin clicking we’ll see that the elements are emitted and processed immediately after the connect() call:

如果我们启动代码并开始点击,我们会看到元素在connect()调用后立即被发射和处理。

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources

If there were no subscribers, the elements would still be processed.

如果没有订阅者,这些元素仍然会被处理。

So the connect() method starts emitting and processing elements regardless of whether someone is subscribed as if there was an artificial Subscriber with an empty action which consumed the elements.

所以connect()方法开始发射和处理元素,不管是否有人订阅,就像有一个人为的Subscriber,有一个空的动作,消耗了这些元素。

And if some real Subscribers subscribe, this artificial mediator just propagates elements to them.

而如果一些真正的Subscribers订阅了,这个人工调解人只是向他们传播元素。

To unsubscribe the artificial Subscriber we perform:

为了取消人工订户,我们进行。

s.unsubscribe();

Where:

在哪里?

Subscription s = obs.connect();

3.5. autoConnect()

3.5.自动连接()

This method implies that connect() isn’t called before or after subscriptions but automatically when the first Subscriber subscribes.

这个方法意味着 connect()不是在订阅之前或之后被调用,而是在第一个Subscriber订阅时自动调用

Using this method, we can’t call connect() ourselves as the returned object is a usual Observable which doesn’t have this method but uses an underlying ConnectableObservable:

使用这个方法,我们不能自己调用connect(),因为返回的对象是一个通常的Observable,它没有这个方法,而是使用一个底层的ConnectableObservable

public static void autoConnectAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
    .doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();

    LOGGER.info("autoconnect()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription s1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription s2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe 1");
    s1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 2");
    s2.unsubscribe();
}

Note that we can’t also unsubscribe the artificial Subscriber. We can unsubscribe all the real Subscribers but the artificial Subscriber will still process the events.

请注意,我们不能同时退订人工Subscriber。我们可以退订所有真正的Subscribers,但人工Subscriber仍将处理事件。

To understand this let’s look at what is happening at the end after the last subscriber has unsubscribed:

为了理解这一点,让我们看看在最后一个订阅者退订后,最后发生了什么。

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268

As we can see clearing resources doesn’t happen and saving elements with doOnNext() continues after the second unsubscribing. This means that the artificial Subscriber doesn’t unsubscribe but continues to consume elements.

我们可以看到,在第二次取消订阅后,清除资源的情况并没有发生,而用doOnNext()保存元素的情况继续发生。这意味着人造的Subscriber并没有取消订阅,而是继续消耗元素。

3.6. refCount()

3.6.refCount()

refCount() is similar to autoConnect() in that connecting also happens automatically as soon as the first Subscriber subscribes.

refCount()autoConnect()类似,一旦第一个Subscriber订阅,连接也会自动发生。

Unlike autoconnect() disconnecting also happens automatically when the last Subscriber unsubscribes:

autoconnect()不同,当最后一个Subscriber取消订阅时,断开连接也会自动发生。

public static void refCountAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();

    LOGGER.info("refcount()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));

    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources

4. Conclusion

4.总结

The ConnectableObservable class helps to handle multiple subscribers with little effort.

ConnectableObservable类有助于不费吹灰之力地处理多个订阅者。

Its methods look similar but change the subscribers’ behavior greatly due to implementation subtleties meaning even the order of the methods matters.

它的方法看起来很相似,但由于实现上的细微差别而大大改变了订阅者的行为,这意味着甚至方法的顺序也很重要。

The full source code for all the examples used in this article can be found in the GitHub project.

本文中使用的所有示例的完整源代码可以在GitHub项目中找到。