Asynchronous HTTP Programming with Play Framework – 使用Play框架的异步HTTP编程

最后修改: 2020年 2月 24日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

Often our web services need to use other web services in order to do their job. It can be difficult to serve user requests while keeping a low response time. A slow external service can increase our response time and cause our system to pile up requests, using more resources. This is where a non-blocking approach can be very helpful

通常我们的网络服务需要使用其他网络服务来完成它们的工作。在保持较低的响应时间的同时为用户请求提供服务是很困难的。一个缓慢的外部服务会增加我们的响应时间,导致我们的系统堆积请求,使用更多资源。这时,非阻塞式的方法会非常有帮助

In this tutorial, we’ll fire multiple asynchronous requests to a service from a Play Framework application. By leveraging Java’s non-blocking HTTP capability, we’ll be able to smoothly query external resources without affecting our own main logic.

在本教程中,我们将从Play Framework应用程序中向一个服务发出多个异步请求。通过利用 Java 的非阻塞 HTTP 功能,我们将能够顺利地查询外部资源而不影响我们自己的主逻辑。

In our example we’ll explore the Play WebService Library.

在我们的例子中,我们将探索Play WebService库

2. The Play WebService (WS) Library

2.播放网络服务(WS)库

WS is a powerful library providing asynchronous HTTP calls using Java Action.

WS是一个强大的库,使用JavaAction提供异步的HTTP调用。

Using this library, our code sends these requests and carries on without blocking. To process the result of the request, we provide a consuming function, that is, an implementation of the Consumer interface.

使用这个库,我们的代码发送这些请求,并在没有阻塞的情况下进行。为了处理请求的结果,我们提供一个消费函数,也就是Consumer接口的实现。

This pattern shares some similarities with JavaScript’s implementation of callbacks, Promises, and the async/await pattern.

这种模式与JavaScript的回调实现、Promisesasync/await模式有一些相似之处。

Let’s build a simple Consumer that logs some of the response data:

让我们建立一个简单的Consumer,记录一些响应数据。

ws.url(url)
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()))

Our Consumer is merely logging in this example. The consumer could do anything that we need to do with the result, though, like store the result in a database.

在这个例子中,我们的Consumer只是记录。不过,消费者可以做任何我们需要对结果做的事情,比如将结果存储在数据库中。

If we look deeper into the library’s implementation, we can observe that WS wraps and configures Java’s AsyncHttpClient, which is part of the standard JDK and does not depend on Play.

如果我们深入研究该库的实现,我们可以观察到WS包装和配置了Java的AsyncHttpClient,它是标准JDK的一部分,不依赖Play。

3. Prepare an Example Project

3.准备一个项目实例

To experiment with the framework, let’s create some unit tests to launch requests. We’ll create a skeleton web application to answer them and use the WS framework to make HTTP requests.

为了实验该框架,让我们创建一些单元测试来启动请求。我们将创建一个骨架网络应用来回答它们,并使用WS框架来发出HTTP请求。

3.1. The Skeleton Web Application

3.1.骨架式网络应用程序

First of all, we create the initial project by using the sbt new command:

首先,我们通过使用sbt new命令创建初始项目。

sbt new playframework/play-java-seed.g8

In the new folder, we then edit the build.sbt file and add the WS library dependency:

在新的文件夹中,我们然后编辑build.sbt文件并添加WS库的依赖性:

libraryDependencies += javaWs

Now we can start the server with the sbt run command:

现在我们可以用sbt run命令启动服务器。

$ sbt run
...
--- (Running the application, auto-reloading is enabled) ---

