Reactor WebFlux vs Virtual Threads – Reactor WebFlux vs 虚拟线程

最后修改: 2023年 12月 20日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll compare Java 19’s virtual threads with Project Reactor’s Webflux. We’ll begin by revisiting the fundamental workings of each approach, and subsequently, we’ll analyze their strengths and weaknesses.

在本教程中,我们将比较 Java 19 的虚拟线程和 Project Reactor 的Webflux。我们将首先重温每种方法的基本工作原理,然后分析它们的优缺点。

We’ll start by exploring the strengths of reactive frameworks and we’ll see why WebFlux remains valuable. After that, we’ll discuss the thread-per-request approach and highlight scenarios where virtual threads can be a better option.

我们将首先探讨反应式框架的优势,并了解为什么 WebFlux 仍然很有价值。之后,我们将讨论按请求生成线程的方法,并重点介绍虚拟线程在哪些情况下可能是更好的选择。

2. Code Examples

2.代码示例

For the code examples in this article, we’ll assume we’re developing the backend of an e-commerce application. We’ll focus on the function responsible for computing and publishing the price of an item added to a shopping cart:

在本文的代码示例中,我们假设正在开发一个电子商务应用程序的后台。我们将重点讨论负责计算和发布添加到购物车的商品价格的函数:

class ProductService {
    private final String PRODUCT_ADDED_TO_CART_TOPIC = "product-added-to-cart";

    private final ProductRepository repository;
    private final DiscountService discountService;
    private final KafkaTemplate<String, ProductAddedToCartEvent> kafkaTemplate;

    // constructor

    public void addProductToCart(String productId, String cartId) {
        Product product = repository.findById(productId)
          .orElseThrow(() -> new IllegalArgumentException("not found!"));

        Price price = product.basePrice();
        if (product.category().isEligibleForDiscount()) {
            BigDecimal discount = discountService.discountForProduct(productId);
            price.setValue(price.getValue().subtract(discount));
        }

        var event = new ProductAddedToCartEvent(productId, price.getValue(), price.getCurrency(), cartId);
        kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
    }

}

As we can see, we begin by retrieving the Product from the MongoDB database using a MongoRepository. Once retrieved, we determine if the Product qualifies for discounts. If this is the case, we use DiscountService to perform an HTTP request to ascertain any available discounts for the product.

正如我们所见,我们首先使用 MongoRepository 从 MongoDB 数据库中检索 Product 。检索完成后,我们将确定 Product 是否符合折扣条件。如果符合,我们将使用 DiscountService 执行 HTTP 请求,以确定该产品是否有任何可用折扣。

Finally, we calculate the final price for the product. Upon completion, we dispatch a Kafka message containing the productId, cartId, and the computed price.

最后,我们计算出产品的最终价格。计算完成后,我们会发送一条 Kafka消息,其中包含 productId, cartId 和计算出的价格。

3. WebFlux

3. WebFlux

WebFlux is a framework for building asynchronous, non-blocking, and event-driven applications. It operates on reactive programming principles, leveraging the Flux and Mono types to handle the intricacies of asynchronous communication. These types implement the publisher-subscriber design pattern, decoupling the consumer and the producer of the data.

WebFlux 是一个用于构建异步、非阻塞和事件驱动应用程序的框架。它根据反应式编程原则运行,利用 FluxMono 类型来处理复杂的异步通信。这些类型实现了publisher-subscriber设计模式,将数据的消费者和生产者解耦。

3.1. Reactive Libraries

3.1.反应式图书馆

Numerous modules from the Spring ecosystem integrate with WebFlux for reactive programming. Let’s use some of these modules while refactoring our code toward a reactive paradigm.

Spring 生态系统中的许多模块都与 WebFlux 集成,以实现反应式编程。让我们在重构代码以实现反应式范例时使用其中的一些模块。

For instance, we can switch the MongoRepository to a ReactiveMongoRepository. This change means we’ll have to work with a Mono<Product> instead of an Optional<Product>:

例如,我们可以将 MongoRepository 替换为 ReactiveMongoRepository 。这一更改意味着我们必须使用 Mono<Product> 而不是 Optional<Product> 来工作:

Mono<Product> product = repository.findById(productId)
  .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));

Similarly, we can change the ProductService to be asynchronous and non-blocking. For example, we can make it use WebClient for performing the HTTP requests, and, consequently, return the discount as a Mono<BigDecimal>:

