Concurrency in Spring WebFlux – Spring WebFlux的并发性

最后修改: 2020年 8月 18日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

In this tutorial, we’ll explore concurrency in reactive programs written with Spring WebFlux.

在本教程中,我们将探讨使用Spring WebFlux编写的反应式程序的并发性。

We’ll begin by discussing concurrency in relation to reactive programming. Then we’ll learn how Spring WebFlux offers concurrency abstractions over different reactive server libraries.

我们将首先讨论与反应式编程有关的并发性。然后我们将学习Spring WebFlux如何在不同的反应式服务器库中提供并发性抽象。

2. The Motivation for Reactive Programming

2.反应式编程的动因

A typical web application comprises several complex, interacting parts. Many of these interactions are blocking in nature, such as those involving a database call to fetch or update data. Several others, however, are independent and can be performed concurrently, possibly in parallel.

一个典型的Web应用程序包括几个复杂的、相互作用的部分其中许多互动在本质上是阻塞的,例如那些涉及数据库调用以获取或更新数据。然而,其他几个部分是独立的,可以并发执行,可能是并行的。

For instance, two user requests to a web server can be handled by different threads. On a multi-core platform, this has an obvious benefit in terms of the overall response time. Hence, this model of concurrency is known as the thread-per-request model:

例如,两个用户对Web服务器的请求可以由不同的线程来处理。多核平台上,这在整体响应时间方面有明显的好处。因此,这种并发模型被称为每请求线程模型

In the diagram above, each thread handles a single request at a time.

在上图中,每个线程一次处理一个请求。

While thread-based concurrency solves a part of the problem for us, it does nothing to address the fact that most of our interactions within a single thread are still blocking. Moreover, the native threads we use to achieve concurrency in Java come at a significant cost in terms of context switches.

虽然基于线程的并发性为我们解决了一部分问题,但它并没有解决以下事实:我们在单个线程中的大多数交互仍然是阻塞的。此外,我们在Java中用来实现并发的本地线程在上下文切换方面付出了巨大的代价。

Meanwhile, as web applications face more and more requests, the thread-per-request model starts to fall short of expectations.

同时,随着网络应用面临越来越多的请求,每请求线程模型开始达不到预期

Consequently, we need a concurrency model that can help us handle increasingly more requests with a relatively fewer number of threads. This is one of the primary motivations for adopting reactive programing.

因此,我们需要一个能够帮助我们以相对较少的线程数量处理越来越多请求的并发模型这就是采用reactive编程的主要动机之一。

3. Concurrency in Reactive Programming

3.反应式编程的并发性

Reactive programming helps us structure the program in terms of data flows and the propagation of change through them. In a completely non-blocking environment, this can enable us to achieve higher concurrency with better resource utilization.

反应式编程帮助我们从数据流和通过数据流传播变化的角度来构建程序。在一个完全无阻塞的环境中,这可以使我们实现更高的并发性,有更好的资源利用率。

However, is reactive programming a complete departure from thread-based concurrency? While this is a strong statement to make, reactive programming certainly has a very different approach to the usage of threads to achieve concurrency. So the fundamental difference that reactive programming brings on is asynchronicity.

然而,反应式编程是否完全脱离了基于线程的并发性?虽然这种说法很强烈,但反应式编程在使用线程实现并发方面肯定有非常不同的方法。因此,反应式编程带来的根本区别是异步性

In other words, the program flow transforms from a sequence of synchronous operations, into an asynchronous stream of events.

换句话说,程序流从一连串的同步操作转变为一个异步的事件流。

For instance, under the reactive model, a read call to the database doesn’t block the calling thread while data is fetched. The call immediately returns a publisher that others can subscribe to. The subscriber can process the event after it occurs and may even further generate events itself:

例如,在反应式模型下,当数据被获取时,对数据库的读取调用并不会阻塞调用线程。该调用立即返回一个其他人可以订阅的发布者。订阅者可以在事件发生后进行处理,甚至可以自己进一步生成事件。