[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

Once the application has started, we can check everything is ok by browsing http://localhost:9000, which will open Play’s welcome page.

一旦应用程序启动,我们可以通过浏览http://localhost:9000来检查一切正常,这将打开Play的欢迎页面。

3.2. The Testing Environment

3.2.测试环境

To test our application, we’ll use the unit test class HomeControllerTest.

为了测试我们的应用程序,我们将使用单元测试类HomeControllerTest

First, we need to extend WithServer which will provide the server life cycle:

首先,我们需要扩展WithServer,它将提供服务器的生命周期。

public class HomeControllerTest extends WithServer {

Thanks to its parent, this class now starts our skeleton webserver in test mode and on a random port, before running the tests. The WithServer class also stops the application when the test is finished.

多亏了它的父类,这个类现在以测试模式和随机端口启动我们的骨架网络服务器,然后运行测试。测试结束后,WithServer类也会停止应用程序。

Next, we need to provide an application to run.

接下来,我们需要提供一个要运行的应用程序。

We can create it with Guice‘s GuiceApplicationBuilder:

我们可以用GuiceGuiceApplicationBuilder创建它。

@Override
protected Application provideApplication() {
    return new GuiceApplicationBuilder().build();
}

And finally, we set up the server URL to use in our tests, using the port number provided by the test server:

最后,我们使用测试服务器提供的端口号,设置了测试中使用的服务器URL。

@Override
@Before
public void setup() {
    OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
    if (optHttpsPort.isPresent()) {
        port = optHttpsPort.getAsInt();
        url = "https://localhost:" + port;
    } else {
        port = testServer.getRunningHttpPort()
          .getAsInt();
        url = "http://localhost:" + port;
    }
}

Now we’re ready to write tests. The comprehensive test framework lets us concentrate on coding our test requests.

现在我们已经准备好写测试了。全面的测试框架让我们专注于测试请求的编码。

4. Prepare a WSRequest

4.准备一个WSRequest

Let’s see how we can fire basic types of requests, such as GET or POST, and multipart requests for file upload.

让我们看看我们如何发射基本类型的请求,如GET或POST,以及文件上传的多部分请求。

4.1. Initialize the WSRequest Object

4.1.初始化WSRequest对象

First of all, we need to obtain a WSClient instance to configure and initialize our requests.

首先,我们需要获得一个WSClient实例来配置和初始化我们的请求。

In a real-life application, we can get a client, auto-configured with default settings, via dependency injection:

在现实生活中,我们可以通过依赖性注入得到一个客户端,自动配置的默认设置。

@Autowired
WSClient ws;

In our test class, though, we use WSTestClient, available from Play Test framework:

不过在我们的测试类中,我们使用了WSTestClient,可从Play Test框架获取。

WSClient ws = play.test.WSTestClient.newClient(port);

Once we have our client, we can initialize a WSRequest object by calling the url method:

一旦我们有了客户端,我们就可以通过调用url方法来初始化一个WSRequest对象。

ws.url(url)

The url method does enough to allow us to fire a request. However, we can customize it further by adding some custom settings:

url方法的作用足以让我们发射一个请求。然而,我们可以通过添加一些自定义设置来进一步定制它。

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + num);

As we can see, it’s pretty easy to add headers and query parameters.

正如我们所看到的,添加头信息和查询参数是很容易的。

After we’ve fully configured our request, we can call the method to initiate it.

在我们完全配置好我们的请求后,我们可以调用方法来启动它。

4.2. Generic GET Request

4.2.通用的GET请求

To trigger a GET request we just have to call the get method on our WSRequest object:

要触发一个GET请求,我们只需在我们的WSRequest对象上调用get方法。

ws.url(url)
  ...
  .get();

As this is a non-blocking code, it starts the request and then continues execution at the next line of our function.

由于这是一个非阻塞性的代码,它开始请求,然后在我们函数的下一行继续执行。

The object returned by get is a CompletionStage instance, which is part of the CompletableFuture API.

get返回的对象是一个CompletionStage实例,它是CompletableFuture API的一部分。

Once the HTTP call has completed, this stage executes just a few instructions. It wraps the response in a WSResponse object.

一旦HTTP调用完成,这个阶段只执行一些指令。它将响应包装在一个WSResponse对象中。

Normally, this result would be passed on to the next stage of the execution chain. In this example, we have not provided any consuming function, so the result is lost.

通常情况下,这个结果会被传递到执行链的下一个阶段。在这个例子中,我们没有提供任何消耗函数,所以结果会丢失。

For this reason, this request is of type “fire-and-forget”.

由于这个原因,这个请求属于 “火烧连营 “类型。

4.3. Submit a Form

4.3.提交一份表格

Submitting a form is not very different from the get example.

提交表单与get的例子没有很大区别。

To trigger the request we just call the post method:

为了触发请求,我们只需调用post方法。

ws.url(url)
  ...
  .setContentType("application/x-www-form-urlencoded")
  .post("key1=value1&key2=value2");

In this scenario, we need to pass a body as a parameter. This can be a simple string like a file, a json or xml document, a BodyWritable or a Source.

在这种情况下,我们需要传递一个主体作为参数。这可以是一个简单的字符串,如一个文件、一个json或xml文档、一个BodyWritable或一个Source

4.4. Submit a Multipart/Form Data

4.4.提交一个Multipart/Form数据

A multipart form requires us to send both input fields and data from an attached file or stream.

多部分表单要求我们同时发送输入字段和来自附件文件或流的数据。

To implement this in the framework, we use the post method with a Source.

为了在框架中实现这一点,我们使用带有Sourcepost方法。

Inside the source, we can wrap all the different data types needed by our form:

在源文件中,我们可以包装我们的表单所需的所有不同数据类型。

Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> file = 
  new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");

ws.url(url)
...
  .post(Source.from(Arrays.asList(file, data)));

Though this approach adds some more configuration, it is still very similar to the other types of requests.

虽然这种方法增加了一些配置,但它仍然与其他类型的请求非常相似。

5. Process the Async Response

5.处理异步响应

Up to this point, we have only triggered fire-and-forget requests, where our code doesn’t do anything with the response data.

到目前为止,我们只触发了fire-and-forget请求,我们的代码不对响应数据做任何处理。

Let’s now explore two techniques for processing an asynchronous response.

现在让我们来探讨处理异步响应的两种技术。

We can either block the main thread, waiting for a CompletableFuture, or consume asynchronously with a Consumer.

我们可以阻塞主线程,等待CompletableFuture,或者用Consumer异步消费。

5.1. Process Response by Blocking With CompletableFuture

5.1.用CompletableFuture进行阻塞处理响应

Even when using an asynchronous framework, we may choose to block our code’s execution and wait for the response.

即使在使用异步框架时,我们也可以选择阻止代码的执行,并等待响应。

Using the CompletableFuture API, we just need a few changes in our code to implement this scenario:

使用CompletableFuture API,我们只需要在代码中做一些修改就可以实现这种情况。

WSResponse response = ws.url(url)
  .get()
  .toCompletableFuture()
  .get();

This could be useful, for example, to provide a strong data consistency that we cannot achieve in other ways.

这可能是有用的,例如,提供一个强大的数据一致性,而我们无法通过其他方式实现。

5.2. Process Response Asynchronously

5.2.异步处理响应

To process an asynchronous response without blocking, we provide a Consumer or Function that is run by the asynchronous framework when the response is available.

为了在不阻塞的情况下处理异步响应,我们提供了一个ConsumerFunction ,当响应可用时,异步框架将运行该响应。

For example, let’s add a Consumer to our previous example to log the response:

例如,让我们在前面的例子中添加一个Consumer来记录响应。

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + 1)
  .get()
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()));

