Reactive WebSockets with Spring 5 – 使用Spring 5的反应式WebSockets

最后修改: 2018年 1月 31日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’re going to create a quick example using the new Spring 5 WebSockets API along with reactive features provided by Spring WebFlux.

在这篇文章中,我们将使用新的Spring 5 WebSockets API以及Spring WebFlux提供的反应式功能创建一个快速的例子。

WebSocket is a well-known protocol that enables full-duplex communication between client and server, generally used in web applications where the client and server need to exchange events at high frequency and with low latency.

WebSocket是一个著名的协议,能够实现客户和服务器之间的全双工通信,一般用于客户和服务器需要以高频率和低延迟交换事件的网络应用。

Spring Framework 5 has modernized WebSockets support in the framework, adding reactive capabilities to this communication channel.

Spring Framework 5在框架中对WebSockets的支持进行了现代化处理,为这一通信渠道增加了反应性功能。

We can find more on Spring WebFlux here.

我们可以在Spring WebFlux这里找到更多信息。

2. Maven Dependencies

2.Maven的依赖性

We’re going to use the spring-boot-starters dependencies for spring-boot-integration and spring-boot-starter-webflux, currently available at Spring Milestone Repository.

我们将使用spring-boot-starters的spring-boot-integrationspring-boot-starter-webflux依赖项,目前可在Spring Milestone Repository获得。

In this example, we’re using the latest available version, 2.0.0.M7, but one should always get the latest version available in the Maven repository:

在本例中,我们使用的是最新的可用版本,即2.0.0.M7,但人们应该始终获取Maven仓库中的最新版本。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3. WebSocket Configuration in Spring

3.Spring中的WebSocket配置

Our configuration is pretty straightforward: We’ll inject the WebSocketHandler to handle the socket session in our Spring WebSocket application.

我们的配置非常简单明了。我们将注入WebSocketHandler来处理Spring WebSocket应用程序中的套接字会话。

@Autowired
private WebSocketHandler webSocketHandler;

Furthermore, let’s create a HandlerMapping bean-annotated method that will be responsible for the mapping between requests and handler objects:

此外,让我们创建一个HandlerMapping bean-annotated方法,它将负责请求和处理程序对象之间的映射。

@Bean
public HandlerMapping webSocketHandlerMapping() {
    Map<String, WebSocketHandler> map = new HashMap<>();
    map.put("/event-emitter", webSocketHandler);

    SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
    handlerMapping.setOrder(1);
    handlerMapping.setUrlMap(map);
    return handlerMapping;
}

The URL we can connect to will be: ws://localhost:<port>/event-emitter.

我们可以连接的URL将是。ws://localhost:<port>/event-emitter.

4. WebSocket Message Handling in Spring

4.Spring中的WebSocket消息处理

Our ReactiveWebSocketHandler class will be responsible for managing the WebSocket session on the server-side.

我们的ReactiveWebSocketHandler类将负责管理服务器侧的WebSocket会话。

It implements the WebSocketHandler interface so we can override the handle method, which will be used to send the message to the WebSocket client:

它实现了WebSocketHandler接口,因此我们可以覆盖handle方法,该方法将被用于将消息发送到WebSocket客户端。

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
    
    // private fields ...

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        return webSocketSession.send(intervalFlux
          .map(webSocketSession::textMessage))
          .and(webSocketSession.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .log());
    }
}

5. Creating a Simple Reactive WebSocket Client

5.创建一个简单的反应式WebSocket客户端

Let’s now create a Spring Reactive WebSocket client that will be able to connect and exchange information with our WebSocket server.

现在让我们创建一个Spring Reactive WebSocket客户端,它将能够连接并与我们的WebSocket服务器交换信息。

5.1. Maven Dependency

5.1.Maven的依赖性

First, the Maven dependencies.

首先是Maven的依赖性。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Here we’re using the same spring-boot-starter-webflux used previously to set up our reactive WebSocket server application.

在这里,我们使用之前用来设置我们的反应式WebSocket服务器应用程序的spring-boot-starter-webflux。

5.2. WebSocket Client

5.2 WebSocket客户端

Now, the let’s create the ReactiveClientWebSocket class, responsible for starting the communication with the server:

现在,让我们创建ReactiveClientWebSocket类,负责启动与服务器的通信。

public class ReactiveJavaClientWebSocket {
 