Above all, reactive programming doesn’t emphasize which thread events should be generated and consumed. Rather, the emphasis is on structuring the program as an asynchronous event stream.

最重要的是,反应式编程并不强调哪些线程事件应该被生成和消费。相反,强调的是将程序构建为一个异步事件流

The publisher and subscriber here don’t need to be part of the same thread. This helps us to achieve better utilization of the available threads, and therefore, higher overall concurrency.

这里的发布者和订阅者不需要是同一个线程的一部分。这有助于我们更好地利用可用的线程,从而实现更高的整体并发性。

4. Event Loop

4.事件循环

There are several programming models that describe a reactive approach to concurrency.

有几种编程模型描述了对并发性的反应式方法

In this section, we’ll examine a few of them to understand how reactive programming achieves higher concurrency with fewer threads.

在本节中,我们将研究其中的几个,以了解反应式编程如何以较少的线程实现更高的并发性。

One such reactive asynchronous programming model for servers is the event loop model:

服务器的这种反应式异步编程模型是事件循环 模型

Above is an abstract design of an event loop that presents the ideas of reactive asynchronous programming:

以上是一个事件循环的抽象设计,展示了反应式异步编程的思想。

  • The event loop runs continuously in a single thread, although we can have as many event loops as the number of available cores.
  • The event loop processes the events from an event queue sequentially and returns immediately after registering the callback with the platform.
  • The platform can trigger the completion of an operation, like a database call or an external service invocation.
  • The event loop can trigger the callback on the operation completion notification and send back the result to the original caller.

The event loop model is implemented in a number of platforms, including Node.js, Netty, and Ngnix. They offer much better scalability than traditional platforms, like Apache HTTP Server, Tomcat, or JBoss.

事件循环 模型在许多平台上实现,包括Node.jsNetty,以及Ngnix。它们提供了比传统平台更好的可扩展性,如Apache HTTP ServerTomcat,或JBoss

5. Reactive Programming With Spring WebFlux

5.用Spring WebFlux进行反应式编程

Now we have enough insight into reactive programming and its concurrency model to explore the subject in Spring WebFlux.

现在我们对反应式编程及其并发模型有了足够的了解,可以在Spring WebFlux中探索这一主题。

WebFlux is Spring‘s reactive-stack web framework, which was added in version 5.0.

WebFlux是 Springreactive-stack web框架,它是在5.0版本中加入的。

Let’s explore the server-side stack of Spring WebFlux to understand how it complements the traditional web stack in Spring:

让我们来探索Spring WebFlux的服务器端栈,以了解它是如何补充Spring中的传统Web栈的。

As we can see, Spring WebFlux sits parallel to the traditional web framework in Spring, and doesn’t necessarily replace it.

我们可以看到,Spring WebFlux与Spring中的传统Web框架平行,而不一定要取代它

There are a few important points to note here:

这里有几个重要的点需要注意。

  • Spring WebFlux extends the traditional annotation-based programming model with functional routing.
  • Moreover, it adapts the underlying HTTP runtimes to the Reactive Streams API, making the runtimes interoperable.
  • It’s able to support a wide variety of reactive runtimes, including Servlet 3.1+ containers like Tomcat, Reactor, Netty, or Undertow.
  • Lastly, it includes WebClient, a reactive and non-blocking client for HTTP requests offering functional and fluent APIs.

6. Threading Model in Supported Runtimes

6.支持的运行时中的线程模型

As we discussed earlier, reactive programs tend to work with just a few threads and make the most of them. However, the number and nature of threads depends upon the actual Reactive Stream API runtime that we choose.

正如我们前面所讨论的,反应式程序往往只用几个线程工作,并充分利用它们。然而,线程的数量和性质取决于我们选择的实际反应式流API运行时。

