1. Introduction
1.绪论
Guava provides us with ListenableFuture with an enriched API over the default Java Future. Let’s see how we can use this to our advantage.
Guava为我们提供了ListenableFuture,与默认的JavaFuture相比,它具有丰富的API。让我们来看看我们如何利用这个优势。
2. Future, ListenableFuture and Futures
2.Future, ListenableFuture和Futures
Let’s have a brief look at what these different classes are and how they are related to each other.
让我们简单看一下这些不同的类别是什么,以及它们之间的关系如何。
2.1. Future
2.1.未来
Since Java 5, we can use java.util.concurrent.Future to represent asynchronous tasks.
从Java 5开始,我们可以使用java.util.concurrent.Future来表示异步任务。
A Future allows us access to the result of a task that has already been completed or might complete in the future, along with support for canceling them.
一个Future允许我们访问已经完成或未来可能完成的任务的结果,同时支持取消它们。
2.2. ListenableFuture
2.2.ListenableFuture
One lacking feature when using java.util.concurrent.Future is the ability to add listeners to run on completion, which is a common feature provided by most popular asynchronous frameworks.
在使用java.util.concurrent.Future时,缺少的一个功能是添加监听器以在完成时运行,这是大多数流行的异步框架所提供的一个常见功能。
Guava solves this problem by allowing us to attach listeners to its com.google.common.util.concurrent.ListenableFuture.
Guava通过允许我们在其com.google.common.util.concurrent.ListenableFuture.上附加监听器来解决这个问题。
2.3. Futures
2.3.期货
Guava provides us with the convenience class com.google.common.util.concurrent.Futures to make it easier to work with their ListenableFuture.
Guava为我们提供了方便的类com.google.common.util.concurrent.Futures,以方便我们使用他们的ListenableFuture。
This class provides various ways of interacting with ListenableFuture, among which is the support for adding success/failure callbacks and allowing us to coordinate multiple futures with aggregations or transformations.
该类提供了与ListenableFuture互动的各种方式,其中包括支持添加成功/失败的回调,并允许我们用聚合或转换来协调多个期货。
3. Simple Usage
3.简单使用
Let’s now see how we can use ListenableFuture in its simplest ways; creating and adding callbacks.
现在让我们看看我们如何以最简单的方式使用ListenableFuture;创建和添加回调。
3.1. Creating ListenableFuture
3.1.创建ListenableFuture
The simplest way we can obtain a ListenableFuture is by submitting a task to a ListeningExecutorService (much like how we would use a normal ExecutorService to obtain a normal Future):
我们获得ListenableFuture的最简单方法是向ListeningExecutorService提交一个任务(就像我们使用普通的ExecutorService来获得普通的Future)。
ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);
ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500); // long running task
return 5;
});
Notice how we use the MoreExecutors class to decorate our ExecutorService as a ListeningExecutorService. We can refer to Thread Pool’s Implementation in Guava to learn more about MoreExecutors.
请注意我们是如何使用MoreExecutors类将我们的ExecutorService装饰成ListeningExecutorService。我们可以参考Guava中线程池的实现,以了解更多关于MoreExecutors的信息。
If we already have an API that returns a Future and we need to convert it to ListenableFuture, this is easily done by initializing its concrete implementation ListenableFutureTask:
如果我们已经有一个返回Future的API,并且我们需要将其转换为ListenableFuture,这很容易做到通过初始化其具体实现ListenableFutureTask:。
// old api
public FutureTask<String> fetchConfigTask(String configKey) {
return new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
return ListenableFutureTask.create(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
We need to be aware that these tasks won’t run unless we submit them to an Executor. Interacting directly with ListenableFutureTask is not common usage and is done only in rare scenarios (ex: implementing our own ExecutorService). Refer to Guava’s AbstractListeningExecutorService for practical usage.
我们需要注意的是,除非我们将这些任务提交给Executor,否则这些任务将不会运行。直接与ListenableFutureTask交互不是常见的用法,只有在极少数情况下才会这样做(例如:实现我们自己的ExecutorService)。请参考Guava的AbstractListeningExecutorService的实际用法。
We can also use com.google.common.util.concurrent.SettableFuture if our asynchronous task can’t use the ListeningExecutorService or the provided Futures utility methods, and we need to set the future value manually. For more complex usage, we can also consider com.google.common.util.concurrent.AbstractFuture.
如果我们的异步任务不能使用ListeningExecutorService或提供的Futures实用方法,我们也可以使用com.google.com.util.concurrent.SettableFuture,并且需要手动设置未来值。对于更复杂的用法,我们也可以考虑com.google.common.util.concurrent.AbstractFuture.。
3.2. Adding Listeners/Callbacks
3.2 添加监听器/回调器
One way we can add a listener to a ListenableFuture is by registering a callback with Futures.addCallback(), providing us access to the result or exception when success or failure occurs:
我们可以为ListenableFuture添加一个监听器,方法是用Futures.addCallback()注册一个回调,在成功或失败时为我们提供对结果或异常的访问:。
Executor listeningExecutor = Executors.newSingleThreadExecutor();
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
// do on success
}
@Override
public void onFailure(Throwable t) {
// do on failure
}
}, listeningExecutor);
We can also add a listener by adding it directly to the ListenableFuture. Note that this listener will run when the future completes either successfully or exceptionally. Also, note that we don’t have access to the result of the asynchronous task:
我们也可以直接向ListenableFuture.添加一个监听器,注意这个监听器将在future成功或异常完成时运行。另外,请注意,我们无法访问异步任务的结果。
Executor listeningExecutor = Executors.newSingleThreadExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);
4. Complex Usage
4.复杂用途
Let’s now see how we can use these futures in more complex scenarios.
现在让我们看看如何在更复杂的情况下使用这些期货。
4.1. Fan-In
4.1 扇入
We may sometimes need to invoke multiple asynchronous tasks and collect their results, usually called a fan-in operation.
我们有时可能需要调用多个异步任务并收集其结果,通常称为扇入操作。
Guava provides us with two ways of doing this. However, we should be careful in selecting the correct method depending on our requirements. Let’s assume we need to coordinate the following asynchronous tasks:
Guava为我们提供了两种方法来做到这一点。然而,我们应该根据我们的要求,谨慎选择正确的方法。让我们假设我们需要协调以下异步任务。
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");
One way of fanning-in multiple futures is by the use of Futures.allAsList() method. This allows us to collect results of all futures if all of them succeed, in the order of the provided futures. If either one of these futures fails, then the whole result is a failed future:
一种将多个期货纳入的方法是使用Futures.allAsList()方法。这允许我们收集所有期货的结果,如果所有的期货都成功的话,按照提供的期货的顺序。如果其中任何一个期货失败,那么整个结果就是一个失败的期货。
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// do on all futures success
}
@Override
public void onFailure(Throwable t) {
// handle on at least one failure
}
}, someExecutor);
If we need to collect results of all asynchronous tasks, regardless of whether they failed or not, we can use Futures.successfulAsList(). This will return a list whose results will have the same order as the tasks passed into the argument, and the failed tasks will have null assigned to their respective positions in the list:
如果我们需要收集所有异步任务的结果,无论它们是否失败,我们可以使用Futures.successfulAsList()。这将返回一个列表,其结果的顺序与传入参数的任务相同,而失败的任务将被分配到列表中各自的位置,即null。
ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// handle results. If task2 failed, then configResults.get(1) == null
}
@Override
public void onFailure(Throwable t) {
// handle failure
}
}, listeningExecutor);
We should be careful in the above usage that if the future task normally returns null on success, it will be indistinguishable from a failed task (which also sets the result as null).
在上述用法中,我们应该小心,如果未来任务在成功时通常返回null,它将与失败的任务(它也将结果设置为null)无法区分。
4.2. Fan-In with Combiners
4.2.带合路器的扇入式
If we have a requirement to coordinate multiple futures that return different results, the above solution may not suffice. In this case, we can use the combiner variants of the fan-in operations to coordinate this mix of futures.
如果我们有协调多个返回不同结果的期货的要求,上述解决方案可能就不够了。在这种情况下,我们可以使用扇入操作的组合器变体来协调这种混合的期货。
Similar to the simple fan-in operations, Guava provides us with two variants; one that succeeds when all tasks complete successfully and one that succeeds even if some tasks fail using the Futures.whenAllSucceed() and Futures.whenAllComplete() methods, respectively.
与简单的扇入操作类似,Guava为我们提供了两种变体;一种是在所有任务成功完成时成功,另一种是在某些任务失败时也成功,分别使用Futures.whenAllSucceed()和Futures.whenAllComplete()方法。
Let’s see how we can use Futures.whenAllSucceed() to combine different results types from multiple futures:
让我们看看我们如何使用Futures.whenAllSucceed()来结合来自多个期货的不同结果类型。
ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
.call(() -> {
int cartId = Futures.getDone(cartIdTask);
String customerName = Futures.getDone(customerNameTask);
List<String> cartItems = Futures.getDone(cartItemsTask);
return new CartInfo(cartId, customerName, cartItems);
}, someExecutor);
Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
@Override
public void onSuccess(@Nullable CartInfo result) {
//handle on all success and combination success
}
@Override
public void onFailure(Throwable t) {
//handle on either task fail or combination failed
}
}, listeningExecService);
If we need to allow some tasks to fail, we can use Futures.whenAllComplete(). While the semantics are mostly similar to the above, we should be aware that the failed futures will throw an ExecutionException when Futures.getDone() is called on them.
如果我们需要允许一些任务失败,我们可以使用Futures.whenAllComplete()。虽然语义大多与上述类似,但我们应该注意,当Futures.getDone()被调用时,失败的期货将抛出ExecutionException。
4.3. Transformations
4.3.变革
Sometimes we need to convert the result of a future once successful. Guava provides us with two ways to do so with Futures.transform() and Futures.lazyTransform().
有时我们需要在成功后转换一个future的结果。Guava为我们提供了两种方法:Futures.transform()和Futures.lazyTransform()。
Let’s see how we can use Futures.transform() to transform the result of a future. This can be used as long as the transformation computation is not heavy:
让我们看看我们如何使用Futures.transform()来转换一个future的结果。只要转换的计算量不大,就可以使用这个方法:。
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
Function<List<String>, Integer> itemCountFunc = cartItems -> {
assertNotNull(cartItems);
return cartItems.size();
};
ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);
We can also use Futures.lazyTransform() to apply a transformation function to a java.util.concurrent.Future. We need to keep in mind that this option doesn’t return a ListenableFuture but a normal java.util.concurrent.Future and that the transformation function applies every time get() is invoked on the resulting future.
我们也可以使用Futures.lazyTransform()来对java.util.concurrent.Future.应用转换函数。我们需要记住,这个选项并不返回ListenableFuture,而是一个普通的java.util.concurrent.Future,并且转换函数在每次get()被调用的时候都会应用在产生的未来。
4.4. Chaining Futures
4.4.连锁期货
We may come across situations where our futures need to call other futures. In such cases, Guava provides us with async() variants to safely chain these futures to execute one after the other.
我们可能会遇到我们的期货需要调用其他期货的情况。在这种情况下,Guava为我们提供了async()变体来安全地连锁这些期货,使其一个接一个地执行。
Let’s see how we can use Futures.submitAsync() to call a future from inside the Callable that is submitted:
让我们看看我们如何使用Futures.submitAsync()从被提交的Callable内部调用一个未来:。
AsyncCallable<String> asyncConfigTask = () -> {
ListenableFuture<String> configTask = service.fetchConfig("config.a");
TimeUnit.MILLISECONDS.sleep(500); //some long running task
return configTask;
};
ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);
In case we want true chaining, where the result of one future is fed into the computation of another future, we can use Futures.transformAsync():
如果我们想要真正的连锁,即一个未来的结果被送入另一个未来的计算中,我们可以使用Futures.transformAsync():。
ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
TimeUnit.MILLISECONDS.sleep(500); // some long running task
return generatePasswordTask;
};
ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);
Guava also provides us with Futures.scheduleAsync() and Futures.catchingAsync() to submit a scheduled task and to provide fallback tasks on error recovery, respectively. While they cater to different scenarios, we won’t discuss them since they are similar to the other async() calls.
Guava还为我们提供了Futures.scheduleAsync()和Futures.catchingAsync(),分别用于提交计划任务和提供错误恢复的回退任务。虽然它们迎合了不同的场景,但我们不会讨论它们,因为它们与其他async()调用类似。
5. Usage Dos and Don’ts
5.使用方法和注意事项
Let’s now investigate some common pitfalls we may encounter when working with futures and how to avoid them.
现在让我们调查一下我们在与期货合作时可能遇到的一些常见陷阱,以及如何避免这些陷阱。
5.1. Working vs. Listening Executors
5.1.工作与倾听的执行者
It is important to understand the difference between the working executor and the listening executor when using Guava futures. For example, let’s say we have an asynchronous task to fetch configs:
在使用Guava期货时,理解工作执行器和监听执行器之间的区别很重要。例如,假设我们有一个异步任务来获取配置。
public ListenableFuture<String> fetchConfig(String configKey) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
Let’s also say that we want to attach a listener to the above future:
我们还可以说,我们想给上述的未来附加一个听众。
ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);
Notice that the lExecService here is the executor that is running our asynchronous task, while the listeningExecutor is the executor on which our listener is invoked.
请注意,这里的lExecService是运行我们的异步任务的执行器,而listeningExecutor是我们的监听器被调用的执行器。
As seen above, we should always consider separating these two executors to avoid scenarios where our listeners and workers are competing for the same thread pool resources. Sharing the same executor may cause our heavy-duty tasks to starve the listener executions. Or a badly written heavyweight listener ends up blocking our important heavy-duty tasks.
如上所述,我们应始终考虑分离这两个执行器,以避免出现听众和工作者争夺同一线程池资源的情况。共享同一执行器可能会导致我们的重任任务饿死听众的执行。或者一个写得不好的重量级监听器最终会阻塞我们重要的重量级任务。
5.2. Be Careful With directExecutor()
5.2.小心使用directExecutor()
While we can use MoreExecutors.directExecutor() and MoreExecutors.newDirectExecutorService() in unit testing to make it easier to handle asynchronous executions, we should be careful using them in production code.
虽然我们可以在单元测试中使用MoreExecutors.directExecutor()和MoreExecutors.newDirectExecutorService(),以便更容易处理异步执行,但在生产代码中使用它们应该小心。
When we obtain executors from the above methods, any tasks that we submit to it, be it heavyweight or listeners, will be executed on the current thread. This can be dangerous if the current execution context is one that requires high throughput.
当我们从上述方法中获得执行器时,我们提交给它的任何任务,无论是重量级还是监听器,都将在当前线程上执行。如果当前的执行环境是一个需要高吞吐量的环境,这可能是危险的。
For example, using a directExecutor and submitting a heavyweight task to it in the UI thread will automatically block our UI thread.
例如,使用一个directExecutor并在UI线程中向其提交一个重量级任务,将自动阻塞我们的UI线程。
We could also face a scenario where our listener ends up slowing down all our other listeners (even the ones that aren’t involved with directExecutor). This is because Guava executes all listeners in a while loop in their respective Executors, but the directExecutor will cause the listener to run in the same thread as the while loop.
我们还可能面临这样一种情况:我们的监听器最终会拖慢我们所有的其他监听器(甚至是那些没有参与directExecutor的监听器)。这是因为Guava在各自的执行器中以while循环的方式执行所有监听器,但directExecutor会导致监听器与while循环在同一线程中运行。
5.3. Nesting Futures Is Bad
5.3.嵌套期货是不好的
When working with chained futures, we should be careful not to call one from inside another future in such a way that it creates nested futures:
当使用链式期货时,我们应该注意不要从另一个期货内部调用一个期货,以免产生嵌套期货。
public ListenableFuture<String> generatePassword(String username) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return username + "123";
});
}
String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
final String username = firstName.replaceAll("[^a-zA-Z]+", "")
.concat("@service.com");
return generatePassword(username);
});
If we ever see code that has ListenableFuture<ListenableFuture<V>>, then we should know that this is a badly written future because there is a chance that cancellation and completion of the outer future may race, and the cancellation may not propagate to the inner future.
如果我们曾经看到有ListenableFuture<ListenableFuture<V>>的代码,那么我们应该知道这是一个写得很糟糕的未来,因为外部未来的取消和完成有可能发生竞争,而且取消可能不会传播到内部未来。
If we see the above scenario, we should always use the Futures.async() variants to safely unwrap these chained futures in a connected fashion.
如果我们看到上述情况,我们应该始终使用Futures.async() 变体来安全地以连接的方式解开这些链式期货。
5.4. Be Careful With JdkFutureAdapters.listenInPoolThread()
5.4.对JdkFutureAdapters.listenInPoolThread()要小心谨慎
Guava recommends that the best way we can leverage its ListenableFuture is by converting all our code that uses Future to ListenableFuture.
Guava建议我们利用其ListenableFuture的最佳方式是将我们所有使用Future的代码转换为ListenableFuture。
If this conversion is not feasible in some scenarios, Guava provides us with adapters to do this using the JdkFutureAdapters.listenInPoolThread() overrides. While this may seem helpful, Guava warns us that these are heavyweight adapters and should be avoided where possible.
如果这种转换在某些情况下不可行,Guava为我们提供了使用JdkFutureAdapters.listenInPoolThread()重写的适配器来做到这一点。虽然这看起来很有帮助,但Guava警告我们,这些是重量级的适配器,应尽可能避免使用。
6. Conclusion
6.结语
In this article, we have seen how we can use Guava’s ListenableFuture to enrich our usage of futures and how to use the Futures API to make it easier to work with these futures.
在这篇文章中,我们看到了如何使用Guava的ListenableFuture来丰富我们对期货的使用,以及如何使用FuturesAPI来使我们更容易处理这些期货。
We have also seen some common errors that we may make when working with these futures and the provided executors.
我们也看到了一些常见的错误,我们在使用这些期货和提供的执行者时可能会犯的错误。
As always, the full source code with our examples is available over on GitHub.
一如既往,带有我们示例的完整源代码可在GitHub上获得over。