Server-Sent Events in Spring – Spring里的服务器发送事件

最后修改: 2018年 9月 8日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll see how we can implement Server-Sent-Events-based APIs with Spring.

在本教程中,我们将看到如何用Spring实现基于Server-Sent-Events的API。

Simply put, Server-Sent-Events, or SSE for short, is an HTTP standard that allows a web application to handle a unidirectional event stream and receive updates whenever server emits data.

简而言之,Server-Sent-Events(简称SSE)是一个HTTP标准,它允许网络应用程序处理一个单向的事件流,并在服务器发出数据的时候接收更新。

Spring 4.2 version already supported it, but starting with Spring 5, we now have a more idiomatic and convenient way to handle it.

Spring 4.2版本已经支持它,但从Spring 5开始,我们现在有一个更成文、更方便的处理方式

2. SSE with Spring 5 Webflux

2.带有Spring 5 Webflux的SSE

To achieve this, we can make use of implementations such as the Flux class provided by the Reactor library, or potentially the ServerSentEvent entity, which gives us control over the events metadata.

为了实现这一点,我们可以利用诸如Reactor库提供的Flux类的实现,或者潜在的ServerSentEvent实体,它让我们对事件元数据进行控制。

2.1. Stream Events Using Flux

2.1.使用Flux的流事件

Flux is a reactive representation of a stream of events – it’s handled differently based on the specified request or response media type.

Flux是事件流的反应式表示 – 它根据指定的请求或响应媒体类型进行不同的处理。

To create an SSE streaming endpoint, we’ll have to follow the W3C specifications and designate its MIME type as text/event-stream:

要创建一个SSE流媒体端点,我们必须遵循W3C规范并将其MIME类型指定为text/event-stream

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> "Flux - " + LocalTime.now().toString());
}

The interval method creates a Flux that emits long values incrementally. Then we map those values to our desired output.

interval方法创建了一个Flux,该方法逐步发出long值。然后我们将这些值映射到我们想要的输出。

Let’s start our application and try it out by browsing the endpoint then.

让我们启动我们的应用程序,然后通过浏览端点来试试。

We’ll see how the browser reacts to the events being pushed second by second by the server. For more information about Flux and the Reactor Core, we can check out this post.

我们将看到浏览器对服务器一秒一秒推送的事件的反应。关于FluxReactor Core的更多信息,我们可以查看这个帖子

2.2. Making Use of the ServerSentEvent Element

2.2.使用ServerSentEvent元素

We’ll now wrap our output String into a ServerSentSevent object, and examine the benefits of doing this:

现在我们将把我们的输出String包装成一个ServerSentSevent对象,并检查这样做的好处。

@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> ServerSentEvent.<String> builder()
        .id(String.valueOf(sequence))
          .event("periodic-event")
          .data("SSE - " + LocalTime.now().toString())
          .build());
}

As we can appreciate, there’re a couple of benefits of using the ServerSentEvent entity:

我们可以理解,使用ServerSentEvent实体有几个好处

  1. we can handle the events metadata, which we’d need in a real case scenario
  2. we can ignore “text/event-stream” media type declaration

In this case, we specified an id, an event name, and, most importantly, the actual data of the event.

在这种情况下,我们指定了一个id,一个event name,以及最重要的,事件的实际data

Also, we could’ve added a comments attribute, and a retry value, which will specify the reconnection time to be used when trying to send the event.

此外,我们还可以添加一个comments属性和一个retry值,这将指定在尝试发送事件时要使用的重新连接时间。

2.3. Consuming the Server-Sent Events with a WebClient

2.3.用WebClient来获取服务器发送的事件

Now let’s consume our event stream with a WebClient.:

现在让我们用一个WebClient来消费我们的事件流。

public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080/sse-server");
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream-sse")
      .retrieve()
      .bodyToFlux(type);

    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}

The subscribe method allows us to indicate how we’ll proceed when we receive an event successfully, when an error occurs, and when the streaming is completed.

subscribe方法允许我们表明,当我们成功收到一个事件时,当发生错误时,以及当流媒体完成时,我们将如何处理。

In our example, we used the retrieve method, which is a simple and straightforward way of getting the response body.

在我们的例子中,我们使用了retrieve方法,这是一种简单直接的获取响应体的方法。

