How to Collect All Results and Handle Exceptions With CompletableFuture in a Loop – 如何在循环中使用 CompletableFuture 收集所有结果并处理异常

最后修改: 2024年 2月 15日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

Java 8’s CompletableFuture is well-suited to handling asynchronous computation. For instance, a web client may employ CompletableFuture when making a server call. It’s easy to get started and handle an individual CompletableFuture response. However, it’s not immediately clear how to collect the results of multiple CompletableFuture executions while also handling exceptions.

Java 8 的 CompletableFuture 非常适合处理异步计算。例如,网络客户端在调用服务器时可能会使用 CompletableFuture 。开始和处理单个 CompletableFuture 响应非常容易。然而,如何在处理异常的同时收集多个 CompletableFuture 执行的结果并不明确

In this tutorial, we’ll develop a simple mock microservice client that returns a CompletableFuture, and see how to call it multiple times to generate a summary of successes and failures.

在本教程中,我们将开发一个返回 CompletableFuture 的简单模拟微服务客户端,并了解如何多次调用它来生成成功和失败的总结。

2. An Example Microservice Client

2.微服务客户端示例

For our example, let’s write a simple microservice client that’s responsible for creating a resource and returning that resource’s identifier.

在我们的示例中,让我们编写一个简单的微服务客户端,负责创建一个资源并返回该资源的标识符。

We’ll declare a simple interface, MicroserviceClient, that we can mock out (using Mockito) in our unit test:

我们将声明一个简单的接口 MicroserviceClient,我们可以在单元测试中模拟出该接口(使用 Mockito)

interface MicroserviceClient {
    CompletableFuture<Long> createResource(String resourceName);
}

Unit testing CompletableFuture comes with its own challenges, but testing a single call to MicroserviceClient would be straightforward. Rather than detailing that here, let’s move on to handling multiple client calls that can throw an exception.

单元测试 CompletableFuture 有其自身的挑战,但测试对 MicroserviceClient 的单个调用将非常简单。与其在此详细说明,不如让我们继续处理可能抛出异常的多个客户端调用。

3. Combining Multiple Calls to Microservice

3.结合多次调用微服务

Let’s start by creating a unit test and declaring a mock of our MicroserviceClient that returns a successful response for an input of “Good Resource” and throws an exception for an input of “Bad Resource“:

首先,让我们创建一个单元测试,并声明一个 MicroserviceClient 的 mock,该 mock 会在输入”好资源“时返回成功响应,而在输入”坏资源“时抛出异常:

@ParameterizedTest
@MethodSource("clientData")
public void givenMicroserviceClient_whenMultipleCreateResource_thenCombineResults(List<String> inputs,
  int expectedSuccess, int expectedFailure) throws ExecutionException, InterruptedException {
    MicroserviceClient mockMicroservice = mock(MicroserviceClient.class);
    when(mockMicroservice.createResource("Good Resource"))
      .thenReturn(CompletableFuture.completedFuture(123L));
    when(mockMicroservice.createResource("Bad Resource"))
      .thenReturn(CompletableFuture.failedFuture(new IllegalArgumentException("Bad Resource")));
}

We’ll make this a parameterized test and pass in varying sets of data with a MethodSource. We’ll need to create a static method to supply our test with a Stream of JUnit Arguments:

我们将使其成为 参数化测试,并通过 MethodSource 传入不同的数据集。我们需要创建一个静态方法,为测试提供 JUnit ArgumentsStream 方法:

private static Stream<Arguments> clientData() {
    return Stream.of(
      Arguments.of(List.of("Good Resource"), 1, 0),
      Arguments.of(List.of("Bad Resource"), 0, 1),
      Arguments.of(List.of("Good Resource", "Bad Resource"), 1, 1),
      Arguments.of(List.of("Good Resource", "Bad Resource", "Good Resource", "Bad Resource", 
        "Good Resource"), 3, 2)
    );
}

This creates four test executions that pass in a List of inputs and the expected count of successes and failures.

这将创建四个测试执行,并输入输入的 List 以及预期的成功和失败次数。

Next, let’s return to our unit test and use the test data to call MicroserviceClient and collect each resulting CompletableFuture into a List:

接下来,让我们返回单元测试,使用测试数据来调用 MicroserviceClient 并将生成的每个 CompletableFuture 收集到 List 中:

List<CompletableFuture<Long>> clientCalls = new ArrayList<>();
for (String resource : inputs) {
    clientCalls.add(mockMicroservice.createResource(resource));
}