To clarify, Spring WebFlux can adapt to different runtimes through a common API provided by HttpHandler. This API is a simple contract with just one method that provides an abstraction over different server APIs, like Reactor Netty, Servlet 3.1 API, or Undertow APIs.

澄清一下,Spring WebFlux可以通过HttpHandler提供的通用API来适应不同的运行时间。这个API是一个简单的契约,只有一个方法,它提供了对不同服务器API的抽象,如Reactor Netty、Servlet 3.1 API或Undertow API。

Let’s examine the threading model implemented in a few of them.

让我们来看看其中几个实现的线程模型。

While Netty is the default server in a WebFlux application, it’s just a matter of declaring the right dependency to switch to any other supported server:

虽然Netty是WebFlux应用程序中的默认服务器,但只需声明正确的依赖关系就可以切换到任何其他支持的服务器

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-reactor-netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

While it’s possible to observe the threads created in a Java Virtual Machine in a number of ways, it’s quite easy to just pull them from the Thread class itself:

虽然可以通过多种方式观察在Java虚拟机中创建的线程,但直接从Thread类本身提取它们还是很容易的。

Thread.getAllStackTraces()
  .keySet()
  .stream()
  .collect(Collectors.toList());

6.1. Reactor Netty

6.1.Reactor Netty

As we said, Reactor Netty is the default embedded server in the Spring Boot WebFlux starter. Let’s see the threads that Netty creates by default. To begin, we won’t add any other dependencies or use WebClient. So if we start a Spring WebFlux application created using its SpringBoot starter, we can expect to see some default threads it creates:

正如我们所说,Reactor Netty是Spring Boot WebFlux启动器中的默认嵌入式服务器。让我们看看Netty默认创建的线程。首先,我们不会添加任何其他依赖项或使用WebClient。因此,如果我们启动一个使用其SpringBoot启动器创建的Spring WebFlux应用程序,我们可以期待看到它创建的一些默认线程。

Note that, apart from a normal thread for the server, Netty spawns a bunch of worker threads for request processing. These are typically available CPU cores. This is the output on a quad-core machine. We’d also see a bunch of housekeeping threads typical to a JVM environment, but they aren’t important here.

请注意,除了一个正常的服务器线程外,Netty还产生了一堆工作线程来处理请求这些通常是可用的CPU核心。这是在四核机器上的输出。我们也会看到一堆JVM环境中典型的管家线程,但它们在这里并不重要。

Netty uses the event loop model to provide highly scalable concurrency in a reactive asynchronous manner. Let’s see how Netty implements an event loop levering Java NIO to provide this scalability:

Netty使用事件循环模型,以反应式异步方式提供高度可扩展的并发性。让我们看看Netty如何实现事件循环利用Java NIO来提供这种可扩展性

Here, EventLoopGroup manages one or more EventLoop, which must be continuously running. Therefore, it isn’t recommended to create more EventLoops than the number of available cores.

在这里,EventLoopGroup管理一个或多个EventLoop,它们必须持续运行。因此,不建议创建超过可用内核数量的EventLoop

The EventLoopGroup further assigns an EventLoop to each newly created Channel. Thus, for the lifetime of a Channel, all operations are executed by the same thread.

EventLoopGroup进一步为每个新创建的Channel分配一个EventLoop。因此,在通道的生命周期内,所有操作都由同一个线程执行。

6.2. Apache Tomcat

6.2.Apache Tomcat

Spring WebFlux is also supported on a traditional Servlet Container, like Apache Tomcat.

Spring WebFlux也支持传统的Servlet容器,如Apache Tomcat

WebFlux relies on the Servlet 3.1 API with non-blocking I/O. While it uses Servlet API behind a low-level adapter, Servlet API isn’t available for direct usage.

WebFlux依赖于Servlet 3.1 API的非阻塞I/O。虽然它在一个低级别的适配器后面使用Servlet API,但Servlet API并不能直接使用。

