Working with Exceptions in Java CompletableFuture – 在 Java 中处理异常 CompletableFuture

最后修改: 2023年 11月 21日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.导言

Java 8 has introduced a new abstraction based on Future to run asynchronous tasks – CompletableFuture class. It basically came to overcome the issues of the old Future API.

Java 8 引入了基于 Future 的新抽象来运行异步任务 – CompletableFuture 类。它基本上克服了旧版 Future API 的问题。

In this tutorial, we’re going to look into the ways to work with exceptions when we use CompletableFuture.

在本教程中,我们将探讨在使用 CompletableFuture 时处理异常的方法。

2. CompletableFuture Recap

2.CompletableFuture 回顾

First, we might need to recap a little bit about what the CompletableFuture is. CompletableFuture is a Future implementation that allows us to run and, most importantly, chain asynchronous operations. In general, there are three possible outcomes for the async operation to complete – normally, exceptionally, or can be canceled from outside. CompletableFuture has various API methods to address all of these possible outcomes.

首先,我们可能需要回顾一下 CompletableFuture 是什么。CompletableFuture是一种 Future 实现,它允许我们运行异步操作,最重要的是,还允许我们连锁异步操作。一般来说,异步操作有三种可能的完成结果:正常完成、异常完成或从外部取消。CompletableFuture 拥有各种 API 方法来处理所有这些可能的结果。

As with lots of the other methods in CompletableFuture, these methods have non-async, async, and async using specific Executor variations. So, without further delay, let’s look at ways to handle exceptions in CompletableFuture one by one.

CompletableFuture 中的许多其他方法一样,这些方法也有 非异步、异步和使用特定 Executor 变体的异步。因此,事不宜迟,让我们逐一了解在 CompletableFuture 中处理异常的方法。

3. handle()

3.handle().

First, we have a handle() method. By using this method, we can access and transform the entire result of the CompletionStage regardless of the outcome. That is, the handle() method accepts a BiFunction functional interface. So, this interface has two inputs. In the handle() method case, parameters will be the result of the previous CompletionStage and the Exception that occurred.

首先,我们有一个 handle() 方法。通过使用该方法,我们可以访问和转换 CompletionStage 的整个结果,而不管结果如何。也就是说, handle() 方法接受一个 BiFunction 函数接口。因此,该接口有两个输入。在 handle() 方法中,参数将是上一个 CompletionStage 的结果和发生的 异常

The important thing is that both of these parameters are optional, meaning that they can be null. This is obvious in some sense since the previous CompletionStage was completed normally. Then the Exception should be null since there was no any, similarly with CompletionStage result value nullability.

重要的是,这两个参数都是可选的,这意味着它们可以是 null 。这在某种意义上是显而易见的,因为之前的 CompletionStage 已经正常完成。那么 Exception 就应该是null,因为没有任何异常,这与 CompletionStage 结果值为空的情况类似。

Let’s now look at an example of handle() method usage:

现在让我们来看一个使用 handle() 方法的示例:

@ParameterizedTest
@MethodSource("parametersSource_handle")
void whenCompletableFutureIsScheduled_thenHandleStageIsAlwaysInvoked(int radius, long expected)
  throws ExecutionException, InterruptedException {
    long actual = CompletableFuture
      .supplyAsync(() -> {
          if (radius <= 0) {
              throw new IllegalArgumentException("Supplied with non-positive radius '%d'");
          }
          return Math.round(Math.pow(radius, 2) * Math.PI);
      })
      .handle((result, ex) -> {
          if (ex == null) {
              return result;
          } else {
              return -1L;
          }
      })
      .get();

    Assertions.assertThat(actual).isEqualTo(expected);
}

static Stream<Arguments> parameterSource_handle() {
    return Stream.of(Arguments.of(1, 3), Arguments.of(1, -1));
}

The thing to notice here is that the handle() method returns a new CompletionStage that will always execute, regardless of the previous CompletionStage result. So, handle() transforms the source value from the previous stage to some output value. Therefore, the value that we’re going to obtain via the get() method is the one returned from the handle() method.

这里需要注意的是,handle() 方法会返回一个新的 CompletionStage,无论前一个 CompletionStage 的结果如何,它都将始终执行。因此,handle() 会将前一阶段的源值转换为某个输出值。因此,我们将通过 get() 方法获得的值就是从handle() 方法返回的值。

