WebSockets with the Play Framework and Akka – 使用Play框架和Akka的WebSockets

最后修改: 2019年 11月 23日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

When we wish our web clients to maintain a dialogue with our server, then WebSockets can be a useful solution. WebSockets keep a persistent full-duplex connection. This gives us the capability to send bi-directional messages between our server and client. 

当我们希望我们的网络客户端与我们的服务器保持对话时,那么WebSockets可以是一个有用的解决方案。WebSockets保持一个持久的全双工连接。这使我们有能力在服务器和客户端之间发送双向消息。

In this tutorial, we’re going to learn how to use WebSockets with Akka in the Play Framework.

在本教程中,我们将学习如何在Akka中使用WebSockets与Play框架

2. Setup

2.设置

Let’s set up a simple chat application. The user will send messages to the server, and the server will respond with a message from JSONPlaceholder.

让我们来设置一个简单的聊天应用程序。用户将向服务器发送消息,而服务器将以JSONPlaceholder的消息作为回应。

2.1. Setting up the Play Framework Application

2.1.设置游戏框架应用程序

We’ll build this application using the Play Framework.

我们将使用Play框架构建这个应用程序。

Let’s follow the instructions from Introduction to Play in Java to set up and run a simple Play Framework application.

让我们按照Introduction to Play in Java的说明来设置和运行一个简单的 Play 框架应用程序。

2.2. Adding the Necessary JavaScript Files

2.2.添加必要的JavaScript文件

Also, we’ll need to work with JavaScript for client-side scripting. This will enable us to receive new messages pushed from the server. We’ll use the jQuery library for this.

此外,我们还需要用JavaScript来进行客户端脚本工作。这将使我们能够接收从服务器上推送的新消息。我们将为此使用jQuery库。

Let’s add jQuery to the bottom of the app/views/index.scala.html file:

让我们在app/views/index.scala.html文件的底部添加jQuery。

<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>

2.3. Setting up Akka

2.3.设置Akka

Finally, we’ll use Akka to handle the WebSocket connections on the server-side.

最后,我们将使用Akka来处理服务器端的WebSocket连接。

Let’s navigate to the build.sbt file and add the dependencies.

让我们导航到build.sbt文件并添加依赖关系。

We need to add the akka-actor and akka-testkit dependencies:

我们需要添加akka-actorakka-testkit依赖项。

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

We need these to be able to use and test the Akka Framework code.

我们需要这些来使用和测试Akka框架的代码。

Next, we’re going to be using Akka streams. So let’s add the akka-stream dependency:

接下来,我们将使用Akka流。所以让我们添加akka-stream依赖性。

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Lastly, we need to call a rest endpoint from an Akka actor. For this, we’ll need the akka-http dependency. When we do so, the endpoint will return JSON data which we’ll have to deserialize, so we need to add the akka-http-jackson dependency as well:

最后,我们需要从Akka actor中调用一个rest端点。为此,我们需要akka-http依赖。当我们这样做时,端点将返回JSON数据,我们必须对其进行反序列化,所以我们也需要添加akka-http-jackson依赖。

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

And now we’re all set. Let’s see how to get WebSockets working!

现在我们都准备好了。让我们来看看如何让WebSockets工作吧

3. Handling WebSockets With Akka Actors

3.用Akka行为体处理WebSockets

Play’s WebSocket handling mechanism is built around Akka streams. A WebSocket is modeled as a Flow. So, incoming WebSocket messages are fed into the flow, and messages produced by the flow are sent out to the client.

Play的WebSocket处理机制是围绕Akka流构建的。一个WebSocket被建模为一个流。因此,传入的WebSocket消息被送入流中,而由流产生的消息则被发送至客户端。

To handle a WebSocket using an Actor, we’re going to need the Play utility ActorFlow which converts an ActorRef to a flow. This mainly requires some Java code, with a little configuration.

要使用Actor处理WebSocket,我们需要Play工具ActorFlow,它可以将ActorRef转换为flow。这主要需要一些Java代码,以及一些配置。

3.1. The WebSocket Controller Method

3.1.WebSocket控制器方法

First, we need a Materializer instance. The Materializer is a factory for stream execution engines.

首先,我们需要一个Materializer实例。Materializer是一个流执行引擎的工厂。

We need to inject the ActorSystem and the Materializer into the controller app/controllers/HomeController.java:

