Using zipWhen() With Mono – 在 Mono 中使用 zipWhen()

最后修改: 2023年 10月 17日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll explore how we can use zipWhen() to combine the results of two or more Mono streams in a coordinated manner. We’ll start with a quick overview. Next, we’ll set up a simple example involving user data storage and email. We’ll show how zipWhen() enables us to orchestrate and coordinate multiple asynchronous operations in scenarios where we need to gather and process data from various sources concurrently.

在本教程中,我们将探讨如何使用 zipWhen() 以协调的方式合并两个或多个 Mono 流的结果。我们将从快速概述开始。接下来,我们将建立一个涉及用户数据存储和电子邮件的简单示例。我们将展示 zipWhen() 如何使我们能够在需要并发收集和处理来自不同来源的数据的场景中协调多个异步操作。

2. What Is zipWhen()?

2.什么是 zipWhen()?

In Reactive Programming with Mono, zipWhen() is an operator that allows us to combine the results of two or more Mono streams in a coordinated manner. It’s commonly used when we have multiple asynchronous operations to be performed concurrently, and we need to combine their results into a single output. 

在使用 MonoReactive Programming 中,zipWhen()是一个操作符,它允许我们以协调的方式合并两个或多个 Mono 流的结果。当我们需要同时执行多个异步操作,并将其结果合并为单一输出时,通常会使用该操作。

We start with two or more Mono streams that represent asynchronous operations. These Monos can emit different types of data, and they may or may not have dependencies on each other.

我们从代表异步操作的两个或多个 Mono 流开始。这些 Monos 可以发出不同类型的数据,它们之间可能有依赖关系,也可能没有。

We then use zipWhen() to coordinate. We apply the zipWhen() operator to one of the Monos. This operator waits for the first Mono to emit a value and then uses that value to trigger the execution of other Monos. The result of zipWhen() is a new Mono that combines the results of all the Monos into a single data structure, typically a Tuple or an object that we define.

然后,我们使用 zipWhen() 进行协调。我们将 zipWhen() 操作符应用到其中一个 Monos 上。该操作符会等待第一个 Mono 发送一个值,然后使用该值触发其他 Monos 的执行。 zipWhen() 的结果是一个新的 Mono ,它将所有 Monos 的结果合并为一个数据结构,通常是 Tuple 或我们定义的对象。

Finally, we can specify how we want to combine the results of the Monos. We can use the combined values to create a new object, perform calculations, or construct a meaningful response.

最后,我们可以指定如何组合 Monos 的结果。我们可以使用组合值创建新对象、执行计算或构建有意义的响应。

3. Example Setup

3.设置示例

Let’s set up a simple example consisting of three simplified services: UserService, EmailService, and DataBaseService. Each one of them produces data in the form of a Mono of different types. We want to combine all of the data in a single response and return it to the calling client. Let’s set up the appropriate POM dependencies first.

让我们建立一个由三个简化服务组成的简单示例:用户服务电子邮件服务数据库服务。每个服务都以不同类型的 Mono 形式生成数据。我们希望将所有数据合并到一个响应中,并将其返回给调用客户端。让我们先设置适当的 POM 依赖项。

3.1. Dependencies

3.1 依赖性

Let’s set up the required dependencies first. We’ll require spring-boot-starter-webflux and reactor-test:

让我们先设置所需的依赖项。我们将需要 spring-boot-starter-webfluxreactor-test

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

3.2. Setting up UserService

3.2.设置用户服务</em

Let’s start by introducing the User Service:

让我们先来介绍一下用户服务:

public class UserService {
    public Mono<User> getUser(String userId) {
        return Mono.just(new User(userId, "John Stewart"));
    }
}

Here, the UserService provides a method to retrieve user data based on a given userId. It returns a Mono<User> representing user information.

在这里,UserService 提供了一种根据给定的 userId 检索用户数据的方法。它返回一个代表用户信息的Mono<User>

3.3. Setting up EmailService

3.3.设置电子邮件服务</em

