Guide To CompletableFuture – 指南:可完成的未来

最后修改: 2016年 8月 16日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

This tutorial is a guide to the functionality and use cases of the CompletableFuture class that was introduced as a Java 8 Concurrency API improvement.

本教程是关于CompletableFuture类的功能和使用案例的指南,该类是作为Java 8并发API改进而引入的。

2. Asynchronous Computation in Java

2.Java中的异步计算

Asynchronous computation is difficult to reason about. Usually we want to think of any computation as a series of steps, but in the case of asynchronous computation, actions represented as callbacks tend to be either scattered across the code or deeply nested inside each other. Things get even worse when we need to handle errors that might occur during one of the steps.

异步计算是很难推理的。通常我们想把任何计算看作是一系列的步骤,但在异步计算的情况下,以回调表示的操作往往分散在代码中,或者深深地嵌套在彼此之间。当我们需要处理在其中一个步骤中可能发生的错误时,情况会变得更加糟糕。

The Future interface was added in Java 5 to serve as a result of an asynchronous computation, but it did not have any methods to combine these computations or handle possible errors.

Future接口是在Java 5中添加的,作为异步计算的结果,但它没有任何方法来组合这些计算或处理可能的错误。

Java 8 introduced the CompletableFuture class. Along with the Future interface, it also implemented the CompletionStage interface. This interface defines the contract for an asynchronous computation step that we can combine with other steps.

Java 8引入了CompletableFuture类。除了Future接口之外,它还实现了CompletionStage接口。这个接口定义了一个异步计算步骤的契约,我们可以将其与其他步骤结合起来。

CompletableFuture is at the same time a building block and a framework, with about 50 different methods for composing, combining, and executing asynchronous computation steps and handling errors.

CompletableFuture同时也是一个构件和框架,它有大约50种不同的方法来组成、组合和执行异步计算步骤以及处理错误

Such a large API can be overwhelming, but these mostly fall in several clear and distinct use cases.

这么大的一个API可能会让人不知所措,但这些大多属于几个清晰而明确的用例。

3. Using CompletableFuture as a Simple Future

3.将CompletableFuture作为一个简单的Future

First of all, the CompletableFuture class implements the Future interface, so we can use it as a Future implementation, but with additional completion logic.

首先,CompletableFuture类实现了Future接口,所以我们可以将其作为Future的实现来使用,但要增加完成逻辑

For example, we can create an instance of this class with a no-arg constructor to represent some future result, hand it out to the consumers, and complete it at some time in the future using the complete method. The consumers may use the get method to block the current thread until this result is provided.

例如,我们可以用一个没有参数的构造函数来创建这个类的实例,以表示一些未来的结果,把它交给消费者,并在未来的某个时间用complete方法完成它。消费者可以使用get方法来阻塞当前线程,直到这个结果被提供。

In the example below, we have a method that creates a CompletableFuture instance, then spins off some computation in another thread and returns the Future immediately.

在下面的例子中,我们有一个方法创建了一个CompletableFuture实例,然后在另一个线程中进行一些计算并立即返回Future

When the computation is done, the method completes the Future by providing the result to the complete method:

当计算完成后,该方法通过向complete方法提供结果来完成Future

public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

To spin off the computation, we use the Executor API. This method of creating and completing a CompletableFuture can be used together with any concurrency mechanism or API, including raw threads.

为了分拆计算,我们使用Executor API。这种创建和完成CompletableFuture的方法可以与任何并发机制或API一起使用,包括原始线程。

Notice that the calculateAsync method returns a Future instance.

注意,calculateAsync方法返回一个Future实例

We simply call the method, receive the Future instance, and call the get method on it when we’re ready to block for the result.

我们只需调用该方法,接收Future实例,并在我们准备为结果进行阻塞时调用该方法的get

Also observe that the get method throws some checked exceptions, namely ExecutionException (encapsulating an exception that occurred during a computation) and InterruptedException (an exception signifying that a thread executing a method was interrupted):

