Retry Logic with CompletableFuture – 使用 CompletableFuture 的重试逻辑

最后修改: 2023年 10月 26日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’ll learn how to apply retry logic to CompletableFuture objects. Initially, we’ll retry the task wrapped within a CompletableFuture. Following that, we’ll harness the CompletableFuture API to create a chain of multiple instances, enabling us to re-execute the task when the future encounters an exceptional completion.

在本文中,我们将学习如何将重试逻辑应用于 CompletableFuture 对象。最初,我们将重试封装在 CompletableFuture 中的任务。随后,我们将利用 CompletableFuture API 创建一个由多个实例组成的链,使我们能够在未来任务遇到异常完成时重新执行任务。

2. Retrying the Task

2.重试任务

A simple approach to retry a task would be to leverage the decorator pattern and implement it using the classical OOP style with classes and interfaces. On the other hand, we can choose a more concise and functional approach, taking advantage of the higher-order functions.

重试任务的一种简单方法是利用 decorator 模式,并使用类和接口的经典 OOP 风格来实现它。另一方面,我们可以利用高阶函数,选择一种更为简洁的函数式方法。

Initially, we’ll declare a function that takes as a parameter a Supplier<T> and the maximum number of invocations. After that, we’ll use a while loop and a try-catch block to invoke the function multiple times, if needed. Finally, we’ll preserve the original data type by returning yet another Supplier<T>:

首先,我们将声明一个函数,该函数的参数是 Supplier<T> 和最大调用次数。然后,如果需要,我们将使用 while 循环和 try-catch 块多次调用函数。最后,我们将通过返回另一个 Supplier<T> 来保留原始数据类型:

static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
    return () -> {
        int retries = 0;
	while (retries < maxRetries) {
	    try {
	        return supplier.get();
	    } catch (Exception e) {
	        retries++;
	    }
        }
	throw new IllegalStateException(String.format("Task failed after %s attempts", maxRetries));
    };
}

We can further improve this decorator by allowing for the definition of specific exceptions to be retried or by introducing a delay between invocations. But, for simplicity’s sake, let’s proceed with creating the CompletableFuture based on this function decorator:

我们可以通过定义重试的特定异常或在调用之间引入延迟来进一步改进该装饰器。不过,为了简单起见,让我们继续根据此函数装饰器创建 CompletableFuture

static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
    Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
    return CompletableFuture.supplyAsync(retryableSupplier);
}

Now, let’s proceed with writing tests for this functionality. To begin, we’ll need a method that will be retried by our CompletableFuture. For this purpose, we’ll design a method that fails four times by throwing RuntimeExceptions and successfully completes on the fifth attempt, returning an integer value:

现在,让我们继续编写该功能的测试。首先,我们需要一个将被 CompletableFuture 重试的方法。为此,我们将设计一个通过抛出 RuntimeExceptions 来失败四次,并在第五次尝试时成功完成并返回一个整数值的方法:

AtomicInteger retriesCounter = new AtomicInteger(0);

@BeforeEach
void beforeEach() {
    retriesCounter.set(0);
}

int failFourTimesThenReturn(int returnValue) {
    int retryNr = retriesCounter.get();
    if (retryNr < 4) {
        retriesCounter.set(retryNr + 1);
	throw new RuntimeException();
    }
    return returnValue;
}

Now, we can finally test our retryTask() function and assert that the expected value is returned. Also, we can check the number of invocations by interrogating the retriesCounter:

现在,我们终于可以测试 retryTask() 函数,并断言返回的是预期值。此外,我们还可以通过查询 retriesCounter 来检查调用次数:

@Test
void whenRetryingTask_thenReturnsCorrectlyAfterFourInvocations() {
    Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);

    CompletableFuture<Integer> result = retryTask(codeToRun, 10);

    assertThat(result.join()).isEqualTo(100);
    assertThat(retriesCounter).hasValue(4);
}

Furthermore, if we call the same function with a smaller value for the maxRetires parameter, we’ll expect the Future to complete exceptionally. The original IllegalStateException should be wrapped into a CompletionException, but the original error message should be preserved:

