Schedulers in RxJava

Last modified: September 25, 2017

1. Overview

In this article, we’ll focus on the different types of Schedulers that we can use when writing multithreaded programs based on the subscribeOn and observeOn methods of RxJava’s Observable.

Schedulers let us specify where, and possibly when, to execute tasks related to the operation of an Observable chain.

We can obtain a Scheduler from the factory methods described in the class Schedulers.

2. Default Threading Behavior

By default, Rx is single-threaded, which means that an Observable and the chain of operators applied to it notify observers on the same thread on which its subscribe() method is called.

The observeOn and subscribeOn methods take a Scheduler as an argument, which, as the name suggests, is a tool that we can use for scheduling individual actions.

To work with a Scheduler directly, we call its createWorker method, which returns a Scheduler.Worker. A worker accepts actions and executes them sequentially on a single thread.

In a way, a worker is a Scheduler itself, but we won’t refer to it as a Scheduler to avoid confusion.

2.1. Scheduling an Action

We can schedule a job on any Scheduler by creating a new worker and scheduling some actions:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
 
Assert.assertTrue(result.equals("action"));

The action is then queued on the thread that the worker is assigned to.
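
To make the worker contract more tangible, here is a plain-Java analogy rather than RxJava's actual implementation. The class name SequentialWorkerSketch and the thread name "sketch-worker" are made up for illustration. A single-threaded executor stands in for the worker, so the actions run one at a time, in submission order, on one thread:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for Scheduler.Worker; not part of RxJava.
public class SequentialWorkerSketch {

    // Schedules three actions and returns "threadName:action" entries in execution order.
    public static List<String> runThreeActions() {
        List<String> log = new CopyOnWriteArrayList<>();
        // One thread only, so queued actions cannot overlap or be reordered.
        ExecutorService worker = Executors.newSingleThreadExecutor(
            r -> new Thread(r, "sketch-worker"));
        for (String action : new String[] {"first", "second", "third"}) {
            worker.execute(() ->
                log.add(Thread.currentThread().getName() + ":" + action));
        }
        worker.shutdown(); // already queued actions still run
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runThreeActions());
    }
}
```

Every log entry carries the same thread name, and the entries appear in the order in which the actions were submitted.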

2.2. Canceling an Action

Scheduler.Worker implements Subscription. Calling the unsubscribe method on a worker empties the queue and cancels all pending tasks. We can see that by example:

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += "First_Action";
    worker.unsubscribe();
});
worker.schedule(() -> result += "Second_Action");
 
Thread.sleep(500); // give the worker thread time to run the first action
Assert.assertTrue(result.equals("First_Action"));

The second task is never executed because the one before it canceled the whole operation. Actions that were in the process of being executed will be interrupted.

3. Schedulers.newThread

This scheduler simply starts a new thread every time it is requested via subscribeOn() or observeOn().

It’s hardly ever a good choice, not only because of the latency involved when starting a thread but also because this thread is not reused:

Observable.just("Hello")
  .observeOn(Schedulers.newThread())
  .doOnNext(s ->
    result2 += Thread.currentThread().getName()
  )
  .observeOn(Schedulers.newThread())
  .subscribe(s ->
    result1 += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

When the Worker is done, the thread simply terminates. This Scheduler is suitable only when tasks are coarse-grained: each takes a long time to complete, but there are so few of them that threads are unlikely to be reused at all.

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals(
  "RxNewThreadScheduler-1_Start_End_worker_"));

Scheduling the worker on a NewThreadScheduler shows that the worker is bound to a particular thread: the nested action runs on that same thread, after the outer action has finished.

4. Schedulers.immediate

Schedulers.immediate is a special scheduler that invokes a task on the client thread in a blocking way, rather than asynchronously, and returns when the action is completed:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals(
  "main_Start_worker__End"));

In fact, subscribing to an Observable via immediate Scheduler typically has the same effect as not subscribing with any particular Scheduler at all:

Observable.just("Hello")
  .subscribeOn(Schedulers.immediate())
  .subscribe(s ->
    result += Thread.currentThread().getName()
  );
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));

5. Schedulers.trampoline

The trampoline Scheduler is very similar to immediate because it also schedules tasks in the same thread, effectively blocking.

However, the upcoming task is executed only after all previously scheduled tasks complete:

Observable.just(2, 4, 6, 8)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
  .subscribeOn(Schedulers.trampoline())
  .subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579"));

Immediate invokes a given task right away, whereas trampoline waits for the current task to finish.

The trampoline’s worker executes every task on the thread that scheduled the first task. The first call to schedule blocks until the queue is emptied:

Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "Start";
    worker.schedule(() -> {
        result += "_middleStart";
        worker.schedule(() ->
            result += "_worker_"
        );
        result += "_middleEnd";
    });
    result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result
  .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));
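
The queueing behavior described above can be imitated in a few lines of plain Java. This is only a single-threaded sketch of the trampolining idea, not RxJava's implementation, and the class TrampolineSketch is hypothetical. The first call to schedule drains the queue, while nested calls only enqueue their action:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded illustration of trampolining; not RxJava code.
public class TrampolineSketch {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;

    // The outermost call runs queued actions until none are left;
    // calls made from inside a running action return immediately after enqueueing.
    public void schedule(Runnable action) {
        queue.addLast(action);
        if (draining) {
            return; // an outer schedule() call is already draining the queue
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.pollFirst()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    public static String demo() {
        StringBuilder result = new StringBuilder();
        TrampolineSketch worker = new TrampolineSketch();
        worker.schedule(() -> {
            result.append("Start");
            worker.schedule(() -> result.append("_worker_")); // queued, not run yet
            result.append("_End");
        });
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "Start_End_worker_"
    }
}
```

An immediate-style worker would instead run the nested action on the spot and produce "Start_worker__End", which matches the assertion in section 4.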

6. Schedulers.from

Schedulers are internally more complex than Executors from java.util.concurrent – so a separate abstraction was needed.

But because they are conceptually quite similar, it is no surprise that there is a wrapper that can turn an Executor into a Scheduler using the from factory method:

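// ThreadFactoryBuilder is presumably Guava's; newFixedThreadPool is a static import from java.util.concurrent.Executors
private ThreadFactory threadFactory(String pattern) {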
    return new ThreadFactoryBuilder()
      .setNameFormat(pattern)
      .build();
}

@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements() 
 throws InterruptedException {
 
    ExecutorService poolA = newFixedThreadPool(
      10, threadFactory("Sched-A-%d"));
    Scheduler schedulerA = Schedulers.from(poolA);
    ExecutorService poolB = newFixedThreadPool(
      10, threadFactory("Sched-B-%d"));
    Scheduler schedulerB = Schedulers.from(poolB);

    Observable<String> observable = Observable.create(subscriber -> {
      subscriber.onNext("Alfa");
      subscriber.onNext("Beta");
      subscriber.onCompleted();
    });

    observable
      .subscribeOn(schedulerA)
      .subscribeOn(schedulerB)
      .subscribe(
        x -> result += Thread.currentThread().getName() + x + "_",
        Throwable::printStackTrace,
        () -> result += "_Completed"
      );
    Thread.sleep(2000);
    Assert.assertTrue(result.equals(
      "Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}

schedulerB is used for a short period of time, but it merely schedules a new action on schedulerA, which does all the work. Thus, additional subscribeOn calls are not simply ignored; they also introduce a small overhead.

7. Schedulers.io

This Scheduler is similar to newThread, except that already started threads are recycled and can handle future requests.

This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new worker is requested, either a new thread is started (and later kept idle for some time) or the idle one is reused:

Observable.just("io")
  .subscribeOn(Schedulers.io())
  .subscribe(i -> result += Thread.currentThread().getName());
 
Thread.sleep(500); // the subscriber runs on an io thread, so wait before asserting
Assert.assertTrue(result.equals("RxIoScheduler-2"));

We need to be careful with unbounded resources of any kind – in case of slow or unresponsive external dependencies like web services, io scheduler might start an enormous number of threads, leading to our very own application becoming unresponsive.
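
One way to guard against this, sketched here under assumptions, is to build a bounded thread pool with java.util.concurrent and wrap it with the Schedulers.from factory method shown in section 6. The class BoundedPoolSketch, its method names, and the chosen sizes are hypothetical. The sketch covers only the executor part and uses the standard library alone:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of RxJava: a pool with a hard cap on threads and backlog.
public class BoundedPoolSketch {

    public static ThreadPoolExecutor boundedPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
            maxThreads, maxThreads,                      // never more than maxThreads threads
            60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueCapacity),     // bounded backlog of waiting tasks
            new ThreadPoolExecutor.CallerRunsPolicy());  // when full, the submitting thread runs the task
    }

    // Submits slow tasks and returns the largest number of threads the pool ever held.
    public static int largestPoolSizeUnderLoad(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = boundedPool(maxThreads, 2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(BoundedPoolSketch::simulateSlowCall);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.getLargestPoolSize();
    }

    private static void simulateSlowCall() {
        try {
            Thread.sleep(20); // stands in for a slow external dependency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(largestPoolSizeUnderLoad(4, 20));
    }
}
```

With the caller-runs policy, a saturated pool slows the submitting thread down instead of starting more threads. Whether that trade-off suits a given application is a judgment call.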

In practice, Schedulers.io is almost always a better choice than Schedulers.newThread.

8. Schedulers.computation

By default, the computation Scheduler limits the number of threads running in parallel to the value returned by Runtime.getRuntime().availableProcessors().

So we should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code.

It uses an unbounded queue in front of every thread, so if a task is scheduled while all cores are occupied, it is queued. Under sustained load, the queue in front of each thread keeps growing:

Observable.just("computation")
  .subscribeOn(Schedulers.computation())
  .subscribe(i -> result += Thread.currentThread().getName());
Thread.sleep(500); // the subscriber runs on a computation thread, so wait before asserting
Assert.assertTrue(result.equals("RxComputationScheduler-1"));

If for some reason, we need a different number of threads than the default, we can always use the rx.scheduler.max-computation-threads system property.

By using fewer threads, we can ensure that one or more CPU cores always stay idle, so that even under heavy load the computation thread pool does not saturate the server. It’s simply not possible to have more computation threads than cores.
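
As a rough sketch of that idea, the snippet below derives a thread count that leaves one core free and stores it in the rx.scheduler.max-computation-threads system property. The class and method names are hypothetical. The snippet also assumes that the property has to be set before the computation Scheduler is first used, for example with a -D flag on the command line, since the value is presumably read only once:

```java
// Hypothetical helper, not part of RxJava; uses only the standard library.
public class ComputationThreadsSketch {
    static final String KEY = "rx.scheduler.max-computation-threads";

    // Leaves one core idle when there is more than one; never returns less than 1.
    public static int threadsLeavingOneCoreIdle(int cores) {
        return Math.max(1, cores - 1);
    }

    // Assumption: call this before any RxJava class touches the computation Scheduler.
    public static void configure() {
        int cores = Runtime.getRuntime().availableProcessors();
        System.setProperty(KEY, String.valueOf(threadsLeavingOneCoreIdle(cores)));
    }

    public static void main(String[] args) {
        configure();
        System.out.println(KEY + "=" + System.getProperty(KEY));
    }
}
```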

9. Schedulers.test

This Scheduler is used only for testing purposes, and we’ll never see it in production code. Its main advantage is the ability to advance the clock, simulating an arbitrary passage of time:

List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<Long> tick = Observable
  .interval(1, TimeUnit.SECONDS, scheduler);

Observable.from(letters)
  .zipWith(tick, (string, index) -> index + "-" + string)
  .subscribeOn(scheduler)
  .subscribe(subscriber);

subscriber.assertNoValues();
subscriber.assertNotCompleted();

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(
  subscriber.getOnNextEvents(), 
  hasItems("0-A", "1-B", "2-C"));
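
To show why advancing a virtual clock makes such tests fast and deterministic, here is a minimal plain-Java sketch of the idea. It is not how TestScheduler is implemented, and the class VirtualTimeSketch with its methods is hypothetical. Tasks are kept in a priority queue ordered by due time, and advanceTimeBy runs whatever has become due without any real waiting:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of virtual time; not RxJava's TestScheduler.
public class VirtualTimeSketch {

    private static final class Timed implements Comparable<Timed> {
        final long dueMillis;
        final long sequence; // keeps insertion order for tasks due at the same time
        final Runnable action;

        Timed(long dueMillis, long sequence, Runnable action) {
            this.dueMillis = dueMillis;
            this.sequence = sequence;
            this.action = action;
        }

        @Override
        public int compareTo(Timed other) {
            int byTime = Long.compare(dueMillis, other.dueMillis);
            return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
        }
    }

    private final PriorityQueue<Timed> queue = new PriorityQueue<>();
    private long nowMillis = 0;
    private long counter = 0;

    public void schedule(Runnable action, long delay, TimeUnit unit) {
        queue.add(new Timed(nowMillis + unit.toMillis(delay), counter++, action));
    }

    // Moves the virtual clock forward and runs every task that became due.
    public void advanceTimeBy(long amount, TimeUnit unit) {
        long target = nowMillis + unit.toMillis(amount);
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Timed next = queue.poll();
            nowMillis = next.dueMillis;
            next.action.run();
        }
        nowMillis = target;
    }

    // Schedules three events one virtual second apart, then advances the clock.
    public static List<String> eventsAfter(long seconds) {
        VirtualTimeSketch clock = new VirtualTimeSketch();
        List<String> events = new ArrayList<>();
        clock.schedule(() -> events.add("0-A"), 1, TimeUnit.SECONDS);
        clock.schedule(() -> events.add("1-B"), 2, TimeUnit.SECONDS);
        clock.schedule(() -> events.add("2-C"), 3, TimeUnit.SECONDS);
        clock.advanceTimeBy(seconds, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(eventsAfter(1)); // prints [0-A]
        System.out.println(eventsAfter(3)); // prints [0-A, 1-B, 2-C]
    }
}
```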

10. Default Schedulers

Some Observable operators in RxJava have alternate forms that allow us to set which Scheduler the operator will use for its operation. Others don’t operate on any particular Scheduler or operate on a particular default Scheduler.

For example, the delay operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:

ExecutorService poolA = newFixedThreadPool(
  10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
  .delay(1, TimeUnit.SECONDS, schedulerA)
  .subscribe(i -> result+= Thread.currentThread().getName() + i + " ");

Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

Without a custom Scheduler such as schedulerA, delay and all operators below it would use the computation Scheduler.

Other important operators that support custom Schedulers are buffer, interval, range, timer, skip, take, timeout, and several others. If we don’t provide a Scheduler to such operators, computation scheduler is utilized, which is a safe default in most cases.

11. Conclusion

In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed.

Mastering schedulers is essential to writing scalable and safe code using RxJava. The difference between subscribeOn and observeOn is especially important under high load, where every task must be executed precisely when we expect.

Last but not least, we must be sure that Schedulers used downstream can keep up with the load generated by Schedulers upstream. For more information, see the article about backpressure.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.