We then see the response in the logs:

然后我们在日志中看到响应。

[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
  "Result" : "ok",
  "Params" : {
    "num" : [ "1" ]
  },
  "Headers" : {
    "accept" : [ "*/*" ],
    "host" : [ "localhost:19001" ],
    "key" : [ "value" ],
    "user-agent" : [ "AHC/2.1" ]
  }
} | Current Time:1579303109613

It’s worth noting that we used thenAccept, which requires a Consumer function since we don’t need to return anything after logging.

值得注意的是,我们使用了thenAccept,这需要一个Consumer函数,因为我们不需要在记录后返回任何东西。

When we want the current stage to return something, so that we can use it in the next stage, we need thenApply instead, which takes a Function.

当我们想让当前阶段返回一些东西,以便我们可以在下一阶段使用它时,我们需要thenApply来代替,它需要一个Function

These use the conventions of the standard Java Functional Interfaces.

这些使用标准Java功能接口的惯例。

5.3. Large Response Body

5.3.大型反应体

The code we’ve implemented so far is a good solution for small responses and most use cases. However, if we need to process a few hundreds of megabytes of data, we’ll need a better strategy.

到目前为止,我们实现的代码对于小型响应和大多数用例来说是一个很好的解决方案。但是,如果我们需要处理几百兆字节的数据,我们就需要一个更好的策略。

We should note: Request methods like get and post load the entire response in memory.

我们应该注意:getpost这样的请求方法在内存中加载整个响应。

To avoid a possible OutOfMemoryError, we can use Akka Streams to process the response without letting it fill our memory.

为了避免可能出现的OutOfMemoryError,我们可以使用Akka Streams来处理响应,而不会让它填满我们的内存。

For example, we can write its body in a file:

例如,我们可以将其主体写在一个文件中。

ws.url(url)
  .stream()
  .thenAccept(
    response -> {
        try {
            OutputStream outputStream = Files.newOutputStream(path);
            Sink<ByteString, CompletionStage<Done>> outputWriter =
              Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
            response.getBodyAsSource().runWith(outputWriter, materializer);
        } catch (IOException e) {
            log.error("An error happened while opening the output stream", e);
        }
    });

The stream method returns a CompletionStage where the WSResponse has a getBodyAsStream method that provides a Source<ByteString, ?>.

stream方法返回一个CompletionStage,其中WSResponse有一个getBodyAsStream方法,提供一个Source<ByteString, ?>