还可以观察到,get方法抛出了一些被检查的异常,即ExecutionException(封装了一个在计算过程中发生的异常)和InterruptedException(标志着执行方法的线程被中断的一个异常)。

Future<String> completableFuture = calculateAsync();

// ... 

String result = completableFuture.get();
assertEquals("Hello", result);

If we already know the result of a computation, we can use the static completedFuture method with an argument that represents a result of this computation. Consequently, the get method of the Future will never block, immediately returning this result instead:

如果我们已经知道一个计算的结果,我们可以使用静态的completedFuture方法,其参数代表这个计算的结果。因此,Futureget方法将不会阻塞,而是立即返回这个结果。

Future<String> completableFuture = 
  CompletableFuture.completedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);

As an alternative scenario, we may want to cancel the execution of a Future.

作为一种替代方案,我们可能想要取消一个Future的执行。

4. CompletableFuture with Encapsulated Computation Logic

4.带有封装计算逻辑的CompletableFuture

The code above allows us to pick any mechanism of concurrent execution, but what if we want to skip this boilerplate and simply execute some code asynchronously?

上面的代码允许我们选择任何并发执行的机制,但如果我们想跳过这个模板,简单地异步执行一些代码呢?

Static methods runAsync and supplyAsync allow us to create a CompletableFuture instance out of Runnable and Supplier functional types correspondingly.

静态方法runAsyncsupplyAsync允许我们从RunnableSupplier功能类型中相应地创建一个CompletableFuture实例。

Both Runnable and Supplier are functional interfaces that allow passing their instances as lambda expressions thanks to the new Java 8 feature.

RunnableSupplier都是功能性接口,由于Java 8的新特性,它们的实例可以作为lambda表达式传递。

The Runnable interface is the same old interface that is used in threads and it does not allow to return a value.

Runnable接口是用于线程的老接口,它不允许返回一个值。

The Supplier interface is a generic functional interface with a single method that has no arguments and returns a value of a parameterized type.

Supplier接口是一个通用功能接口,它有一个没有参数的方法,并返回一个参数化类型的值。

This allows us to provide an instance of the Supplier as a lambda expression that does the calculation and returns the result. It is as simple as:

这使得我们可以提供一个Supplier的实例,作为一个lambda表达式,进行计算并返回结果。它就像这样简单。

CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());

5. Processing Results of Asynchronous Computations

5.处理异步计算的结果

The most generic way to process the result of a computation is to feed it to a function. The thenApply method does exactly that; it accepts a Function instance, uses it to process the result, and returns a Future that holds a value returned by a function:

处理计算结果的最通用方法是将其送入一个函数。thenApply方法正是这样做的;它接受一个Function实例,用它来处理结果,并返回一个Future,它持有一个由函数返回的值。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());

If we don’t need to return a value down the Future chain, we can use an instance of the Consumer functional interface. Its single method takes a parameter and returns void.

如果我们不需要沿着Future链返回一个值,我们可以使用Consumer功能接口的一个实例。它的单一方法接受一个参数并返回void

There’s a method for this use case in the CompletableFuture. The thenAccept method receives a Consumer and passes it the result of the computation. Then the final future.get() call returns an instance of the Void type:

CompletableFuture中,有一个方法用于这个用例。thenAccept方法接收一个Consumer并将计算的结果传递给它。然后,最后的future.get()调用返回一个Void类型的实例。

CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();

Finally, if we neither need the value of the computation, nor want to return some value at the end of the chain, then we can pass a Runnable lambda to the thenRun method. In the following example, we simply print a line in the console after calling the future.get():

最后,如果我们既不需要计算的值,也不想在链的末端返回一些值,那么我们可以将一个Runnable lambda传递给thenRun方法。在下面的例子中,我们只是在调用future.get()后在控制台中打印一行:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();

6. Combining Futures

6.组合式期货

The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.

CompletableFuture API最好的部分是在计算步骤链中组合CompletableFuture实例的能力

The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.

这种连锁的结果本身就是一个CompletableFuture,可以进一步连锁和组合。这种方法在函数式语言中无处不在,通常被称为单体设计模式。