Let’s see what kind of threads we expect in a WebFlux application running on Tomcat:

让我们看看在Tomcat上运行的WebFlux应用程序中我们期待什么样的线程。

The number and type of threads which we can see here are quite different from what we observed earlier.

我们在这里可以看到的线程的数量和类型与我们之前观察到的有很大不同。

To begin with, Tomcat starts with more worker threads, which defaults to ten. Of course, we’ll also see some housekeeping threads typical to the JVM, and the Catalina container, which we can ignore for this discussion.

首先,Tomcat开始时有更多的工作线程,默认为十个。当然,我们也会看到一些JVM的典型内务线程,以及Catalina容器,我们在讨论中可以忽略这些线程。

We need to understand the architecture of Tomcat with Java NIO to correlate it with the threads we see above.

我们需要了解Tomcat与Java NIO的架构,以便将其与我们上面看到的线程联系起来。

Tomcat 5 and onward supports NIO in its Connector component, which is primarily responsible for receiving the requests.

Tomcat 5及以后的版本在其连接器组件中支持NIO,该组件主要负责接收请求

The other Tomcat component is the Container component, which is responsible for the container management functions.

另一个Tomcat组件是容器组件,它负责容器管理功能。

The point of interest for us here is the threading model that the Connector component implements to support NIO. It’s comprised of Acceptor, Poller, and Worker as part of the NioEndpoint module:

这里我们感兴趣的是连接器组件实现的线程模型,以支持NIO。它由AcceptorPollerWorker组成,是NioEndpoint模块的一部分。

Tomcat spawns one or more threads for Acceptor, Poller, and Worker, typically with a thread pool dedicated to Worker.

Tomcat为AcceptorPollerWorker生成一个或多个线程,通常有一个线程池专门用于Worker

While a detailed discussion on Tomcat architecture is beyond the scope of this article, we should now have enough insight to understand the threads we saw earlier.

虽然对Tomcat架构的详细讨论超出了本文的范围,但我们现在应该有足够的洞察力来理解我们之前看到的线程。

7. Threading Model in WebClient

7.WebClient中的线程模型

WebClient is the reactive HTTP client that’s part of Spring WebFlux. We can use it anytime we require REST-based communication, which enables us to create applications that are end-to-end reactive.

WebClient Spring WebFlux中的反应式HTTP客户端。我们可以在需要基于REST的通信时使用它,这使我们能够创建端到端 反应式的应用程序。

As we’ve seen before, reactive applications work with just a few threads, so there’s no margin for any part of the application to block a thread. Therefore, WebClient plays a vital role in helping us realize the potential of WebFlux.

正如我们之前所看到的,反应式应用程序只用几个线程来工作,所以应用程序的任何部分都没有余地来阻塞线程。因此,WebClient在帮助我们实现WebFlux的潜力方面发挥了重要作用。

7.1. Using WebClient

7.1.使用WebClient

Using WebClient is quite simple as well. We don’t need to include any specific dependencies, as it’s part of Spring WebFlux.

使用WebClient也很简单。我们不需要包含任何特定的依赖项,因为它是Spring WebFlux的一部分

Let’s create a simple REST endpoint that returns a Mono:

让我们创建一个简单的REST端点,返回一个Mono

@GetMapping("/index")
public Mono<String> getIndex() {
    return Mono.just("Hello World!");
}

Then we’ll use WebClient to call this REST endpoint and consume the data reactively:

然后我们将使用WebClient来调用这个REST端点,并反应性地消费数据。

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .doOnNext(s -> printThreads());

Here we’re also printing the threads that are created using the method we discussed earlier.

在这里,我们也在打印使用我们前面讨论的方法创建的线程。

7.2. Understanding the Threading Model

7.2.了解线程模型

So, how does the threading model work in the case of WebClient?

那么,在WebClient的情况下,线程模型是如何工作的?

