Convert From List of CompletableFuture to CompletableFuture List – 从可完成未来列表转换为可完成未来列表

最后修改: 2023年 10月 30日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll learn how to convert a List<CompletableFuture<T>> object to a CompletableFuture<List<T>>.

在本教程中,我们将学习如何将 List<CompletableFuture<T>> 对象转换为 CompletableFuture<List<T>> 对象。

This conversion can be very useful in many cases. A prime example would be when we have to make multiple calls to a remote service, typically an asynchronous operation, and aggregate the results into a single List. Additionally, we end up waiting on a single CompletableFuture object which provides us the results list when all operations are finished or throws an exception if one or more end in failure.

这种转换在许多情况下都非常有用。一个典型的例子是,我们必须多次调用远程服务(通常是异步操作),并将结果汇总到一个 List 中。此外,我们最终将等待单个 CompletableFuture 对象,该对象将在所有操作完成后为我们提供结果列表,或者在一个或多个操作失败时抛出异常。

We’ll first see a naïve way of doing the conversion and then look at a simpler and safer approach.

我们先来看看一种天真的转换方法,然后再看看一种更简单、更安全的方法。

2. Chaining CompletableFutures

2.连锁 CompletableFutures

One way of doing this is to chain the CompletableFutures using their thenCompose() method. This way, we can create a single object that resolves once all the previous futures resolve, one by one, akin to a domino construct.

一种方法是使用 thenCompose() 方法将 CompletableFutures 链起来。通过这种方法,我们可以创建一个单一对象,一旦前面的所有未来逐一解析,该对象就会解析,类似于多米诺骨牌结构。

2.1. Implementation

2.1.实施

First, let’s create a mock asynchronous operation:

首先,让我们创建一个模拟异步操作:

public class Application {
    ScheduledExecutorService asyncOperationEmulation;
    Application initialize() {
        asyncOperationEmulation = Executors.newScheduledThreadPool(10);
        return this;
    }

    CompletableFuture<String> asyncOperation(String operationId) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        asyncOperationEmulation.submit(() -> {
            Thread.sleep(100);
            cf.complete(operationId);
        });
        return cf;
    }

We’ve created an Application class to host our test code and the asyncOperation() method which simply sleeps for 100 ms. We employ an Executor with 10 threads to dispatch everything asynchronously.

我们创建了一个 Application 类来托管我们的测试代码和 asyncOperation() 方法,该方法只需休眠 100 毫秒。我们使用一个带有 10 线程的 Executor 来异步分派一切。

To gather all of our operation results, in this case, simple operationId strings, we’ll chain the CompletableFutures generated from the asyncOperation() method:

为了收集所有操作结果(在本例中为简单的 operationId 字符串),我们将对 asyncOperation() 方法生成的 CompletableFuture 进行链式处理:

void startNaive() {
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (int i = 1; i <= 1000; i++) {
        String operationId = "Naive-Operation-" + i;
        futures.add(asyncOperation(operationId));
    }
    CompletableFuture<List<String>> aggregate = CompletableFuture.completedFuture(new ArrayList<>());
    for (CompletableFuture<String> future : futures) {
        aggregate = aggregate.thenCompose(list -> {
            list.add(future.get());
            return CompletableFuture.completedFuture(list);
        });
    }
    final List<String> results = aggregate.join();
    for (int i = 0; i < 10; i++) {
        System.out.println("Finished " + results.get(i));
    }

    close();
}

We start by creating a completed CompleteableFuture using the static completedFuture() method and provide an empty List as the completion result. Using thenCompose() we create a Runnable that executes once the previous future has finished, in this case immediately. The thenCompose() method returns a new CompletableFuture which resolves once both the first and second future finish. We replace the aggregate reference with this new future object. This allows us to keep chaining these calls inside the iteration loop over the futures list.

我们首先使用静态 completedFuture() 方法创建一个已完成的CompleteableFuture,并提供一个空 List 作为完成结果。我们使用 thenCompose() 创建了一个 Runnable,它将在前一个未来完成后立即执行,在本例中就是立即执行。thenCompose()方法会返回一个新的CompletableFuture,它会在第一个和第二个未来完成后解析。我们将用这个新的 future 对象替换 aggregate 引用。这样,我们就可以在 futures 列表的迭代循环中不断链式调用这些调用。

Inside the Runnable we’ve created, we wait for the future to finish and add the result to the list. We then return a completed future with that list and the result. This will pass the list further down the thenCompose() chain, letting us add the future results one by one.

在我们创建的 Runnable 中,我们等待 future 结束,并将结果添加到 list 中。然后,我们将返回一个包含该 list 和结果的已完成 future。这将把 list 传递到 thenCompose() 链的下一环节,让我们可以 逐个添加 future 的结果。

Once all futures are chained, we call join() on the aggregate CompletableFuture. This is done specifically for the example, so that we can retrieve the results and block the main Java thread from exiting before aggregate is finished. In a real asynchronous scenario we’d probably add our callback logic inside a thenAccept() or whenComplete() call.

一旦所有期货都链上了,我们就会在 aggregate CompletableFuture 上调用 join() 。这是专门为示例而做的,这样我们就可以检索结果,并阻止 Java 主线程在 aggregate 完成之前退出。在真实的异步场景中,我们可能会在 thenAccept()whenComplete() 调用中添加回调逻辑。

One thing to notice is we add a close() call at the end with the following implementation:

需要注意的是,我们在最后添加了一个 close() 调用,具体实现如下:

void close() {
    asyncOperationEmulation.shutdownNow();
}

Closing all Executors is mandatory when an application exits, otherwise, the Java process will hang.

应用程序退出时必须关闭所有执行器,否则 Java 进程将挂起

2.2. Implementation Problems