同样,我们可以将 ProductService 改为异步和非阻塞。例如,我们可以让它使用 WebClient 来执行 HTTP 请求,从而以 Mono<BigDecimal>: 的形式返回折扣。

Mono<BigDecimal> discount = discountService.discountForProduct(productId);

3.2. Immutability

3.2.不变性

In functional and reactive programming paradigms, immutability is always preferred over mutable data. Our initial method involves altering the Price‘s value using a setter. However, as we move towards a reactive approach, let’s refactor the Price object and make it immutable.

在函数式和反应式编程范例中,不可变性总是比可变数据更受青睐。我们最初的方法是使用设置器更改 Price 的值。但是,当我们转向反应式方法时,让我们重构 Price 对象并使其不可变。

For example, we can introduce a dedicated method that applies the discount and generates a new Price instance rather than modifying the existing one:

例如,我们可以引入一个专门的方法来应用折扣并生成一个新的 Price 实例,而不是修改现有的 Price 实例:

record Price(BigDecimal value, String currency) {  
    public Price applyDiscount(BigDecimal discount) {
        return new Price(value.subtract(discount), currency);
    }
}

Now, we can compute the new price based on the discount, using WebFlux’s map() method:

现在,我们可以使用 WebFlux 的 map() 方法,根据折扣计算新价格:

Mono<Price> price = discountService.discountForProduct(productId)
  .map(discount -> price.applyDiscount(discount));

Additionally, we can even use a method reference here, to keep the code compact:

此外,我们甚至可以在这里使用方法引用,以保持代码的紧凑性:

Mono<Price> price = discountService.discountForProduct(productId).map(price::applyDiscount);

3.3. Functional Pipelines

3.3.功能管道

Mono and Flux adhere to the functor and monad patterns, through methods such as map() and flatMap(). This allows us to describe our use case as a pipeline of transformations on immutable data.

MonoFlux通过map()flatMap()等方法坚持使用函数器和monad模式。这样,我们就可以将用例描述为对不可变数据进行转换的管道。

Let’s try to identify the transformations needed for our use case:

让我们尝试确定我们的用例所需的转换: 让我们尝试确定我们的用例所需的转换:</span

  • we start with a raw productId
  • we transform it into a Product
  • we use the Product to compute a Price
  • we use the Price to create an event
  • finally, we publish the event on a message queue

Now, let’s refactor the code to reflect this chain of functions:

现在,让我们重构代码,以反映这个函数链:

void addProductToCart(String productId, String cartId) {
    Mono<Product> productMono = repository.findById(productId)
      .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));

    Mono<Price> priceMono = productMono.flatMap(product -> {
        if (product.category().isEligibleForDiscount()) {
            return discountService.discountForProduct(productId)
              .map(product.basePrice()::applyDiscount);
        }
        return Mono.just(product.basePrice());
    });

    Mono<ProductAddedToCartEvent> eventMono = priceMono.map(
      price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId));

    eventMono.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}

Now, let’s inline the local variables to keep the code compact. Additionally, let’s extract a function for computing the price, and use it inside of the flatMap():

现在,让我们内联局部变量,以保持代码紧凑。此外,让我们提取一个用于计算价格的函数,并在 flatMap() 中使用它:

void addProductToCart(String productId, String cartId) {
    repository.findById(productId)
      .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")))
      .flatMap(this::computePrice)
      .map(price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId))
      .subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}

Mono<Price> computePrice(Product product) {
    if (product.category().isEligibleForDiscount()) {
        return discountService.discountForProduct(product.id())
          .map(product.basePrice()::applyDiscount);
    }
    return Mono.just(product.basePrice());
}

4. Virtual Threads

4.虚拟线程

Virtual Threads were introduced in Java via Project Loom as an alternative solution for parallel processing. They are lightweight, user-mode threads managed by the Java Virtual Machine (JVM). As a result, they are particularly well suited for I/O operations, where traditional threads may spend significant time waiting for external resources.

虚拟线程是通过Project Loom引入 Java 的,作为并行处理的替代解决方案。它们是由 Java 虚拟机(JVM)管理的轻量级用户模式线程。因此,它们特别适合 I/O 操作,因为传统线程可能会花费大量时间等待外部资源。

In contrast to asynchronous or reactive solutions, virtual threads enable us to keep using the thread-per-request processing model. In other words, we can keep writing code sequentially, without mixing the business logic and the reactive API.

与异步或反应式解决方案相比,虚拟线程能让我们继续使用按请求处理线程的模式。换句话说,我们可以继续按顺序编写代码,而无需将业务逻辑与反应式应用程序接口混合在一起。