Well, not surprisingly, WebClient also implements concurrency using the event loop model. Of course, it relies on the underlying runtime to provide the necessary infrastructure.

那么,毫不奇怪,WebClient也使用事件循环模型实现了并发。当然,它依赖于底层运行时来提供必要的基础设施。

If we’re running WebClient on the Reactor Netty, it shares the event loop that Netty uses for the server. Therefore, in this case, we may not notice much difference in the threads that are created.

如果我们在Reactor Netty上运行WebClient,它共享Netty用于服务器的事件循环。因此,在这种情况下,我们可能不会注意到所创建的线程有什么不同。

However, WebClient is also supported on a Servlet 3.1+ container, like Jetty, but the way it works there is different.

然而,WebClient也支持Servlet 3.1+容器,如Jetty,但其工作方式不同

If we compare the threads that are created on a WebFlux application running Jetty with and without WebClient, we’ll notice a few additional threads.

如果我们比较运行Jetty的WebFlux应用程序上创建的线程和没有WebClient的线程,我们会发现有一些额外的线程。

Here, WebClient has to create its event loop. So we can see the fixed number of processing threads that this event loop creates:

在这里,WebClient必须创建其事件循环。因此,我们可以看到这个事件循环所创建的处理线程的固定数量。

In some cases, having a separate thread pool for client and server can provide better performance. While it’s not the default behavior with Netty, it’s always possible to declare a dedicated thread pool for WebClient if needed.

在某些情况下,为客户端和服务器设置一个单独的线程池可以提供更好的性能。虽然这不是Netty的默认行为,但如果需要,总是可以为WebClient声明一个专门的线程池。

We’ll see how this is possible in a later section.

我们将在后面的章节中看到这一点是如何实现的。

8. Threading Model in Data Access Libraries

8.数据访问库的线程模型

As we saw earlier, even a simple application usually consists of several parts that need to be connected.

正如我们前面所看到的,即使是一个简单的应用程序通常由几个需要连接的部分组成。

Typical examples of these parts include databases and message brokers. The existing libraries to connect with many of them are still blocking, but that’s quickly changing.

这些部分的典型例子包括数据库和消息代理。与其中许多部分连接的现有库仍然是阻塞的,但这种情况正在迅速改变。

There are several databases now that offer reactive libraries for connectivity. Many of these libraries are available within Spring Data, while we can use others directly as well.

现在有几个数据库提供用于连接的反应式库其中许多库在Spring Data中可用,而我们也可以直接使用其他库。

The threading model these libraries use is of particular interest to us.

这些库使用的线程模型是我们特别感兴趣的。

8.1. Spring Data MongoDB

8.1.Spring数据 MongoDB

Spring Data MongoDB provides reactive repository support for MongoDB built on top of the MongoDB Reactive Streams driver. Most notably, this driver fully implements the Reactive Streams API to provide asynchronous stream processing with non-blocking back-pressure.

Spring Data MongoDB为建立在MongoDB Reactive Streams 驱动程序之上的 MongoDB 提供反应性存储库支持。最值得注意的是,该驱动程序完全实现了 Reactive Streams API,以提供异步流处理非阻塞反压

Setting up support for the reactive repository for MongoDB in a Spring Boot application is as simple as adding a dependency:

在Spring Boot应用程序中设置对MongoDB的反应式资源库的支持,就像添加一个依赖关系一样简单。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

This will allow us to create a repository, and use it to perform some basic operations on MongoDB in a non-blocking manner:

这将使我们能够创建一个存储库,并使用它以非阻塞的方式对MongoDB进行一些基本操作。

public interface PersonRepository extends ReactiveMongoRepository<Person, ObjectId> {
}
.....
personRepository.findAll().doOnComplete(this::printThreads);

So what kind of threads can we expect to see when we run this application on the Netty server?

那么,当我们在Netty服务器上运行这个应用程序时,我们可以期待看到什么样的线程呢?