We can tell the code how to process this type of body by using Akka’s Sink, which in our example will simply write any data passing through in the OutputStream.

我们可以通过使用Akka的Sink来告诉代码如何处理这种类型的主体,在我们的例子中,它将简单地写入任何在OutputStream中传递的数据。

5.4. Timeouts

5.4.超时

When building a request, we can also set a specific timeout, so the request is interrupted if we don’t receive the complete response in time.

在建立一个请求时,我们也可以设置一个特定的超时,所以如果我们没有及时收到完整的响应,请求就会中断。

This is a particularly useful feature when we see that a service we’re querying is particularly slow and could cause a pile-up of open connections stuck waiting for the response.

当我们看到我们正在查询的服务特别慢时,这是一个特别有用的功能,可能会导致开放的连接堆积起来等待响应。

We can set a global timeout for all our requests using tuning parameters. For a request-specific timeout, we can add to a request using setRequestTimeout:

我们可以使用调整参数为我们所有的请求设置一个全局超时。对于特定请求的超时,我们可以使用setRequestTimeout添加到一个请求。

ws.url(url)
  .setRequestTimeout(Duration.of(1, SECONDS));

There’s still one case to handle, though: We may have received all the data, but our Consumer may be very slow processing it. This might happen if there is lots of data crunching, database calls, etc.

但仍有一种情况需要处理。我们可能已经收到了所有的数据,但是我们的Consumer可能处理得很慢。如果有大量的数据计算、数据库调用等,就可能发生这种情况。

In low throughput systems, we can simply let the code run until it completes. However, we may wish to abort long-running activities.

在低吞吐量系统中,我们可以简单地让代码运行,直到它完成。然而,我们可能希望中止长期运行的活动。

To achieve that, we have to wrap our code with some futures handling.

为了实现这一目标,我们必须用一些期货处理来包装我们的代码。

Let’s simulate a very long process in our code:

让我们在代码中模拟一个很长的过程。

ws.url(url)
  .get()
  .thenApply(
    result -> { 
        try { 
            Thread.sleep(10000L); 
            return Results.ok(); 
        } catch (InterruptedException e) { 
            return Results.status(SERVICE_UNAVAILABLE); 
        } 
    });

This will return an OK response after 10 seconds, but we don’t want to wait that long.

这将在10秒后返回一个OK响应,但我们不想等那么久。

Instead, with the timeout wrapper, we instruct our code to wait for no more than 1 second:

相反,通过timeout包装器,我们指示我们的代码等待时间不超过1秒。

CompletionStage<Result> f = futures.timeout(
  ws.url(url)
    .get()
    .thenApply(result -> {
        try {
            Thread.sleep(10000L);
            return Results.ok();
        } catch (InterruptedException e) {
            return Results.status(SERVICE_UNAVAILABLE);
        }
    }), 1L, TimeUnit.SECONDS);

Now our future will return a result either way: the computation result if the Consumer finished in time, or the exception due to the futures timeout.

现在我们的future将返回一个结果,如果Consumer及时完成,则返回计算结果,或者由于futures超时而产生的异常。

5.5. Handling Exceptions

5.5.处理异常情况

In the previous example, we created a function that either returns a result or fails with an exception. So, now we need to handle both scenarios.

在前面的例子中,我们创建了一个函数,它要么返回一个结果,要么以一个异常失败。所以,现在我们需要处理这两种情况。

We can handle both success and failure scenarios with the handleAsync method.

我们可以通过handleAsync方法处理成功和失败的情况。

Let’s say that we want to return the result, if we’ve got it, or log the error and return the exception for further handling:

比方说,我们想返回结果,如果我们已经得到了结果,或者记录错误并返回异常以便进一步处理。

CompletionStage<Object> res = f.handleAsync((result, e) -> {
    if (e != null) {
        log.error("Exception thrown", e);
        return e.getCause();
    } else {
        return result;
    }
});

The code should now return a CompletionStage containing the TimeoutException thrown.

现在代码应该返回一个CompletionStage,包含抛出的TimeoutException

We can verify it by simply calling an assertEquals on the class of the exception object returned:

我们可以通过简单地在返回的异常对象的类上调用assertEquals来验证它。

Class<?> clazz = res.toCompletableFuture().get().getClass();
assertEquals(TimeoutException.class, clazz);

When running the test, it will also log the exception we received:

当运行测试时,它也会记录我们收到的异常。