4. exceptionally()

4.例外()</em

The handle() method is not always convenient, especially if we want to process exceptions only if there is one. Luckily, we have an alternative – exceptionally().

handle()方法并不总是很方便,尤其是如果我们只想在出现异常时才处理异常。幸运的是,我们有一个替代方法–exceptionally()

This method allows us to provide a callback to be executed only if the previous CompletionStage ended up with an Exception. If no exceptions were thrown, then the callback is omitted, and the execution chain is continued to the next callback (if any) with the value of the previous one.

该方法允许我们提供一个回调,仅在上一个 CompletionStage 出现 Exception 时执行。如果没有抛出异常,则省略回调,执行链将使用前一个回调的值继续执行到下一个回调(如果有的话)。

To understand, let’s look at a concrete example:

为了理解这一点,我们来看一个具体的例子:

@ParameterizedTest
@MethodSource("parametersSource_exceptionally")
void whenCompletableFutureIsScheduled_thenExceptionallyExecutedOnlyOnFailure(int a, int b, int c, long expected)
  throws ExecutionException, InterruptedException {
    long actual = CompletableFuture
      .supplyAsync(() -> {
          if (a <= 0 || b <= 0 || c <= 0) {
              throw new IllegalArgumentException(String.format("Supplied with incorrect edge length [%s]", List.of(a, b, c)));
          }
          return a * b * c;
      })
      .exceptionally((ex) -> -1)
      .get();

    Assertions.assertThat(actual).isEqualTo(expected);
}

static Stream<Arguments> parametersSource_exceptionally() {
    return Stream.of(
      Arguments.of(1, 5, 5, 25),
      Arguments.of(-1, 10, 15, -1)
    );
}

So here, it works in the same manner as handle(), but we have an Exception instance as a parameter to our callback. This parameter will never be null, so our code is a bit simpler now.

因此,这里的工作方式与 handle() 相同,但我们将 Exception 实例作为回调的参数。该参数永远不会为 ,因此我们的代码现在变得更简单了。

The important thing to notice here is the exceptionally() method’s callback executes only if the previous stage completes with an Exception. It basically means that if the Exception occurred somewhere in the execution chain, and there already was a handle() method that caught it – the excpetionally() callback won’t be executed afterward:

这里需要注意的是,exceptionally() 方法的回调只有在前一阶段以 Exception 结束时才会执行。这基本上意味着,如果 Exception 发生在执行链的某处,并且已经有一个 handle() 方法捕获了它,那么 excpetionally() 回调将不会在之后执行:

@ParameterizedTest
@MethodSource("parametersSource_exceptionally")
void givenCompletableFutureIsScheduled_whenHandleIsAlreadyPresent_thenExceptionallyIsNotExecuted(int a, int b, int c, long expected)
  throws ExecutionException, InterruptedException {
    long actual = CompletableFuture
      .supplyAsync(() -> {
          if (a <= 0 || b <= 0 || c <= 0) {
              throw new IllegalArgumentException(String.format("Supplied with incorrect edge length [%s]", List.of(a, b, c)));
          }
          return a * b * c;
      })
      .handle((result, throwable) -> {
          if (throwable != null) {
              return -1;
          }
          return result;
      })
      .exceptionally((ex) -> {
          System.exit(1);
          return 0;
      })
      .get();

    Assertions.assertThat(actual).isEqualTo(expected);
}

Here, exceptionally() is not invoked since the handle() method already catches the Exception, if any. Therefore, unless the Exception occurs inside the handle() method, the exceptionally() method here won’t be ever executed.

在这里,exceptionally() 不会被调用,因为 handle() 方法已经捕获了异常(如果有的话)。因此,除非 Exception 发生在 handle() 方法内部,否则这里的 exceptionally() 方法不会被执行。

5. whenComplete()

5.当完成时()</em

We also have a whenComplete() method in the API. It accepts the BiConsumer with two parameters: the result and the exception from the previous stage, if any. This method, however, is significantly different from the ones above.

我们在 API 中还有一个 whenComplete() 方法。它接受带有两个参数的 BiConsumer :结果和上一阶段的异常(如果有)。不过,该方法与上述方法有很大不同。