此外,如果我们使用较小的maxRetires参数值调用相同的函数,我们将期望Future异常完成。原来的 IllegalStateException 应封装为 CompletionException ,但应保留原来的错误信息:

@Test
void whenRetryingTask_thenThrowsExceptionAfterThreeInvocations() {
    Supplier<Integer> codeToRun = () -> failFourTimesThenReturn(100);

    CompletableFuture<Integer> result = retryTask(codeToRun, 3);

    assertThatThrownBy(result::join)
      .isInstanceOf(CompletionException.class)
      .hasMessageContaining("IllegalStateException: Task failed after 3 attempts");
}

3. Retrying the CompletableFuture

3.重试 CompletableFuture

The CompletableFuture API provides options for handling exceptions as they arise. As a result, we can utilize methods such as exceptionally() instead of creating our function decorator.

CompletableFuture应用程序接口提供了在出现异常时进行处理的选项。因此,我们可以使用 exceptionally() 等方法来代替创建我们的函数装饰器。

3.1. Unsafe Retry

3.1.不安全重试

The exceptionally() method enables us to specify an alternative function that will be invoked when the initial invocation completes with an exception. For instance, if we intend to retry the invocation two times, we can utilize the fluent API to add two of these fallbacks:

通过 exceptionally() 方法,我们可以指定一个替代函数,当初始调用出现异常时,该函数将被调用。例如,如果我们打算重试调用两次,我们可以利用流畅 API 添加两个这样的回退函数:

static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
    return CompletableFuture.supplyAsync(supplier)
      .exceptionally(__ -> supplier.get())
      .exceptionally(__ -> supplier.get());
}

Since we need a variable number of retries, let’s refactor the code and use a for loop instead:

既然我们需要的重试次数是可变的,那就重构代码,改用 for 循环:

static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}

We can test retryUnsafe() using the same test helpers and anticipate similar outcomes. Nonetheless, there will be a subtle distinction if the initial supplier completes before the final CompletableFuture is created with all its exceptionally() fallbacks. In such cases, the function will indeed be retried the specified number of times. However, this retry process will occur on the main thread, resulting in the loss of asynchrony.

我们可以使用相同的测试助手测试 retryUnsafe() 并预测类似的结果。 在这种情况下,函数确实会重试指定次数。但是,重试过程将在主线程上进行,从而导致异步的丢失。

To illustrate this, we can insert a 100-millisecond pause just before the for loop, which iteratively invokes the exceptionally() method.

为了说明这一点,我们可以在 for 循环之前插入一个 100 毫秒的暂停,该循环会迭代调用 exceptionally() 方法。

static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);  
    sleep(100l);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.exceptionally(__ -> supplier.get());
    }
    return cf;
}

Following that, we’ll modify the failFourTimesThenReturn() test method to log the attempt number and the current thread name on each invocation of this method. Now, let’s re-run the test and check the console:

随后,我们将修改 failFourTimesThenReturn() 测试方法,以记录每次调用该方法时的尝试次数和当前线程名称。现在,让我们重新运行测试并检查控制台:

invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: main
invocation: 2, thread: main
invocation: 3, thread: main
invocation: 4, thread: main

As anticipated, the subsequent invocations were executed by the main thread. This can become problematic if the initial invocation is quick, but the subsequent ones are expected to be slower.

正如预期的那样,后续调用由主线程执行。如果初始调用速度很快,但随后的调用预期会较慢,这就会产生问题。

3.2. Retry Asynchronously

3.2.异步重试

We can address this concern by ensuring that the subsequent invocations are carried out asynchronously. To facilitate this, a dedicated method was introduced to the API, starting with Java 12. By using exceptionallyAsync(), we’ll ensure all the retries will be performed asynchronously, regardless of the speed at which the initial CompletableFuture completes:

我们可以通过确保异步执行后续调用来解决这一问题。通过使用exceptionallyAsync(),我们将确保所有重试都以异步方式进行,而不管初始 CompletableFuture 完成的速度如何