我们需要将ActorSystemMaterializer注入到控制器app/controllers/HomeController.java

private ActorSystem actorSystem;
private Materializer materializer;

@Inject
public HomeController(
  ActorSystem actorSystem, Materializer materializer) {
    this.actorSystem = actorSystem;
    this.materializer = materializer;
}

Let’s now add a socket controller method:

现在让我们添加一个套接字控制器方法。

public WebSocket socket() {
    return WebSocket.Json
      .acceptOrResult(this::createActorFlow);
}

Here we’re calling the function acceptOrResult that takes the request header and returns a future. The returned future is a flow to handle the WebSocket messages.

在这里,我们正在调用函数acceptOrResult,该函数接收请求头并返回一个未来。返回的未来是一个处理WebSocket消息的流程。

We can, instead, reject the request and return a rejection result.

相反,我们可以拒绝该请求并返回一个拒绝结果。

Now, let’s create the flow:

现在,让我们来创建流程。

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      F.Either.Right(createFlowForActor()));
}

The class in Play Framework defines a set of functional programming style helpers. In this case, we are using F.Either.Right to accept the connection and return the flow.

Play Framework中的F类定义了一组函数式编程风格的帮助器。在本例中,我们使用F.Either.Right来接受连接并返回流量。

Let’s say we wanted to reject the connection when the client is not authenticated.

假设我们想在客户端没有经过认证时拒绝连接。

For this, we could check if a username is set in the session. And if it’s not, we decline the connection with HTTP 403 Forbidden:

为此,我们可以检查会话中是否设置了一个用户名。如果没有,我们就用HTTP 403 Forbidden拒绝该连接。

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow2(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      request.session()
      .getOptional("username")
      .map(username -> 
        F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(
          createFlowForActor()))
      .orElseGet(() -> F.Either.Left(forbidden())));
}

We use F.Either.Left to reject the connection in the same way as we provide a flow with F.Either.Right.

我们使用F.Either.Left来拒绝连接,就像我们用F.Either.Right提供一个流一样。

Finally, we link the flow to the actor that will handle the messages:

最后,我们将流程与处理信息的行为者联系起来。

private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
    return ActorFlow.actorRef(out -> Messenger.props(out), 
      actorSystem, materializer);
}

The ActorFlow.actorRef creates a flow that is handled by the Messenger actor.

ActorFlow.actorRef创建一个流程,由Messenger角色处理

3.2. The routes File

3.2.routes文件

Now, let’s add the routes definitions for the controller methods in conf/routes:

现在,让我们在conf/routes中为控制器方法添加routes定义。