Now, we have the core part of our problem: a List of CompletableFuture objects that we need to complete and collect the results of, while handling any exceptions we encounter.

现在,我们有了问题的核心部分:一个 CompletableFuture 对象的 List 列表,我们需要完成该列表并收集其结果,同时处理遇到的任何异常。

3.1. Handling Exceptions

3.1 处理异常

Before getting into how we’ll complete each CompletableFuture, let’s define a helper method for handling exceptions. We’ll also define and mock out a Logger to mimic real-world error handling:

在了解我们将如何完成每个 CompletableFuture 之前,让我们先定义一个用于处理异常的辅助方法。我们还将定义和模拟一个 Logger 来模拟现实世界中的错误处理:

private final Logger logger = mock(Logger.class);

private Long handleError(Throwable throwable) {
    logger.error("Encountered error: " + throwable);
    return -1L;
}

interface Logger {
    void error(String message);
}

The helper method simply “logs” the error message and returns -1, which we’re using to designate an invalid resource.

辅助方法只是 “记录 “错误信息并返回 -1,我们用它来指定无效资源。

3.2. Completing a CompletableFuture With Exception Handling

3.2.使用异常处理完成 CompletableFuture

Now, we need to complete all of the CompletableFuture and handle any exceptions appropriately. We can do this by leveraging a few tools CompleteableFuture provides us with:

现在,我们需要完成所有 CompletableFuture 并适当处理任何异常。我们可以利用一些工具来实现这一目标CompleteableFuture为我们提供了:

  • exceptionally(): takes a function to execute if the CompletableFuture completes with an exception
  • join(): returns the result of the CompletableFuture once it completes

Then, we can define a helper method for completion of a single CompletableFuture:

然后,我们可以定义一个完成单个 CompletableFuture 的辅助方法:

private Long handleFuture(CompletableFuture<Long> future) {
    return future
      .exceptionally(this::handleError)
      .join();
}

Notably, we’re using exceptionally() to handle any exceptions that the MicroserviceClient calls could throw via our handleError() helper method. Finally, we’re calling join() on the CompletableFuture to wait for completion of the client call and return its resource identifier.

值得注意的是,我们使用 exceptionally() 来处理 MicroserviceClient 调用可能通过 handleError() 辅助方法抛出的任何异常。最后,我们在 CompletableFuture 上调用 join() 等待客户端调用完成并返回其资源标识符。

3.3. Handling a List of CompletableFuture

3.3.处理 CompletableFuture 的 列表</em

Returning to our unit test, we can now leverage our helper methods along with Java’s Stream API to create a simple statement that resolves all of the client calls:

回到单元测试,我们现在可以利用辅助方法和 Java 的流 API 来创建一个简单的语句,解决所有客户端调用:

Map<Boolean, List<Long>> resultsByValidity = clientCalls.stream()
  .map(this::handleFuture)
  .collect(Collectors.partitioningBy(resourceId -> resourceId != -1L));

Let’s break down this statement:

让我们来分析一下这句话:

  • We map each CompletableFuture into the resulting resource identifier using our handleFuture() helper method
  • We use Java’s Collectors.partitioningBy() utility to split the resulting resource identifiers into separate lists based on validity

We can easily verify our test by using an assertion on the size of the partitioned Lists, as well as checking calls to our mocked Logger:

通过对分区 Lists 的大小使用断言,以及检查对模拟 Logger 的调用,我们可以轻松验证我们的测试:</em

List<Long> validResults = resultsByValidity.getOrDefault(true, List.of());
assertThat(validResults.size()).isEqualTo(successCount);

List<Long> invalidResults = resultsByValidity.getOrDefault(false, List.of());
assertThat(invalidResults.size()).isEqualTo(errorCount);
verify(logger, times(errorCount))
  .error(eq("Encountered error: java.lang.IllegalArgumentException: Bad Resource"));

Running the test, we can see our partitioned lists match what we expect.

运行测试后,我们可以看到分区列表与预期相符。

4. Conclusion

4.结论

In this article, we learned how to handle completing a collection of CompletableFuture. If necessary, we could easily extend our approach to use more robust error handling or complex business logic.

在本文中,我们学习了如何处理完成 CompletableFuture 集合。如有必要,我们可以轻松扩展我们的方法,以使用更强大的错误处理或复杂的业务逻辑。

As always, all of the code for the article can be found over on GitHub.

与往常一样,本文的所有代码都可以在 GitHub 上找到