static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
   CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
   for (int i = 0; i < maxRetries; i++) {
      cf = cf.exceptionallyAsync(__ -> supplier.get());
   }
   return cf;
}

Let’s quickly run the test and examine the logs:

让我们快速运行测试并检查日志:

invocation: 0, thread: ForkJoinPool.commonPool-worker-1
invocation: 1, thread: ForkJoinPool.commonPool-worker-1
invocation: 2, thread: ForkJoinPool.commonPool-worker-1
invocation: 3, thread: ForkJoinPool.commonPool-worker-2
invocation: 4, thread: ForkJoinPool.commonPool-worker-2

As expected, none of the invocations were executed by the main thread.

不出所料,主线程没有执行任何调用。

3.3. Nesting CompletableFutures

3.3.嵌套 CompletableFutures

If we need a solution compatible with versions of Java prior to 12, we can manually enhance the first example to achieve full asynchrony. To accomplish this, we must ensure that the fallback is executed asynchronously within a new CompletableFuture:

如果我们需要一个与 Java 12 之前版本兼容的解决方案,我们可以手动增强第一个示例以实现完全异步。要实现这一点,我们必须确保回退在一个新的 CompletableFuture 中异步执行:</em

cf.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))

However, the code above will not compile because the datatypes do not match, but we can fix it in three steps. Firstly, we’ll need to double-nest the initial Future. We can easily do this through compleatedFuture():

但是,上面的代码由于数据类型不匹配而无法编译,不过我们可以分三步解决这个问题。首先,我们需要对初始 Future 进行双重嵌套。我们可以通过 compleatedFuture() 轻松实现这一点:

CompletableFuture<CompletableFuture<T>> temp = cf.thenApply(value -> CompletableFuture.completedFuture(value));

Now the types are matching, so we can safely apply the exceptionally() fallback:

现在类型匹配了,因此我们可以安全地应用 exceptionally() 回调:

temp = temp.exceptionally(__ -> CompletableFuture.supplyAsync(supplier));

Lastly, we’ll use thenCompose() to flatten the object and go back to the original type:

最后,我们将使用 thenCompose() 来扁平化对象并返回原始类型:

cf = temp.thenCompose(t -> t);

Finally, let’s combine everything and create a CompletableFuture with a variable number of asynchronous fallbacks. Additionally, let’s take advantage of the fluent API, method references, and utility functions to keep the code concise:

最后,让我们将所有内容结合起来,创建一个 CompletableFuture ,其中包含数量可变的异步回退。此外,让我们利用流畅的 API、方法引用和实用功能来保持代码的简洁:

static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries) {
    CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
    for (int i = 0; i < maxRetries; i++) {
        cf = cf.thenApply(CompletableFuture::completedFuture)
	  .exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
	  .thenCompose(Function.identity());
    }
    return cf;
}

4. Conclusion

4.结论

In this article, we explored the concept of retrying invocations of a function within a CompletableFuture. We began by delving into the implementation of the decorator pattern in a functional style, allowing us to retry the function itself.

在本文中,我们探讨了在 CompletableFuture 中重试函数调用的概念。我们首先深入探讨了装饰器模式在函数式风格中的实现,这使得我们可以重试函数本身。

Subsequently, we leveraged the CompletableFuture API to accomplish the same task while maintaining asynchronous flow. Our discovery included the exceptionallyAsync() method introduced in Java 12, which is perfect for this purpose. Finally, we presented an alternative approach, relying solely on methods from the original Java 8 API.

随后,我们利用 CompletableFuture API 来完成相同的任务,同时保持异步流程。我们的发现包括 Java 12 中引入的 exceptionallyAsync() 方法,该方法非常适合这一目的。最后,我们提出了另一种方法,即完全依赖于原始 Java 8 API 中的方法。

As always, we can find working code examples over on GitHub.

一如既往,我们可以在 GitHub 上找到工作代码示例