This method automatically throws a WebClientResponseException if we receive a 4xx or 5xx response unless we handle the scenarios adding an onStatus statement.

如果我们收到4xx或5xx响应,这个方法会自动抛出一个WebClientResponseException,除非我们在处理这些情况时加入一个onStatus语句。

On the other hand, we could’ve used the exchange method as well, which provides access to the ClientResponse and also doesn’t error-signal on failing responses.

另一方面,我们也可以使用exchange方法,它提供了对ClientResponse的访问,也不会对失败的响应发出错误信号。

We have to keep into consideration that we can bypass the ServerSentEvent wrapper if we don’t need the event metadata.

我们必须考虑到,如果我们不需要事件的元数据,我们可以绕过ServerSentEvent包装器。

3. SSE Streaming in Spring MVC

3.Spring MVC中的SSE流

As we said, the SSE specification was supported since Spring 4.2, when the SseEmitter class was introduced.

正如我们所说,从Spring 4.2开始就支持SSE规范了,当时引入了SseEmitter类。

In simple terms, we’ll define an ExecutorService, a thread where the SseEmitter will do its work pushing data, and return the emitter instance, keeping the connection open in this manner:

简单地说,我们将定义一个ExecutorService,这是一个线程,SseEmitter将在其中进行推送数据的工作,并返回发射器实例,以这种方式保持连接开放:

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                  .data("SSE MVC - " + LocalTime.now().toString())
                  .id(String.valueOf(i))
                  .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
}

Always make sure to pick the right ExecutorService for your use-case scenario.

始终确保为您的使用场景选择正确的ExecutorService

We can learn more about SSE in Spring MVC and have a look at other examples by reading this interesting tutorial.

我们可以通过阅读这个有趣的教程,了解更多关于Spring MVC中的SSE,并看看其他例子。

4. Understanding Server-Sent Events

4.了解服务器发送的事件

Now that we know how to implement SSE endpoints, let’s try to go a little bit deeper by understanding some underlying concepts.

现在我们知道了如何实现SSE端点,让我们试着通过理解一些基本概念来深入了解一下。

An SSE is a specification adopted by most browsers to allow streaming events unidirectionally at any time.

SSE是大多数浏览器采用的一种规范,允许在任何时候单向地流传事件。

The ‘events’ are just a stream of UTF-8 encoded text data that follow the format defined by the specification.

事件 “只是一个UTF-8编码的文本数据流,遵循规范所定义的格式。

This format consists of a series of key-value elements (id, retry, data and event, which indicates the name) separated by line breaks.

这种格式由一系列的键值元素(id、重试、数据和表示名称的事件)组成,用换行符隔开。

Comments are supported as well.

评论也得到了支持。

The spec doesn’t restrict the data payload format in any way; we can use a simple String or a more complex JSON or XML structure.

该规范没有以任何方式限制数据有效载荷的格式;我们可以使用简单的String或更复杂的JSON或XML结构。

One last point we have to take into consideration is the difference between using SSE streaming and WebSockets.

我们必须考虑的最后一点是使用SSE流和WebSockets之间的区别。

While WebSockets offer full-duplex (bi-directional) communication between the server and the client, while SSE uses uni-directional communication.

虽然WebSockets在服务器和客户端之间提供全双工(双向)通信,而SSE使用单向通信。

Also, WebSockets isn’t an HTTP protocol and, opposite to SSE, it doesn’t offer error-handling standards.

另外,WebSockets并不是一个HTTP协议,与SSE相反,它不提供错误处理标准。

5. Conclusion

5.总结

To sum up, in this article we’ve learned the main concepts of SSE streaming, which is undoubtedly a great resource that will let us create next-generation systems.

总而言之,在这篇文章中,我们已经了解了SSE流的主要概念,这无疑是一个伟大的资源,将让我们创建下一代的系统。

We are now in an excellent position to understand what is happening under the hood when we use this protocol.

我们现在处于一个很好的位置,可以理解当我们使用这个协议时,在引擎盖下发生了什么。

Furthermore, we complemented the theory with some simple examples, which can be found in our Github repository.

此外,我们用一些简单的例子来补充理论,这些例子可以在我们的Github资源库中找到。