Well, not surprisingly, we won’t see much difference, as a Spring Data reactive repository makes use of the same event loop that’s available for the server.

好吧,毫不奇怪,我们不会看到太大的区别,因为aSpring Data反应式存储库利用了服务器可用的相同事件循环。

8.2. Reactor Kafka

8.2.Reactor Kafka

Spring is still in the process of building full-fledged support for reactive Kafka. However, we do have options available outside Spring.

Spring仍在建立对反应式Kafka的全面支持。然而,我们确实有Spring之外的选择。

Reactor Kafka is a reactive API for Kafka based on Reactor. Reactor Kafka enables messages to be published and consumed using functional APIs, also with non-blocking back-pressure.

Reactor Kafka是一个基于Reactor的Kafka的反应式API。Reactor Kafka使消息能够使用功能性API进行发布和消费,同时还具有非阻塞性反压

First, we need to add the required dependency in our application to start using Reactor Kafka:

首先,我们需要在我们的应用程序中添加所需的依赖性,以开始使用Reactor Kafka。

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.3.10</version>
</dependency>

This should enable us to produce messages to Kafka in a non-blocking manner:

这应该能让我们以非阻塞的方式向Kafka产生消息。

// producerProps: Map of Standard Kafka Producer Configurations
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender =  KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux
  .range(1, 10)
  .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux).subscribe();

Similarly, we should be able to consume messages from Kafka, also in a non-blocking manner:

同样地,我们应该能够从Kafka中消费消息,而且是以非阻塞的方式。

// consumerProps: Map of Standard Kafka Consumer Configurations
ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions.create(consumerProps);
receiverOptions.subscription(Collections.singleton("reactive-test"));
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(receiverOptions);
Flux<ReceiverRecord<Integer, String>> inboundFlux = receiver.receive();
inboundFlux.doOnComplete(this::printThreads)

This is pretty simple and self-explanatory.

这很简单,不言自明。

We’re subscribing to a topic reactive-test in Kafka, and getting a Flux of messages.

我们在Kafka中订阅了一个主题reactive-test,并得到了一个Flux的消息。

The interesting thing for us is the threads that get created:

对我们来说,有意义的事情是被创建的线程

We can see a few threads that aren’t typical to the Netty server.

我们可以看到一些不是Netty服务器的典型线程

This indicates that Reactor Kafka manages its own thread pool, with a few worker threads that participate in Kafka message processing exclusively. Of course, we’ll see a bunch of other threads related to Netty and the JVM that we can ignore.

这表明Reactor Kafka管理着自己的线程池,有几个工作线程专门参与Kafka消息处理。当然,我们会看到一堆与Netty和JVM有关的其他线程,我们可以忽略不计。

Kafka producers use a separate network thread for sending requests to the broker. Furthermore, they deliver responses to the application on a single-threaded pooled scheduler.

Kafka生产者使用单独的网络线程向代理发送请求。此外,他们在单线程池化调度器上向应用程序交付响应。

Kafka consumer, on the other hand, has one thread per consumer group that blocks to listen for incoming messages. The incoming messages are then scheduled for processing on a different thread pool.

另一方面,Kafka消费者有一个线程,每个消费者组有一个线程,用来监听传入的消息。然后,传入的消息被安排在不同的线程池中进行处理。

9. Scheduling Options in WebFlux

9.WebFlux中的调度选项

So far, we’ve seen that reactive programming really shines in a completely non-blocking environment with just a few threads. But this also means that, if there is indeed a part that is blocking, it will result in far worse performance. This is because a blocking operation can freeze the event loop entirely.

到目前为止,我们已经看到,反应式编程在只有几个线程的完全非阻塞环境中确实大放异彩。但这也意味着,如果确实有一个部分是阻塞的,它将导致性能大大降低。这是因为一个阻塞操作可以完全冻结事件循环。