The difference is that whenComplete() will not translate any exceptional outcomes from the previous stages. So, even considering that whenComplete()‘s callback will always run, the exception from the previous stage, if any, will propagate further:

不同之处在于,whenComplete() 不会转换前一阶段的任何异常结果。因此,即使考虑到 whenComplete() 的回调将始终运行,前一阶段的异常(如果有的话)也会进一步传播:

@ParameterizedTest
@MethodSource("parametersSource_whenComplete")
void whenCompletableFutureIsScheduled_thenWhenCompletedExecutedAlways(Double a, long expected) {
    try {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        long actual = CompletableFuture
          .supplyAsync(() -> {
              if (a.isNaN()) {
                  throw new IllegalArgumentException("Supplied value is NaN");
              }
              return Math.round(Math.pow(a, 2));
          })
          .whenComplete((result, exception) -> countDownLatch.countDown())
          .get();
        Assertions.assertThat(countDownLatch.await(20L, java.util.concurrent.TimeUnit.SECONDS));
        Assertions.assertThat(actual).isEqualTo(expected);
    } catch (Exception e) {
        Assertions.assertThat(e.getClass()).isSameAs(ExecutionException.class);
        Assertions.assertThat(e.getCause().getClass()).isSameAs(IllegalArgumentException.class);
    }
}

static Stream<Arguments> parametersSource_whenComplete() {
    return Stream.of(
      Arguments.of(2d, 4),
      Arguments.of(Double.NaN, 1)
    );
}

As we can see here, callback inside whenCompleted() runs in both test invocations. However, in the second invocation, we completed with the ExecutionException, which has the cause of our IllegalArgumentException. So, as we can see, the exception from the callback propagates to the callee. We’ll cover the reasons why it happens in the next section.

我们可以看到,在两次测试调用中,whenCompleted() 内的回调都在运行。但是,在第二次调用中,我们完成了 执行异常,这就是我们的 IllegalArgumentException 的起因。因此,我们可以看到,来自回调的异常传播到了被调用者。我们将在下一节中介绍发生这种情况的原因。

6. Unhandled Exceptions

6.未处理异常

Finally, we need to touch on unhandled exceptions a bit. In general, if an exception remains uncaught, then the CompletableFuture completes with an Exception that doesn’t propagate to the callee. In our case above, we get the ExecutionException from the get() method invocation. So, this is because we tried to access the result when CompletableFuture ended up with an Exception.

最后,我们需要谈谈未处理的异常。一般来说,如果异常未被捕获,那么 CompletableFuture 将以一个不会传播给被调用者的 Exception 结束。在上述案例中,我们从 get() 方法调用中获得了 ExecutionException 异常。因此,这是因为我们在 CompletableFuture 时试图访问结果,结果却出现了异常

Thus, we need to check the result of the CompletableFuture before the get() invocation. There are a couple of ways to do so. The first and probably the most familiar to all approach is via isCompletedExceptionally()/isCancelled()/isDone() methods. Those methods return a boolean in case CompletableFutre completes with the exception, is cancelled from outside, or is completed successfully.

因此,我们需要在调用 get() 之前检查 CompletableFuture 的结果。有几种方法可以做到这一点。第一种可能也是大家最熟悉的方法是通过isCompletedExceptionally()/isCancelled()/isDone() 方法如果 CompletableFutre 完成时出现异常、被外部取消或成功完成,这些方法将返回一个 布尔值

However, it is worth mentioning that there is also a state() method that returns a State enum instance. This instance represents the state of the CompletableFuture, like RUNNING, SUCCESS, etc. So, this is another way to access the outcome of the CompletableFuture.

不过,值得一提的是,还有一个 state() 方法会返回一个 State 枚举实例。该实例代表 CompletableFuture 的状态,如 RUNNINGSUCCESS 等。因此,这是另一种访问 CompletableFuture 结果的方法。

7. Conclusion

7.结论

In this article, we’ve explored the ways to handle exceptions that occur in CompletableFuture stages.

在本文中,我们探讨了处理 CompletableFuture 阶段中出现的异常的方法。

As always, the source code for this article is available over on GitHub.

与往常一样,本文的源代码可在 GitHub 上获取。