GET  /                    controllers.HomeController.index(request: Request)
GET  /chat                controllers.HomeController.socket
GET  /chat/with/streams   controllers.HomeController.akkaStreamsSocket
GET  /assets/*file        controllers.Assets.versioned(path="/public", file: Asset)

These route definitions map incoming HTTP requests to controller action methods as explained in Routing in Play Applications in Java.

这些路由定义将传入的 HTTP 请求映射到控制器的操作方法,正如在Java 中 Play 应用程序的路由中所解释的那样。

3.3. The Actor Implementation

3.3.行为者的实施

The most important part of the actor class is the createReceive method which determines which messages the actor can handle:

actor类中最重要的部分是createReceivemethod,它决定了actor可以处理哪些消息。

@Override
public Receive createReceive() {
    return receiveBuilder()
      .match(JsonNode.class, this::onSendMessage)
      .matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
      .build();
}

The actor will forward all messages matching the JsonNode class to the onSendMessage handler method:

行为体将把所有与JsonNode类相匹配的消息转发给onSendMessagehandler方法。

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    //..
    processMessage(requestDTO);
}

Then the handler will respond to every message using the processMessage method:

然后处理程序将使用processMessage方法对每个消息作出回应。

private void processMessage(RequestDTO requestDTO) {
    CompletionStage<HttpResponse> responseFuture = getRandomMessage();
    responseFuture.thenCompose(this::consumeHttpResponse)
      .thenAccept(messageDTO ->
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}

3.4. Consuming Rest API with Akka HTTP

3.4.用Akka HTTP消耗Rest API

We’ll send HTTP requests to the dummy message generator at JSONPlaceholder Posts. When the response arrives, we send the response to the client by writing it out.

我们将向位于JSONPlaceholder Posts的假消息生成器发送HTTP请求。当响应到达时,我们通过写出out来将响应发送给客户端。

Let’s have a method that calls the endpoint with a random post id:

让我们有一个方法,用一个随机的帖子ID调用端点。

private CompletionStage<HttpResponse> getRandomMessage() {
    int postId = ThreadLocalRandom.current().nextInt(0, 100);
    return Http.get(getContext().getSystem())
      .singleRequest(HttpRequest.create(
        "https://jsonplaceholder.typicode.com/posts/" + postId));
}

We’re also processing the HttpResponse we get from calling the service in order to get the JSON response:

我们也在处理我们从调用服务得到的HttpResponse,以便得到JSON响应。

private CompletionStage<MessageDTO> consumeHttpResponse(
  HttpResponse httpResponse) {
    Materializer materializer = 
      Materializer.matFromSystem(getContext().getSystem());
    return Jackson.unmarshaller(MessageDTO.class)
      .unmarshal(httpResponse.entity(), materializer)
      .thenApply(messageDTO -> {
          log.info("Received message: {}", messageDTO);
          discardEntity(httpResponse, materializer);
          return messageDTO;
      });
}

The MessageConverter class is a utility for converting between JsonNode and the DTOs:

MessageConverter类是一个用于在JsonNode和DTO之间转换的工具。

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.convertValue(jsonNode, MessageDTO.class);
}

Next, we need to discard the entity.  The discardEntityBytes convenience method serves the purpose of easily discarding the entity if it has no purpose for us.

接下来,我们需要丢弃该实体discardEntityBytes便利方法的作用是,如果实体对我们没有用处,我们可以轻松地抛弃它。

Let’s see how to discard the bytes:

让我们看看如何丢弃这些字节。

private void discardEntity(
  HttpResponse httpResponse, Materializer materializer) {
    HttpMessage.DiscardedEntity discarded = 
      httpResponse.discardEntityBytes(materializer);
    discarded.completionStage()
      .whenComplete((done, ex) -> 
        log.info("Entity discarded completely!"));
}

Now having done the handling of the WebSocket, let’s see how we can set up a client for this using HTML5 WebSockets.

现在已经完成了对WebSocket的处理,让我们看看如何使用HTML5 WebSockets为其设置一个客户端。

4. Setting up the WebSocket Client

4.设置WebSocket客户端

For our client, let’s build a simple web-based chat application.

对于我们的客户,让我们建立一个简单的基于网络的聊天应用程序。

4.1. The Controller Action

4.1.控制器的行动

We need to define a controller action that renders the index page. We’ll put this in the controller class app.controllers.HomeController:

我们需要定义一个控制器动作来渲染索引页面。我们将把它放在控制器类app.controllers.HomeController中。

public Result index(Http.Request request) {
    String url = routes.HomeController.socket()
      .webSocketURL(request);
    return ok(views.html.index.render(url));
}

4.2. The Template Page

4.2.模板页面

Now, let’s head over to the app/views/ndex.scala.html page and add a container for the received messages and a form to capture a new message:

现在,让我们前往app/views/ndex.scala.html页,为收到的消息添加一个容器,并添加一个表单来捕获新的消息。

<div id="messageContent"></div>F
<form>
    <textarea id="messageInput"></textarea>
    <button id="sendButton">Send</button>
</form>

We’ll also need to pass in the URL for the WebSocket controller action by declaring this parameter at the top of the app/views/index.scala.html page:

我们还需要在app/views/index.scala.html页面的顶部声明此参数,以传递WebSocket控制器操作的URL。

@(url: String)

4.3. WebSocket Event Handlers in JavaScript

4.3.JavaScript中的WebSocket事件处理程序

And now, we can add the JavaScript to handle the WebSocket events. For simplicity, we’ll add the JavaScript functions at the bottom of the app/views/index.scala.html page.

现在,我们可以添加JavaScript来处理WebSocket事件。为简单起见,我们将在app/views/index.scala.html页面的底部添加JavaScript函数。

Let’s declare the event handlers:

我们来声明事件处理程序。

var webSocket;
var messageInput;

function init() {
    initWebSocket();
}

function initWebSocket() {
    webSocket = new WebSocket("@url");
    webSocket.onopen = onOpen;
    webSocket.onclose = onClose;
    webSocket.onmessage = onMessage;
    webSocket.onerror = onError;
}

Let’s add the handlers themselves:

让我们来添加处理程序本身。

function onOpen(evt) {
    writeToScreen("CONNECTED");
}

function onClose(evt) {
    writeToScreen("DISCONNECTED");
}

function onError(evt) {
    writeToScreen("ERROR: " + JSON.stringify(evt));
}

function onMessage(evt) {
    var receivedData = JSON.parse(evt.data);
    appendMessageToView("Server", receivedData.body);
}

Then, to present the output, we’ll use the functions appendMessageToView and writeToScreen:

然后,为了呈现输出,我们将使用函数appendMessageToViewwriteToScreen

function appendMessageToView(title, message) {
    $("#messageContent").append("<p>" + title + ": " + message + "</p>");
}

function writeToScreen(message) {
    console.log("New message: ", message);
}

4.4. Running and Testing the Application

4.4.运行和测试应用程序

We’re ready to test the application, so let’s run it:

我们已经准备好测试这个应用程序,所以让我们运行它。

cd websockets
sbt run

With the application running, we can chat with the server by visiting http://localhost:9000:

随着应用程序的运行,我们可以通过访问http://localhost:9000与服务器聊天。

Every time we type a message and hit Send the server will immediately respond with some lorem ipsum from the JSON Placeholder service.

每次我们输入信息并点击Send,服务器将立即从JSON Placeholder服务中回复一些lorem ipsum

5. Handling WebSockets Directly with Akka Streams

5.用Akka流直接处理WebSockets

If we are processing a stream of events from a source and sending these to the client, then we can model this around Akka streams.

如果我们要处理来自源头的事件流,并将其发送到客户端,那么我们可以围绕Akka流来建模。

Let’s see how we can use Akka streams in an example where the server sends messages every two seconds.

让我们看看我们如何在服务器每两秒发送一次消息的例子中使用Akka流。

We’ll start with the WebSocket action in the HomeController:

我们将从HomeController中的WebSocket动作开始。

public WebSocket akkaStreamsSocket() {
    return WebSocket.Json.accept(request -> {
        Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
        MessageDTO messageDTO = 
          new MessageDTO("1", "1", "Title", "Test Body");
        Source<JsonNode, ?> out = Source.tick(
          Duration.ofSeconds(2),
          Duration.ofSeconds(2),
          MessageConverter.messageToJsonNode(messageDTO)
        );
        return Flow.fromSinkAndSource(in, out);
    });
}

The Source#tick method takes three parameters. The first is the initial delay before the first tick is processed, and the second is the interval between successive ticks. We’ve set both values to two seconds in the above snippet. The third parameter is an object that should be returned on each tick.

Source#tick方法需要三个参数。第一个是处理第一个tick前的初始延迟,第二个是连续tick的间隔时间。在上面的片段中,我们将这两个值都设置为2秒。第三个参数是一个对象,应该在每个刻度上返回。

To see this in action, we need to modify the URL in the index action and make it point to the akkaStreamsSocket endpoint:

为了看到这个动作,我们需要修改index 动作中的URL,使其指向akkaStreamsSocket 端点。

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

And now refreshing the page, we’ll see a new entry every two seconds:

而现在刷新页面,我们会看到每两秒就有一个新条目。

6. Terminating the Actor

6.终止行为人

At some point, we’ll need to shut down the chat, either through a user request or through a timeout.

在某些时候,我们需要关闭聊天,要么通过用户请求,要么通过超时关闭。

6.1. Handling Actor Termination

6.1.处理行动者的终止

How do we detect when a WebSocket has been closed?

我们如何检测WebSocket何时被关闭?

Play will automatically close the WebSocket when the actor that handles the WebSocket terminates. So we can handle this scenario by implementing the Actor#postStop method:

当处理WebSocket的角色终止时,Play将自动关闭WebSocket。因此,我们可以通过实现Actor#postStop方法来处理这种情况。

@Override
public void postStop() throws Exception {
    log.info("Messenger actor stopped at {}",
      OffsetDateTime.now()
      .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}

6.2. Manually Terminating the Actor

6.2.手动终止行为者

Further, if we must stop the actor, we can send a PoisonPill to the actor. In our example application, we should be able to handle a “stop” request.

此外,如果我们必须停止这个角色,我们可以向这个角色发送一个PoisonPill。在我们的示例应用程序中,我们应该能够处理一个 “停止 “请求。

Let’s see how to do this in the onSendMessage method:

让我们看看如何在onSendMessage方法中这样做。

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    if("stop".equals(message)) {
        MessageDTO messageDTO = 
          createMessageDTO("1", "1", "Stop", "Stopping actor");
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
        self().tell(PoisonPill.getInstance(), getSelf());
    } else {
        log.info("Actor received. {}", requestDTO);
        processMessage(requestDTO);
    }
}

When we receive a message, we check if it’s a stop request. If it is, we send the PoisonPill. Otherwise, we process the request.

当我们收到一个消息时,我们检查它是否是一个停止请求。如果是,我们就发送PoisonPill。否则,我们就处理这个请求。

7. Configuration Options

7.配置选项

We can configure several options in terms of how the WebSocket should be handled. Let’s look at a few.

在如何处理WebSocket的问题上,我们可以配置几个选项。让我们看看其中几个。

7.1. WebSocket Frame Length

7.1.WebSocket帧长度

WebSocket communication involves the exchange of data frames.

WebSocket通信涉及数据帧的交换。

The WebSocket frame length is configurable. We have the option to adjust the frame length to our application requirements.

WebSocket的框架长度是可配置的。我们可以选择根据我们的应用要求来调整帧的长度。

Configuring a shorter frame length may help reduce denial of service attacks that use long data frames. We can change the frame length for the application by specifying the max length in application.conf:

配置较短的帧长度可能有助于减少使用长数据帧的拒绝服务攻击。我们可以通过在application.conf中指定最大长度来改变应用程序的帧长度。

play.server.websocket.frame.maxLength = 64k

We can also set this configuration option by specifying the max length as a command-line parameter:

我们也可以通过指定最大长度作为一个命令行参数来设置这个配置选项。

sbt -Dwebsocket.frame.maxLength=64k run

7.2. Connection Idle Timeout

7.2.连接空闲超时

By default, the actor we use to handle the WebSocket is terminated after one minute. This is because the Play server in which our application is running has a default idle timeout of 60 seconds. This means that all connections that do not receive a request in sixty seconds are closed automatically.

默认情况下,我们用于处理 WebSocket 的角色在一分钟后被终止。这是因为我们的应用程序所运行的Play服务器的默认空闲超时时间为60秒。这意味着所有在60秒内没有收到请求的连接都会自动关闭。

We can change this through configuration options. Let’s head over to our application.conf and change the server to have no idle timeout:

我们可以通过配置选项来改变这一点。让我们去看看我们的application.conf,把服务器改成没有空闲超时。

play.server.http.idleTimeout = "infinite"

Or we can pass in the option as command-line arguments:

或者我们可以将选项作为命令行参数传入。

sbt -Dhttp.idleTimeout=infinite run

We can also configure this by specifying devSettings in build.sbt.

我们也可以通过在build.sbt中指定devSettings来进行配置。

Config options specified in build.sbt are only used in development, they will be ignored in production:

build.sbt中指定的配置选项只在开发中使用,在生产中会被忽略:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

If we re-run the application, the actor won’t terminate.

如果我们重新运行该应用程序,行为者不会终止。

We can change the value to seconds:

我们可以把这个值改为秒。

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

We can find out more about the available configuration options in the Play Framework documentation.

我们可以在 Play Framework 文档中找到更多关于可用配置选项的信息

8. Conclusion

8.结语

In this tutorial, we implemented WebSockets in the Play Framework with Akka actors and Akka Streams.

在本教程中,我们在Play框架中用Akka actors和Akka Streams实现了WebSockets。

We then went on to look at how to use Akka actors directly and then saw how Akka Streams can be set up to handle the WebSocket connection.

我们接着看了如何直接使用Akka actors,然后看到如何设置Akka Streams来处理WebSocket连接。

On the client-side, we used JavaScript to handle our WebSocket events.

在客户端,我们使用JavaScript来处理我们的WebSocket事件。

Finally, we looked at some configuration options that we can use.

最后,我们看了一些我们可以使用的配置选项。

As usual, the source code for this tutorial is available over on GitHub.

像往常一样,本教程的源代码可以在GitHub上获得