So, how do we handle long-running processes or blocking operations in reactive programming?

那么,我们如何在反应式编程中处理长期运行的进程或阻塞操作?

Honestly, the best option would be to just avoid them. However, this may not always be possible, and we may need a dedicated scheduling strategy for those parts of our application.

老实说,最好的选择是直接避免它们。然而,这可能并不总是可能的,我们可能需要为我们应用程序的这些部分制定一个专门的调度策略

Spring WebFlux offers a mechanism to switch processing to a different thread pool in between a data flow chain. This can provide us with precise control over the scheduling strategy that we want for certain tasks. Of course, WebFlux is able to offer this based on the thread pool abstractions, known as schedulers, available in the underlying reactive libraries.

Spring WebFlux 提供了一种机制,可以在数据流链之间将处理切换到不同的线程池。这可以为我们提供对某些任务所需的调度策略的精确控制。当然,WebFlux能够提供这种功能是基于底层反应式库中的线程池抽象,也就是所谓的调度器。

9.1. Reactor

9.1.反应器

In Reactor, the Scheduler class defines the execution model, as well as where the execution takes place.

Reactor中,Scheduler类定义了执行模型,以及执行发生的位置

The Schedulers class provides a number of execution contexts, like immediate, single, elastic, and parallel. These provide different types of thread pools, which can be useful for different jobs. Moreover, we can always create our own Scheduler with a preexisting ExecutorService.

Schedulers类提供了许多执行上下文,如immediatesingleelasticparallel。这些都提供了不同类型的线程池,这对不同的工作都很有用。此外,我们总是可以用预先存在的Scheduler创建我们自己的ExecutorService

While Schedulers give us several execution contexts, Reactor also provides us with different ways to switch the execution context. These are the methods publishOn and subscribeOn.

虽然调度器为我们提供了几种执行上下文,但Reactor也为我们提供了切换执行上下文的不同方法。这些方法是publishOnsubscribeOn

We can use publishOn with a Scheduler anywhere in the chain, with that Scheduler affecting all the subsequent operators.

我们可以在链中的任何地方使用publishOnScheduler,该Scheduler影响所有后续的操作者。

While we can also use subscribeOn with a Scheduler anywhere in the chain, it will only affect the context of the source of emission.

虽然我们也可以在链中的任何地方使用subscribeOn Scheduler ,但它将只影响排放源的上下文。

If we recall, WebClient on Netty shares the same event loop created for the server as a default behavior. However, we may have valid reasons to create a dedicated thread pool for WebClient.

如果我们记得,Netty上的WebClient共享为服务器创建的相同的事件循环,这是一种默认行为。然而,我们可能有合理的理由为WebClient创建一个专门的线程池。

Let’s see how we can achieve this in Reactor, which is the default reactive library in WebFlux:

让我们看看如何在Reactor中实现这一点,它是WebFlux的默认反应式库。

Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");

WebClient.create("http://localhost:8080/index").get()
  .retrieve()
  .bodyToMono(String.class)
  .publishOn(scheduler)
  .doOnNext(s -> printThreads());

Earlier, we didn’t observe any difference in the threads created on Netty with or without WebClient. However, if we now run the code above, we’ll observe a few new threads being created:

早些时候,我们没有观察到有无WebClient在Netty上创建的线程有什么不同。然而,如果我们现在运行上面的代码,我们将观察到一些新的线程被创建

Here we can see the threads created as part of our bounded elastic thread pool. This is where responses from the WebClient are published once subscribed.

这里我们可以看到作为我们的有界弹性线程池的一部分而创建的线程。一旦订阅,来自WebClient的响应就会在这里发布。

This leaves the main thread pool for handling the server requests.

这就留下了处理服务器请求的主线程池。

9.2. RxJava

9.2 RxJava

The default behavior in RxJava isn’t very different than that of the Reactor.

RxJava中的默认行为与Reactor中的行为并无太大区别