[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...

6. Request Filters

6.要求过滤器

Sometimes, we need to run some logic before a request is triggered.

有时,我们需要在一个请求被触发之前运行一些逻辑。

We could manipulate the WSRequest object once initialized, but a more elegant technique is to set a WSRequestFilter.

我们可以在初始化后操纵WSRequest对象,但更优雅的技术是设置一个WSRequestFilter

A filter can be set during initialization, before calling the triggering method, and is attached to the request logic.

过滤器可以在初始化过程中,在调用触发方法之前设置,并附加到请求逻辑中。

We can define our own filter by implementing the WSRequestFilter interface, or we can add a ready-made one.

我们可以通过实现WSRequestFilter接口来定义我们自己的过滤器,或者我们可以添加一个现成的。

A common scenario is logging what the request looks like before executing it.

一个常见的情况是在执行请求之前记录请求的样子。

In this case, we just need to set the AhcCurlRequestLogger:

在这种情况下,我们只需要设置AhcCurlRequestLogger

ws.url(url)
  ...
  .setRequestFilter(new AhcCurlRequestLogger())
  ...
  .get();

The resulting log has a curl-like format:

产生的日志有一个curl类似的格式。

[info] p.l.w.a.AhcCurlRequestLogger - curl \
  --verbose \
  --request GET \
  --header 'key: value' \
  'http://localhost:19001'

We can set the desired log level, by changing our logback.xml configuration.

我们可以通过改变我们的logback.xml配置,设置所需的日志级别。

7. Caching Responses

7.缓存响应

WSClient also supports the caching of responses.

WSClient也支持响应的缓存。

This feature is particularly useful when the same request is triggered multiple times and we don’t need the freshest data every time.

当同一个请求被多次触发,而我们并不是每次都需要最新鲜的数据时,这个功能就特别有用。

It also helps when the service we’re calling is temporarily down.

当我们所呼叫的服务暂时中断时,它也有帮助。

7.1. Add Caching Dependencies

7.1.添加缓存的依赖性

To configure caching we need first to add the dependency in our build.sbt:

为了配置缓存,我们首先需要在我们的build.sbt中添加依赖性。

libraryDependencies += ehcache

This configures Ehcache as our caching layer.

这将配置Ehcache作为我们的缓存层。

If we don’t want Ehcache specifically, we can use any other JSR-107 cache implementation.

如果我们不特别想要Ehcache,我们可以使用任何其他JSR-107缓存实现。

7.2. Force Caching Heuristic

7.2.强制缓存启发式

By default, Play WS won’t cache HTTP responses if the server doesn’t return any caching configuration.

默认情况下,如果服务器没有返回任何缓存配置,Play WS就不会缓存HTTP响应。

To circumvent this, we can force the heuristic caching by adding a setting to our application.conf:

为了规避这个问题,我们可以通过在application.conf中添加一个设置来强制进行启发式缓存。

play.ws.cache.heuristics.enabled=true

This will configure the system to decide when it’s useful to cache an HTTP response, regardless of the remote service’s advertised caching.

这将配置系统来决定什么时候缓存HTTP响应是有用的,而不考虑远程服务所宣传的缓存。

8. Additional Tuning

8.额外调校

Making requests to an external service may require some client configuration. We may need to handle redirects, a slow server, or some filtering depending on the user-agent header.

向外部服务发出请求可能需要一些客户端配置。我们可能需要处理重定向、缓慢的服务器,或根据用户-代理头进行一些过滤。

To address that, we can tune our WS client, using properties in our application.conf:

为了解决这个问题,我们可以使用application.conf中的属性调整我们的WS客户端:

play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# time to wait for the connection to be established
play.ws.timeout.connection=30
# time to wait for data after the connection is open
play.ws.timeout.idle=30
# max time available to complete the request
play.ws.timeout.request=300

It’s also possible to configure the underlying AsyncHttpClient directly.

也可以直接配置底层的AsyncHttpClient

The full list of available properties can be checked in the source code of AhcConfig.

可用属性的完整列表可以在AhcConfig的源代码中查看。

9. Conclusion

9.结语

In this article, we explored the Play WS library and its main features. We configured our project, learned how to fire common requests and to process their response, both synchronously and asynchronously.

在这篇文章中,我们探索了Play WS库和它的主要功能。我们配置了我们的项目,学习了如何发射常见的请求,以及如何同步和异步地处理它们的响应。

We worked with large data downloads and saw how to cut short long-running activities.

我们处理了大量的数据下载,看到了如何缩短长期运行的活动。

Finally, we looked at caching to improve performance, and how to tune the client.

最后,我们研究了缓存以提高性能,以及如何调整客户端。

As always, the source code for this tutorial is available over on GitHub.

一如既往,本教程的源代码可在GitHub上获得