2.2.实施问题

The naïve implementation has a few problems. Not only the future chaining introduces unwanted complexity, but it also creates a large number of unneeded objects, such as all the new CompletableFutures generated by thenCompose().

不仅未来链引入了不必要的复杂性,而且还创建了大量不需要的对象,例如由 thenCompose() 生成的所有新 CompletableFutures 。

Another potential issue appears when we execute a large number of operations. If an operation fails, and depending on how the Java implementation resolves the CompletableFuture chain, we might get a StackOverflowError if the resolutions are done recursively.

当我们执行大量操作时,另一个潜在问题就会出现。如果操作失败,根据 Java 实现解决 CompletableFuture 链的方式,如果解决是递归进行的,我们可能会收到 StackOverflowError 异常。

To test the exception scenario we can introduce an error on one of the operations by changing the asyncOperation() method:

要测试异常情况,我们可以通过更改 asyncOperation() 方法,在其中一个操作上引入错误:

CompletableFuture<String> asyncOperation(String operationId) {
    CompletableFuture<String> cf = new CompletableFuture<>();
    asyncOperationEmulation.submit(() -> {
        if (operationId.endsWith("567")) {
            cf.completeExceptionally(new Exception("Error on operation " + operationId));
            return;
        }
        Thread.sleep(100);
        cf.complete(operationId);
    });
    return cf;
}

The future for the 567th operation will complete exceptionally in this case, making the aggregate.join() call also throw a runtime exception.

在这种情况下,第 567 次操作的未来将异常完成,这使得 aggregate.join() 调用也会抛出运行时异常。

3. Using CompletableFuture.allOf()

3.使用 CompletableFuture.allOf()

A different and better approach is to use the allOf() method of the CompletableFuture API. This method takes an array of CompletableFuture objects and creates a new one that resolves when all the provided futures themselves resolve.

另一种更好的方法是使用 CompletableFuture API 中的 allOf() 方法。该方法接收一个 CompletableFuture 对象数组,并创建一个新对象,该对象在所有提供的未来对象都解析后解析

Additionally, if one of the futures fails then the aggregate future also fails. The new future doesn’t contain the list of results. To obtain them we have to inspect the respective CompletableFuture object.

此外,如果其中一个未来失败,那么总体未来也会失败。新的未来不包含结果列表。要获得这些结果,我们必须检查相应的 CompletableFuture 对象

3.1. Implementation

3.1 实施

Let’s create a new start() method using allOf():

让我们使用 allOf() 创建一个新的 start() 方法:</em

void start() {
    List<CompletableFuture<String>> futures = new ArrayList<>();
    for (int i = 1; i <= 1000; i++) {
        String operationId = "Operation-" + i;
        futures.add(asyncOperation(operationId));
    }
    CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
    CompletableFuture<List<String>> listFuture = CompletableFuture.allOf(futuresArray)
      .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    final List<String> results = listFuture.join();
    System.out.println("Printing first 10 results");
    for (int i = 0; i < 10; i++) {
        System.out.println("Finished " + results.get(i));
    }

    close();
}

The setup and result printing are the same, however we now have a futuresArray and provide it to allOf(). We use thenApply() to add logic after allOf() is resolved. In this callback, we gather all futures results using the CompletableFuture.join() method and collect them into a List. This list is the result contained inside the CompletableFuture generated by thenApply(), namely the listFuture.

设置和结果打印都是一样的,但是我们现在有了一个 futuresArray 并将其提供给 allOf() 。我们使用 thenApply()allOf() 解析后添加逻辑。在此回调中,我们使用 CompletableFuture.join() 方法收集所有 Futures 结果,并将它们收集到 List 中。该列表是由 thenApply() 生成的 CompletableFuture 内部包含的结果,即 listFuture

To showcase the aggregate results we use the join() method which blocks the main thread until listFuture is complete. We shouldn’t forget the close() call at the end.

为了展示汇总结果,我们使用了 join() 方法,该方法会阻塞 main 线程,直到 listFuture 完成。我们不应忘记最后的 close() 调用。

3.2. Pros of allOf()

3.2.allOf() 的优点

The allOf() based implementation is a simpler and cleaner way of handling multiple asynchronous operations than future chaining. The aggregate CompletableFuture provides atomicity to the whole operation and completes when all futures succeed or fail when even one fails. This protects us from potential partial processing of results.

与未来链相比,基于 allOf() 的实现是一种更简单、更干净的处理多个异步操作的方法。聚合体 CompletableFuture 为整个操作提供了原子性,当所有未来操作成功时,聚合体完成;当其中一个未来操作失败时,聚合体失败。这可以防止我们对结果进行潜在的部分处理。

Additionally, it lets us wait for all futures to complete in a non-blocking manner. Notice that in the example code, we call join() for the listFuture object but in a realistic scenario we’d rely on just the callback.

此外,它还能让我们以非阻塞的方式等待所有期货完成。请注意,在示例代码中,我们为 listFuture 对象调用了 join() ,但在现实场景中,我们将只依赖回调。

4. Conclusion

4.结论

In this article, we learned how to convert a List<CompletableFuture<T>> into a CompletableFuture<List<T>>. We understood why this conversion is useful and saw two ways of doing it, one naïve implementation and one using the proper Java APIs. We discussed the potential issues with the former and how the latter avoids them.

在本文中,我们学习了如何将 List<CompletableFuture<T>> 转换为 CompletableFuture<List<T>>。我们理解了这种转换有用的原因,并看到了两种实现方法:一种是天真的实现,另一种是使用正确的 Java API。我们讨论了前者的潜在问题以及后者如何避免这些问题。

As always, the source code for this article is available over on GitHub.

与往常一样,本文的源代码可在 GitHub 上获取。