The Observable, and the chain of operators we apply on it, do their work and notify the observers on the same thread where the subscription was invoked. Also, RxJava, like Reactor, offers ways to introduce prefixed or custom scheduling strategies into the chain.

Observable,以及我们在其上应用的操作链,在调用订阅的同一线程上完成其工作并通知观察员。此外,RxJava和Reactor一样,提供了将前缀或自定义调度策略引入链中的方法。

RxJava also features a class Schedulers, which offers a number of execution models for the Observable chain. These include new thread, immediate, trampoline, io, computation, and test. Of course, it also allows us to define a Scheduler from a Java Executor.

RxJava还具有一个Schedulers类,它为Observable链提供了许多执行模型。其中包括新线程即时蹦床io计算测试。当然,它也允许我们从Java Scheduler中定义执行器

Moreover, RxJava also offers two extension methods to achieve this, subscribeOn and observeOn.

此外,RxJava还提供了两个扩展方法来实现这一点subscribeOnobserveOn

The subscribeOn method changes the default behavior by specifying a different Scheduler on which Observable should operate. The observeOn method, on the other hand, specifies a different Scheduler that the Observable can use to send notifications to the observers.

subscribeOn方法通过指定不同的Scheduler来改变默认行为,Observable应该在此操作。另一方面,observeOn方法指定了一个不同的Scheduler,Observable可以使用它来向观察者发送通知。

As we discussed before, Spring WebFlux uses Reactor as its reactive library by default. But since it’s fully compatible with Reactive Streams API, it’s possible to switch to another Reactive Streams implementation, like RxJava (for RxJava 1.x with its Reactive Streams adapter).

正如我们之前讨论的,Spring WebFlux默认使用Reactor作为其反应式库。但由于它与Reactive Streams API完全兼容,所以可以切换到另一个Reactive Streams实现,如RxJava(针对RxJava 1.x及其Reactive Streams适配器)。

We need to explicitly add the dependency:

我们需要明确地添加依赖关系。

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

Then we can start to use RxJava types, like Observable, in our application, along with RxJava specific Schedulers:

然后我们就可以开始在我们的应用程序中使用RxJava类型,如Observable,以及RxJava特定的Schedulers

io.reactivex.Observable
  .fromIterable(Arrays.asList("Tom", "Sawyer"))
  .map(s -> s.toUpperCase())
  .observeOn(io.reactivex.schedulers.Schedulers.trampoline())
  .doOnComplete(this::printThreads);

As a result, if we run this application, apart from the regular Netty and JVM related threads, we should see a few threads related to our RxJava Scheduler:

因此,如果我们运行这个应用程序,除了常规的Netty和JVM相关线程外,我们应该看到一些与我们的RxJava Scheduler相关的线程。

10. Conclusion

10.结语

In this article, we explored the premise of reactive programming from the context of concurrency. We observed the difference in the concurrency model in traditional and reactive programming. This allowed us to examine the concurrency model in Spring WebFlux, and its take on the threading model to achieve it.

在这篇文章中,我们从并发性的角度探讨了反应式编程的前提。我们观察了传统编程和反应式编程中并发模型的差异。这使我们能够研究Spring WebFlux中的并发模型,以及它采取的线程模型来实现它。

Then we explored the threading model in WebFlux in combination with different HTTP runtime and reactive libraries. We also learned how the threading model differs when we use WebClient versus a data access library.

然后我们探索了WebFlux中的线程模型与不同的HTTP运行时和反应式库的结合。我们还了解了当我们使用WebClient与数据访问库时,线程模型有什么不同。

Finally, we touched upon the options for controlling the scheduling strategy in our reactive program within WebFlux.

最后,我们谈到了在WebFlux中控制我们的反应式程序的调度策略的选项。

As always, the source code for this article can be found over on GitHub.

像往常一样,本文的源代码可以在GitHub上找到over