    public static void main(String[] args) throws InterruptedException {
 
        WebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(
          URI.create("ws://localhost:8080/event-emitter"), 
          session -> session.send(
            Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
            .thenMany(session.receive()
              .map(WebSocketMessage::getPayloadAsText)
              .log())
            .then())
            .block(Duration.ofSeconds(10L));
    }
}

In the code above we can see that we’re using the ReactorNettyWebSocketClient, which is the WebSocketClient implementation for use with Reactor Netty.

在上面的代码中,我们可以看到我们正在使用ReactorNettyWebSocketClient,它是用于Reactor Netty的WebSocketClient实现。

Additionally, the client connects to the WebSocket server through the URL ws://localhost:8080/event-emitter, establishing a session as soon as it is connected to the server.

此外,客户端通过URL ws://localhost:8080/event-emitter连接到WebSocket服务器,一旦连接到服务器就会建立一个会话。

We can also see that we are sending a message to the server (“event-spring-reactive-client-websocket“) along with the connection request.

我们还可以看到,我们正在向服务器发送一条消息(”event-spring-reactive-client-websocket“)以及连接请求。

Furthermore, the method send is invoked, expecting as a parameter a variable of type Publisher<T>, which in our case our Publisher<T> is Mono<T> and T is a simple String “event-me-from-reactive-java-client-websocket“.

此外,方法send被调用,期待一个Publisher<T>类型的变量作为参数,在我们的案例中,我们的Publisher<T>Mono<T>T是一个简单的字符串”event-me-from-reactive-java-client-websocket“。

Moreover, the thenMany(…) method expecting a Flux of type String is invoked. The receive() method gets the flux of incoming messages, which later are converted into strings.

此外,期待Flux类型StringthenMany(…) 方法被调用。receive()方法获得传入消息的通量,随后将其转换为字符串。

Finally, the block() method forces the client to disconnect from the server after the given time (10 seconds in our example).

最后,block() 方法强制客户端在给定的时间(在我们的例子中为10秒)后断开与服务器的连接。

5.3. Starting the Client

5.3.启动客户端

To run it, make sure the Reactive WebSocket Server is up and running. Then, launch the ReactiveJavaClientWebSocket class, and we can see on the sysout log the events being emitted:

要运行它,确保Reactive WebSocket服务器已经启动并运行。然后,启动ReactiveJavaClientWebSocket类,我们可以在sysout日志中看到正在发射的事件。

[reactor-http-nio-4] INFO reactor.Flux.Map.1 - 
onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",
"eventDt":"2018-01-11T23:29:26.900"})

We also can see in the log from our Reactive WebSocket server the message sent by the client during the connection attempt:

我们还可以从我们的Reactive WebSocket服务器的日志中看到客户端在连接尝试期间发送的消息。

[reactor-http-nio-2] reactor.Flux.Map.1: 
onNext(event-me-from-reactive-java-client)

Also, we can see the message of terminated connection after the client finished its requests (in our case, after the 10 seconds):

另外,我们可以看到在客户端完成其请求后(在我们的例子中,在10秒后),连接终止的消息。

[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()

6. Creating a Browser WebSocket Client

6.创建一个浏览器WebSocket客户端

Let’s create a simple HTML/Javascript client WebSocket to consume our reactive WebSocket server application.

让我们创建一个简单的HTML/Javascript客户端WebSocket,以消费我们的反应式WebSocket服务器应用程序。

<div class="events"></div>
<script>
    var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
    clientWebSocket.onopen = function() {
        console.log("clientWebSocket.onopen", clientWebSocket);
        console.log("clientWebSocket.readyState", "websocketstatus");
        clientWebSocket.send("event-me-from-browser");
    }
    clientWebSocket.onclose = function(error) {
        console.log("clientWebSocket.onclose", clientWebSocket, error);
        events("Closing connection");
    }
    clientWebSocket.onerror = function(error) {
        console.log("clientWebSocket.onerror", clientWebSocket, error);
        events("An error occured");
    }
    clientWebSocket.onmessage = function(error) {
        console.log("clientWebSocket.onmessage", clientWebSocket, error);
        events(error.data);
    }
    function events(responseEvent) {
        document.querySelector(".events").innerHTML += responseEvent + "<br>";
    }
</script>

With the WebSocket server running, opening this HTML file in a browser (e.g.: Chrome, Internet Explorer, Mozilla Firefox etc.), we should see the events being printed on the screen, with a delay of 1 second per event, as defined in our WebSocket server.

随着WebSocket服务器的运行,在浏览器中打开这个HTML文件(例如:Chrome、Internet Explorer、Mozilla Firefox等),我们应该看到事件被打印在屏幕上,每个事件的延迟时间为1秒,这是我们的WebSocket服务器中定义的。

{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2018-01-11T23:56:09.780"}
{"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2018-01-11T23:56:09.781"}
{"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2018-01-11T23:56:09.782"}

7. Conclusion

7.结论

Here we’ve presented an example of how to create a WebSocket communication between server and client by using Spring 5 Framework, implementing the new reactive features provided by Spring Webflux.

这里我们介绍了一个例子,说明如何通过使用Spring 5框架在服务器和客户端之间创建WebSocket通信,实现Spring Webflux提供的新的反应式功能。

As always, the full example can be found in our GitHub repository.

一如既往,完整的例子可以在我们的GitHub 仓库中找到。