In the following example we use the thenCompose method to chain two Futures sequentially.

在下面的例子中,我们使用thenCompose方法将两个Futures依次连锁。

Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda:

请注意,这个方法需要一个函数来返回一个CompletableFuture实例。这个函数的参数是前一个计算步骤的结果。这使得我们可以在下一个CompletableFuture的lambda中使用这个值。

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes also available in Java 8.

thenCompose方法与thenApply一起实现了单体模式的基本构建模块。它们与Java 8中的mapflatMap类的StreamOptional方法密切相关。

Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type. This functional structure allows composing the instances of these classes as building blocks.

这两个方法都接收一个函数并将其应用于计算结果,但是thenComposeflatMap)方法接收一个返回另一个相同类型对象的函数。这种功能结构允许将这些类的实例作为构建块进行组合。

If we want to execute two independent Futures and do something with their results, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results:

如果我们想执行两个独立的Futures并对其结果进行处理,我们可以使用thenCombine方法,该方法接受一个Future和一个有两个参数的Function来处理两个结果。

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());

A simpler case is when we want to do something with two Futures‘ results, but don’t need to pass any resulting value down a Future chain. The thenAcceptBoth method is there to help:

一个更简单的情况是,当我们想对两个Futures的结果做一些事情,但不需要将任何结果值传递到Future链。thenAcceptBoth方法就是为了帮助我们。

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));

7. Difference Between thenApply() and thenCompose()

7.thenApply()thenCompose()之间的区别

In our previous sections, we’ve shown examples regarding thenApply() and thenCompose(). Both APIs help chain different CompletableFuture calls, but the usage of these 2 functions is different.

在我们之前的章节中,我们已经展示了关于thenApply()thenCompose()的例子。这两个API都有助于连锁不同的CompletableFuture调用,但这两个函数的用法是不同的。

7.1. thenApply()

7.1.thenApply()

We can use this method to work with a result of the previous call. However, a key point to remember is that the return type will be combined of all calls.

我们可以使用这个方法来处理之前调用的结果。然而,需要记住的一个关键点是,返回类型将是所有调用的组合。

So this method is useful when we want to transform the result of a CompletableFuture call:

因此,当我们想要转换一个CompletableFuture调用的结果时,这个方法很有用。

CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);

7.2. thenCompose()

7.2.thenCompose()

The thenCompose() method is similar to thenApply() in that both return a new Completion Stage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():

thenCompose()方法与thenApply()类似,都返回一个新的完成阶段。然而,thenCompose()使用前一个阶段作为参数。它将扁平化并直接返回一个带有结果的Future,而不是像我们在thenApply()中观察到的那样,返回一个嵌套的future:

CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);

So if the idea is to chain CompletableFuture methods then it’s better to use thenCompose().

因此,如果我们的想法是将CompletableFuture方法连锁,那么最好使用thenCompose()

Also, note that the difference between these two methods is analogous to the difference between map() and flatMap().

另外,请注意,这两个方法的区别类似于map()flatMap()的区别。

8. Running Multiple Futures in Parallel

8.并行运行多个期货

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.

当我们需要并行执行多个Futures时,我们通常希望等待所有的执行,然后处理它们的组合结果。

The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:

CompletableFuture.allOf静态方法允许等待作为var-arg提供的所有Futures完成。

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

注意,CompletableFuture.allOf()的返回类型是一个CompletableFuture<Void>。这个方法的局限性在于,它不能返回所有Futures的综合结果。相反,我们必须手动从Futures中获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API使之变得简单。

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally. This makes it possible to use it as a method reference in the Stream.map() method.

CompletableFuture.join()方法与get方法类似,但是它在Future没有正常完成的情况下抛出一个未检查的异常。这使得我们可以在Stream.map()方法中把它作为一个方法引用。

9. Handling Errors

9.处理错误

For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.

对于异步计算步骤链中的错误处理,我们必须以类似的方式调整throw/catch习语。

Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).