4.1. Virtual Threads

4.1.虚拟线程

There are several approaches available to utilize virtual threads for executing our code. For a single method, such as the one demonstrated in the previous example, we can employ startVirtualThread(). This static method was recently added to the Thread API and executes a Runnable on a new virtual thread:

有几种方法可以利用虚拟线程来执行我们的代码。对于单个方法,例如前面示例中演示的方法,我们可以使用 startVirtualThread() 方法。这个静态方法是最近添加到 Thread API 中的,它将在一个新的虚拟线程上执行 Runnable

public void addProductToCart(String productId, String cartId) {
    Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}

private void computePriceAndPublishMessage(String productId, String cartId) {
    // ...
}

Alternatively, we can create an ExecutorService that relies on virtual threads with the new static factory method Executors.newVirtualThreadPerTaskExecutor(). Furthermore, for applications using Spring Framework 6 and Spring Boot 3, we can leverage the new Executor and configure Spring to favor virtual threads over platform threads.

作为替代,我们可以使用新的静态工厂方法Executors.newVirtualThreadPerTaskExecutor()创建依赖于虚拟线程的ExecutorService 此外,对于使用 Spring Framework 6 和 Spring Boot 3 的应用程序,我们可以利用新的 Executor 配置 Spring 以让虚拟线程优于平台线程。

4.2. Compatibility

4.2.兼容性

Virtual threads simplify code by using a more traditional synchronous programming model. As a result, we can write code in a sequential manner, akin to blocking I/O operations, without worrying about explicit reactive constructs.

虚拟线程通过使用更传统的同步编程模型来简化代码。因此,我们可以以类似于阻塞 I/O 操作的顺序方式编写代码,而不必担心显式反应结构。

Moreover, we can seamlessly switch from regular single-threaded code to virtual threads with minimal to no alterations. For instance, in our previous example, we simply need to create a virtual thread using the static factory method startVirtualThread() and execute logic inside of it:

此外,我们还可以从普通的单线程代码无缝切换到虚拟线程,只需进行极少的改动,甚至不需要改动。例如,在前面的示例中,我们只需使用静态工厂方法 startVirtualThread() 创建一个虚拟线程,并在其中执行逻辑:

void addProductToCart(String productId, String cartId) {
    Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}

void computePriceAndPublishMessage(String productId, String cartId) {
    Product product = repository.findById(productId)
      .orElseThrow(() -> new IllegalArgumentException("not found!"));

    Price price = computePrice(productId, product);

    var event = new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId);
    kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}

Price computePrice(String productId, Product product) {
    if (product.category().isEligibleForDiscount()) {
        BigDecimal discount = discountService.discountForProduct(productId);
        return product.basePrice().applyDiscount(discount);
    }
    return product.basePrice();
}

4.3. Readability

4.3.可读性

With the thread-per-request processing model, it can be easier to understand and reason about the business logic. This can reduce the cognitive load associated with reactive programming paradigms.

采用线程按请求处理模型,可以更容易地理解和推理业务逻辑。这可以减少与反应式编程范式相关的认知负荷。

In other words, virtual threads allow us to cleanly separate the technical concerns from our business logic. As a result, it eliminates the need for external APIs in implementing our business use cases.

换句话说,虚拟线程允许我们将技术问题与业务逻辑干净利落地分开。因此,在实现业务用例时,我们不再需要外部应用程序接口。

5. Conclusion

5.结论

In this article, we compared two different approaches to concurrency and asynchronous processing. We started by analyzing the project Reactor’s WebFlux and the reactive programming paradigm. We discovered that this approach favors immutable objects and functional pipelines.

在本文中,我们比较了并发和异步处理的两种不同方法。我们首先分析了 Reactor 的 WebFlux 项目和反应式编程范式。我们发现,这种方法倾向于不可变对象和功能管道。

After that, we discussed virtual threads and their exceptional compatibility with legacy codebases that allow for a smooth transition to non-blocking code. Additionally, they have the added benefit of cleanly separating the business logic from the infrastructure code and other technical concerns.

之后,我们讨论了虚拟线程及其与传统代码库的出色兼容性,它们允许向非阻塞代码平稳过渡。此外,虚拟线程还具有将业务逻辑与基础架构代码和其他技术问题干净分离的额外优势。

As usual, all code samples used in this article are available over on GitHub.

与往常一样,本文中使用的所有代码示例均可在 GitHub 上获取