Next, let’s add the EmailService:

接下来,让我们添加 EmailService

public class EmailService {
    private final UserService userService;

    public EmailService(UserService userService) {
        this.userService = userService;
    }

    public Mono<Boolean> sendEmail(String userId) {
        return userService.getUser(userId)
          .flatMap(user -> {
              System.out.println("Sending email to: " + user.getEmail());
              return Mono.just(true);
          })
          .defaultIfEmpty(false);
    }
}

As the name suggests, the EmailService is responsible for sending emails to users. Importantly, it depends on the UserService to fetch user details and then send an email based on the retrieved information. The sendEmail() method returns a Mono<Boolean> indicating whether the email was sent successfully.

顾名思义,EmailService 负责向用户发送电子邮件。重要的是,它依赖于 UserService 来获取用户详细信息,然后根据获取的信息发送电子邮件sendEmail() 方法返回一个 Mono<Boolean> 表示电子邮件是否发送成功。

3.4. Setting up DatabaseService

3.4.设置数据库服务</em

public class DatabaseService {
    private Map<String, User> dataStore = new ConcurrentHashMap<>();

    public Mono<Boolean> saveUserData(User user) {
        return Mono.create(sink -> {
            try {
                dataStore.put(user.getId(), user);
                sink.success(true);
            } catch (Exception e) {
                sink.success(false);
            }
        });
    }
}

The DatabaseService handles the persistence of user data to a database. We are using a  concurrent map to represent data storage here for simplicity.

DatabaseService 处理将用户数据持久化到数据库的过程。为简单起见,我们在此使用并发映射来表示数据存储。

It provides a saveUserData() method that takes user information and returns a Mono<Boolean> to signify the success or failure of the database operation.

它提供了一个 saveUserData() 方法,该方法获取用户信息并返回一个 Mono<Boolean> 来表示数据库操作的成功或失败。

4. zipWhen() in Action

4.zipWhen()实际操作

Now that we have all our services defined, let’s define a controller method that combines the Mono streams from all three services into one response of type Mono<ResponseEntity<String>>. We’ll show how we can use the zipWhen() operator to coordinate various asynchronous operations and convert them all to a single response for the calling client. Let’s define the GET method first:

现在我们已定义了所有服务,让我们定义一个控制器方法,该方法将来自所有三个服务的 Mono 流合并为一个 Mono<ResponseEntity<String>> 类型的响应。我们将展示如何使用 zipWhen() 操作符来协调各种异步操作,并将它们全部转换为供调用客户端使用的单一响应。让我们先定义 GET 方法:

@GetMapping("/example/{userId}")
public Mono<ResponseEntity<String>> combineAllDataFor(@PathVariable String userId) {
    Mono<User> userMono = userService.getUser(userId);
    Mono<Boolean> emailSentMono = emailService.sendEmail(userId)
      .subscribeOn(Schedulers.parallel());
    Mono<String> databaseResultMono = userMono.flatMap(user -> databaseService.saveUserData(user)
      .map(Object::toString));

    return userMono.zipWhen(user -> emailSentMono, (t1, t2) -> Tuples.of(t1, t2))
      .zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
          User user = tuple.getT1();
          Boolean emailSent = tuple.getT2();
          return ResponseEntity.ok()
            .body("Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult);
      });
}

When a client calls the GET /example/{userId} endpoint, the userService invokes the combineAllData() method to retrieve information about a user based on the provided userId by calling userService.getUser(userId). This result is stored in Mono<User> called userMono here.

当客户端调用 GET /example/{userId} 端点时,userService 会调用combineAllData()方法,通过调用 userService.getUser(userId) 来根据提供的 userId 检索用户信息。该结果存储在 Mono<User> 中,此处称为 userMono。

Next, it sends an email to the same user. However, before sending the email, it checks whether the user exists. The result of the email-sending operation (success or failure) is represented by the emailSentMono of type Mono<Boolean>. This operation executes in parallel to save time. It saves the user data (retrieved in step 1) to a database using the databaseService.saveUserData(user). The result of this operation (success or failure) is converted to a string and stored in the Mono<String>.