CompletableFuture类允许我们在一个特殊的handle方法中处理异常,而不是在一个语法块中捕捉异常。这个方法接收两个参数:一个是计算的结果(如果成功完成),另一个是抛出的异常(如果某个计算步骤没有正常完成)。

In the following example, we use the handle method to provide a default value when the asynchronous computation of a greeting was finished with an error because no name was provided:

在下面的例子中,我们使用handle方法来提供一个默认值,当一个问候语的异步计算因为没有提供名字而以错误结束时。

String name = null;

// ...

CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also have the ability to complete it with an exception. The completeExceptionally method is intended for just that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:

作为另一种情况,假设我们想用一个值来手动完成Future,就像第一个例子中那样,但也有能力用一个异常来完成它。completeExceptionally方法就是为了这个目的。下面的例子中的completableFuture.get()方法抛出了一个ExecutionException,其原因是RuntimeException

CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuture.completeExceptionally(
  new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionException

In the example above, we could have handled the exception with the handle method asynchronously, but with the get method we can use the more typical approach of a synchronous exception processing.

在上面的例子中,我们可以用handle方法异步地处理异常,但是通过get方法,我们可以使用更典型的同步异常处理方法。

10. Async Methods

10.异步方法

Most methods of the fluent API in CompletableFuture class have two additional variants with the Async postfix. These methods are usually intended for running a corresponding step of execution in another thread.

CompletableFuture类中的大多数流畅API方法都有两个带有Async后缀的附加变体。这些方法通常是为了在另一个线程中运行相应的执行步骤

The methods without the Async postfix run the next execution stage using a calling thread. In contrast, the Async method without the Executor argument runs a step using the common fork/join pool implementation of Executor that is accessed with the ForkJoinPool.commonPool() method. Finally, the Async method with an Executor argument runs a step using the passed Executor.

没有Async后缀的方法使用一个调用线程运行下一个执行阶段。相比之下,没有Async参数的Async方法使用Fork/join池的公共Executor实现来运行一个步骤,该池可通过ForkJoinPool.commonPool()方法访问。最后,带有Executor参数的Async方法使用传递的Executor运行一个步骤。

Here’s a modified example that processes the result of a computation with a Function instance. The only visible difference is the thenApplyAsync method, but under the hood the application of a function is wrapped into a ForkJoinTask instance (for more information on the fork/join framework, see the article “Guide to the Fork/Join Framework in Java”). This allows us to parallelize our computation even more and use system resources more efficiently:

这里有一个修改过的例子,它用一个Function实例来处理计算的结果。唯一可见的区别是thenApplyAsync方法,但在引擎盖下,函数的应用被包装成一个ForkJoinTask实例(关于fork/join框架的更多信息,请参阅“Java中Fork/Join框架指南”)。这使得我们可以将我们的计算更加并行化,并更有效地使用系统资源。

CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

11.JDK 9 CompletableFuture API

Java 9 enhances the CompletableFuture API with the following changes:

Java 9通过以下变化增强了CompletableFuture API。

  • New factory methods added
  • Support for delays and timeouts
  • Improved support for subclassing

and new instance APIs:

和新的实例API。

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> copy()
  • CompletionStage<T> minimalCompletionStage()
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

We also now have a few static utility methods:

我们现在也有一些静态的实用方法。

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)
  • <U> CompletionStage<U> failedStage(Throwable ex)
  • <U> CompletableFuture<U> failedFuture(Throwable ex)

Finally, to address timeout, Java 9 has introduced two more new functions:

最后,为了解决超时问题,Java 9又引入了两个新函数。

  • orTimeout()
  • completeOnTimeout()

Here’s the detailed article for further reading: Java 9 CompletableFuture API Improvements.

这里有详细的文章供进一步阅读。Java 9 CompletableFuture API改进

12. Conclusion

12.结论

In this article, we’ve described the methods and typical use cases of the CompletableFuture class.

在这篇文章中,我们描述了CompletableFuture类的方法和典型用例。

The source code for the article is available over on GitHub.

该文章的源代码可在GitHub上获得