1. Introduction
1.介绍
In this tutorial, we look at JAX-RS support for reactive (Rx) programming using the Jersey API. This article assumes the reader has knowledge of the Jersey REST client API.
在本教程中,我们看看Jersey API对反应式(Rx)编程的JAX-RS支持。本文假设读者对Jersey REST客户端API有所了解。
Some familiarity with reactive programming concepts will be helpful but isn’t necessary.
熟悉反应式编程概念会有帮助,但不是必须的。
2. Dependencies
2.依赖性
First, we need the standard Jersey client library dependencies:
首先,我们需要标准的Jersey客户端库依赖。
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.27</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.27</version>
</dependency>
These dependencies give us stock JAX-RS reactive programming support. The current versions of jersey-client and jersey-hk2 are available on Maven Central.
这些依赖性为我们提供了库存的JAX-RS反应式编程支持。jersey-client和jersey-hk2的当前版本可在Maven Central找到。
For third-party reactive framework support, we’ll use these extensions:
对于第三方反应式框架的支持,我们将使用这些扩展。
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-rxjava</artifactId>
<version>2.27</version>
</dependency>
The dependency above provides support for RxJava’s Observable; for the newer RxJava2’s Flowable, we use the following extension:
上面的依赖关系为RxJava的Observable提供支持;对于较新的RxJava2的Flowable,我们使用以下扩展。
<dependency>
<groupId>org.glassfish.jersey.ext.rx</groupId>
<artifactId>jersey-rx-client-rxjava2</artifactId>
<version>2.27</version>
</dependency>
The dependencies to rxjava and rxjava2 are also available on Maven Central.
rxjava和rxjava2的依赖项也可在Maven中心找到。
3. Why We Need Reactive JAX-RS Clients
3.为什么我们需要反应式的JAX-RS客户端?
Let’s say we have three REST APIs to consume:
假设我们有三个REST API需要消费。
- the id-service provides a list of Long user IDs
- the name-service provides a username for a given user ID
- the hash-service will return a hash of both the user ID and the username
We create a client for each of the services:
我们为每项服务创建一个客户。
Client client = ClientBuilder.newClient();
WebTarget userIdService = client.target("http://localhost:8080/id-service/ids");
WebTarget nameService
= client.target("http://localhost:8080/name-service/users/{userId}/name");
WebTarget hashService = client.target("http://localhost:8080/hash-service/{rawValue}");
This is a contrived example but it works for the purpose of our illustration. The JAX-RS specification supports at least three approaches for consuming these services together:
这是一个特意设计的例子,但对于我们的说明来说,它是有效的。JAX-RS 规范至少支持三种方法来一起消费这些服务。
- Synchronous (blocking)
- Asynchronous (non-blocking)
- Reactive (functional, non-blocking)
3.1. The Problem With Synchronous Jersey Client Invocation
3.1.同步Jersey客户端调用的问题
The vanilla approach to consuming these services will see us consuming the id-service to get the user IDs, and then calling the name-service and hash-service APIs sequentially for each ID returned.
消费这些服务的普通方法是,我们消费id-service来获取用户ID,然后针对返回的每个ID依次调用name-service和hash-service API。
With this approach, each call blocks the running thread until the request is fulfilled, spending a lot of time in total to fulfill the combined request. This is clearly less than satisfactory in any non-trivial use case.
采用这种方法,每次调用都会阻塞正在运行的线程,直到请求被满足为止,总共要花费大量时间来满足综合请求。在任何非微不足道的用例中,这显然是不令人满意的。
3.2. The Problem With Asynchronous Jersey Client Invocation
3.2.异步Jersey客户端调用的问题
A more sophisticated approach is to use the InvocationCallback mechanism supported by JAX-RS. At its most basic form, we pass a callback to the get method to define what happens when the given API call completes.
更复杂的方法是使用 JAX-RS 支持的 InvocationCallback 机制。在其最基本的形式下,我们向get方法传递一个回调,以定义在给定的API调用完成后会发生什么。
While we now get true asynchronous execution (with some limitations on thread efficiency), it’s easy to see how this style of code can get unreadable and unwieldy in anything but trivial scenarios. The JAX-RS specification specifically highlights this scenario as the Pyramid of Doom:
虽然我们现在得到了真正的异步执行(对线程效率有一些限制),但很容易看到这种风格的代码在任何琐碎的场景中都会变得难以阅读和难以操作。JAX-RS 规范特别强调了这种情况,即末日金字塔。
// used to keep track of the progress of the subsequent calls
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());
userIdService.request()
.accept(MediaType.APPLICATION_JSON)
.async()
.get(new InvocationCallback<List<Long>>() {
@Override
public void completed(List<Long> employeeIds) {
employeeIds.forEach((id) -> {
// for each employee ID, get the name
nameService.resolveTemplate("userId", id).request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String response) {
hashService.resolveTemplate("rawValue", response + id).request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String response) {
//complete the business logic
}
// ommitted implementation of the failed() method
});
}
// omitted implementation of the failed() method
});
});
}
// omitted implementation of the failed() method
});
// wait for inner requests to complete in 10 seconds
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
logger.warn("Some requests didn't complete within the timeout");
}
So we’ve achieved asynchronous, time-efficient code, but:
所以我们已经实现了异步的、有时间效率的代码,但是。
- it’s difficult to read
- each call spawns a new thread
Note that we’re using a CountDownLatch in all code examples in order to wait for all expected values to be delivered by the hash-service. We do this so that we can assert that the code works in a unit test by checking that all expected values actually were delivered.
请注意,我们在所有代码示例中都使用了CountDownLatch,以便等待hash服务交付所有预期值。我们这样做是为了在单元测试中,通过检查所有预期值是否真的被传递,来断定代码是否工作。
A usual client would not wait, but rather do whatever should be done with the result within the callback in order not to block the thread.
通常的客户端不会等待,而是在回调中做任何应该做的事情,以便不阻塞线程。
3.3. The Functional, Reactive Solution
3.3.功能性、反应性解决方案
A functional and reactive approach will give us:
功能性和反应性的方法将给我们带来。
- Great code readability
- Fluent coding style
- Effective thread management
JAX-RS supports these objectives in the following components:
JAX-RS在以下组件中支持这些目标。
- CompletionStageRxInvoker supports the CompletionStage interface as the default reactive component
- RxObservableInvokerProvider supports RxJava’s Observable
- RxFlowableInvokerProvider support RxJava’s Flowable
There is also an API for adding support for other Reactive libraries.
还有一个API,用于添加对其他Reactive库的支持。
4. JAX-RS Reactive Component Support
4.JAX-RS反应式组件支持
4.1. CompletionStage in JAX-RS
4.1.JAX-RS中的完成阶段
Using the CompletionStage and its concrete implementation – CompletableFuture – we can write an elegant, non-blocking and fluent service call orchestration.
使用CompletionStage及其具体实现–CompletableFuture—我们可以编写一个优雅、非阻塞且流畅的服务调用协调。
Let’s start by retrieving the user IDs:
让我们从检索用户ID开始。
CompletionStage<List<Long>> userIdStage = userIdService.request()
.accept(MediaType.APPLICATION_JSON)
.rx()
.get(new GenericType<List<Long>>() {
}).exceptionally((throwable) -> {
logger.warn("An error has occurred");
return null;
});
The rx() method call is the point from which the reactive handling kicks in. We use the exceptionally function to fluently define our exception handling scenario.
rx()方法调用是反应式处理的起点。我们使用exceptionally函数来流畅地定义我们的异常处理场景。
From here, we can cleanly orchestrate the calls to retrieve the username from the Name service and then hash the combination of both the name and user ID:
从这里,我们可以干净地协调调用,从Name服务中获取用户名,然后对名字和用户ID的组合进行散列。
List<String> expectedHashValues = ...;
List<String> receivedHashValues = new ArrayList<>();
// used to keep track of the progress of the subsequent calls
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());
userIdStage.thenAcceptAsync(employeeIds -> {
logger.info("id-service result: {}", employeeIds);
employeeIds.forEach((Long id) -> {
CompletableFuture completable = nameService.resolveTemplate("userId", id).request()
.rx()
.get(String.class)
.toCompletableFuture();
completable.thenAccept((String userName) -> {
logger.info("name-service result: {}", userName);
hashService.resolveTemplate("rawValue", userName + id).request()
.rx()
.get(String.class)
.toCompletableFuture()
.thenAcceptAsync(hashValue -> {
logger.info("hash-service result: {}", hashValue);
receivedHashValues.add(hashValue);
completionTracker.countDown();
}).exceptionally((throwable) -> {
logger.warn("Hash computation failed for {}", id);
return null;
});
});
});
});
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
logger.warn("Some requests didn't complete within the timeout");
}
assertThat(receivedHashValues).containsAll(expectedHashValues);
In the sample above, we compose our execution of the 3 services with fluent and readable code.
在上面的例子中,我们用流畅和可读的代码编写了我们对3个服务的执行。
The method thenAcceptAsync will execute the supplied function after the given CompletionStage has completed execution (or thrown an exception).
方法thenAcceptAsync将在给定的CompletionStage完成执行(或抛出异常)后执行所提供的函数。
Each successive call is non-blocking, making judicious use of system resources.
每个连续的调用都是无阻塞的,合理地使用系统资源。
The CompletionStage interface provides a wide variety of staging and orchestration methods that allow us to compose, order and asynchronously execute any number of steps in a multi-step orchestration (or a single service call).
CompletionStage接口提供了各种分期和协调方法,使我们能够在多步骤协调(或单个服务调用)中编排、排序和异步执行任意数量的步骤。
4.2. Observable in JAX-RS
4.2.Observable 在 JAX-RS 中
To use the Observable RxJava component, we must first register the RxObservableInvokerProvider provider (and not the “ObservableRxInvokerProvider” as is stated in the Jersey specification document) on the client:
要使用ObservableRxJava组件,我们必须首先在客户端注册RxObservableInvokerProviderprovider(而不是Jersey规范文档中所说的”ObservableRxInvokerProvider”)。
Client client = client.register(RxObservableInvokerProvider.class);
Then we override the default invoker:
然后我们覆盖默认的调用者。
Observable<List<Long>> userIdObservable = userIdService
.request()
.rx(RxObservableInvoker.class)
.get(new GenericType<List<Long>>(){});
From this point, we can use standard Observable semantics to orchestrate the processing flow:
从这一点出发,我们可以使用标准的Observable语义来协调处理流程。
userIdObservable.subscribe((List<Long> listOfIds)-> {
/** define processing flow for each ID */
});
4.3. Flowable in JAX-RS
4.3.Flowable 在 JAX-RS 中的应用
The semantics for using RxJava Flowable is similar to that of Observable. We register the appropriate provider:
使用RxJava的Flowable的语义与Observable相似。我们注册适当的提供者。
client.register(RxFlowableInvokerProvider.class);
Then we supply the RxFlowableInvoker:
然后我们提供RxFlowableInvoker。
Flowable<List<Long>> userIdFlowable = userIdService
.request()
.rx(RxFlowableInvoker.class)
.get(new GenericType<List<Long>>(){});
Following that, we can use the normal Flowable API.
在这之后,我们可以使用正常的Flowable API。
5. Conclusion
5.结论
The JAX-RS specification supplies a good number of options that yield clean, non-blocking execution of REST calls.
JAX-RS规范提供了大量的选项,以产生干净、非阻塞的REST调用执行。
The CompletionStage interface, in particular, provides a robust set of methods that cover a variety of service orchestration scenarios, as well as opportunities to supply custom Executors for more fine-grained control of the thread management.
特别是CompletionStage接口,它提供了一套强大的方法,涵盖了各种服务协调场景,以及提供自定义Executors的机会,以便更精细地控制线程管理。
You can check out the code for this article over on Github.
你可以在Github上查看这篇文章的代码over。