接下来,它会向同一个用户发送电子邮件。不过,在发送电子邮件之前,它会检查该用户是否存在。电子邮件发送操作的结果(成功或失败)由 Mono<Boolean> 类型的 emailSentMono 表示。该操作并行执行,以节省时间。它使用 databaseService.saveUserData(user) 将用户数据(在步骤 1 中检索)保存到数据库中。此操作的结果(成功或失败)被转换为字符串并存储在 Mono<String> 中。

Importantly, it uses the zipWhen() operator to combine the results from the previous steps. The first zipWhen() combines the user data userMono and the email sending status from emailSentMono into a tuple. The second zipWhen() combines the previous tuple and the database result from dataBaseResultMono to construct a final response. Inside the second zipWhen(), it constructs a response message using the combined data.

重要的是,它使用zipWhen()操作符来合并前几个步骤的结果。第一个 zipWhen() 将用户数据 userMono 和来自 emailSentMono 的电子邮件发送状态合并为一个元组。第二个 zipWhen() 将前一个元组和来自 dataBaseResultMono 的数据库结果组合在一起,以构建最终响应。在第二个 zipWhen() 中,它会使用组合数据构建响应消息。

The message includes user information, whether the email was successfully sent, and the database operation’s result. In essence, this method orchestrates the retrieval of user data, email sending, and database operations for a specific user and combines the results into a meaningful response, ensuring that everything happens efficiently and concurrently.

信息包括用户信息、电子邮件是否发送成功以及数据库操作的结果。从本质上讲,这种方法可以协调特定用户的用户数据检索、电子邮件发送和数据库操作,并将结果合并为一个有意义的响应,确保所有操作高效并发地进行。

5. Testing

5.测试

Now, let’s put our system under test and verify that the correct response is returned that combines three different types of Reactive Streams:

现在,让我们对系统进行测试,验证是否能返回结合了三种不同类型的反应流的正确响应:

@Test
public void givenUserId_whenCombineAllData_thenReturnsMonoWithCombinedData() {
    UserService userService = Mockito.mock(UserService.class);
    EmailService emailService = Mockito.mock(EmailService.class);
    DatabaseService databaseService = Mockito.mock(DatabaseService.class);

    String userId = "123";
    User user = new User(userId, "John Doe");

    Mockito.when(userService.getUser(userId))
      .thenReturn(Mono.just(user));
    Mockito.when(emailService.sendEmail(userId))
      .thenReturn(Mono.just(true));
    Mockito.when(databaseService.saveUserData(user))
      .thenReturn(Mono.just(true));

    UserController userController = new UserController(userService, emailService, databaseService);

    Mono<ResponseEntity<String>> responseMono = userController.combineAllDataFor(userId);

    StepVerifier.create(responseMono)
      .expectNextMatches(responseEntity -> responseEntity.getStatusCode() == HttpStatus.OK && responseEntity.getBody()
        .equals("Response: " + user + ", Email Sent: true, Database Result: " + true))
      .verifyComplete();
}
We’re using StepVerifier to verify the response entity has the expected 200 OK status code as well as a body that combines the results of different Monos using zipWhen().

6. Conclusion

6.结论

In this tutorial, we had a quick look at using zipWhen() with Mono in Reactive programming. We used the example of User data collection, email, and storage components, all of which provide Monos of different types. This example demonstrated how to use zipWhen()  to efficiently handle data dependencies and orchestrate asynchronous operations in a reactive Spring WebFlux application.

在本教程中,我们快速了解了如何在反应式编程中使用 MonozipWhen() 。我们使用了用户数据收集、电子邮件和存储组件的示例,所有这些组件都提供了不同类型的 Monos。该示例演示了如何使用zipWhen()在反应式 Spring WebFlux 应用程序中高效地处理数据依赖关系和协调异步操作。

As always, the full source code is available over on GitHub.

一如既往,您可以在 GitHub 上获取完整的源代码