Introduction to Spring Cloud Stream

Last modified: November 18, 2017

1. Overview

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that helps in creating event-driven or message-driven microservices.

In this article, we’ll introduce concepts and constructs of Spring Cloud Stream with some simple examples.

2. Maven Dependencies

To get started, we'll need to add the Spring Cloud Starter Stream with the RabbitMQ broker Maven dependency as messaging middleware to our pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>3.1.3</version>
</dependency>

We'll also add the spring-cloud-stream-test-support module dependency from Maven Central to enable JUnit testing support:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <version>3.1.3</version>
    <scope>test</scope>
</dependency>

3. Main Concepts

Microservices architecture follows the “smart endpoints and dumb pipes” principle. Communication between endpoints is driven by messaging-middleware parties like RabbitMQ or Apache Kafka. Services communicate by publishing domain events via these endpoints or channels.

Let’s walk through the concepts that make up the Spring Cloud Stream framework, along with the essential paradigms that we must be aware of to build message-driven services.

3.1. Constructs

Let’s look at a simple service in Spring Cloud Stream that listens to input binding and sends a response to the output binding:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

The annotation @EnableBinding configures the application to bind the channels INPUT and OUTPUT defined within the interface Processor. Both channels are bindings that can be configured to use a concrete messaging-middleware or binder.

Let’s take a look at the definition of all these concepts:

  • Bindings — a collection of interfaces that identify the input and output channels declaratively
  • Binder — messaging-middleware implementation such as Kafka or RabbitMQ
  • Channel — represents the communication pipe between messaging-middleware and the application
  • StreamListeners — message-handling methods in beans that will be automatically invoked on a message from the channel after the MessageConverter does the serialization/deserialization between middleware-specific events and domain object types / POJOs
  • Message Schemas — used for serialization and deserialization of messages, these schemas can be statically read from a location or loaded dynamically, supporting the evolution of domain object types

3.2. Communication Patterns

Messages designated to destinations are delivered by the Publish-Subscribe messaging pattern. Publishers categorize messages into topics, each identified by a name. Subscribers express interest in one or more topics. The middleware filters the messages, delivering those on the topics of interest to the subscribers.

Now, the subscribers could be grouped. A consumer group is a set of subscribers or consumers, identified by a group id, within which messages from a topic or a topic's partition are delivered in a load-balanced manner.

4. Programming Model

This section describes the basics of building Spring Cloud Stream applications.

4.1. Functional Testing

The test support is a binder implementation that allows interacting with the channels and inspecting messages.

Let’s send a message to the above enrichLogMessage service and check whether the response contains the text “[1]: “ at the beginning of the message:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
          .build());

        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}

4.2. Custom Channels

In the above example, we used the Processor interface provided by Spring Cloud, which has only one input and one output channel.

If we need something different, like one input and two output channels, we can create a custom processor:

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

Spring will provide the proper implementation of this interface for us. The channel names can be set using annotations like in @Output("myOutput").

Otherwise, Spring will use the method names as the channel names. Therefore, we’ve got three channels called myInput, myOutput, and anotherOutput.

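
As a sketch, those three channels could then be bound to concrete destinations in application.yml; the destination names below are illustrative, not part of the original example, and the interface is activated by listing it in @EnableBinding(MyProcessor.class):

```yaml
spring:
  cloud:
    stream:
      bindings:
        myInput:
          destination: queue.values          # feeds the custom input channel
        myOutput:
          destination: queue.values.low      # first output channel
        anotherOutput:
          destination: queue.values.high     # second output channel
```
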
Now, let's imagine we want to route the messages to one output if the value is less than 10 and into another output if the value is greater than or equal to 10:

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
}

4.3. Conditional Dispatching

Using the @StreamListener annotation, we also can filter the messages we expect in the consumer using any condition that we define with SpEL expressions.

As an example, we could use conditional dispatching as another approach to route messages into different outputs:

@Autowired
private MyProcessor processor;

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

The only limitation of this approach is that these methods must not return a value.

5. Setup

Let’s set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

We can configure our application to use the default binder implementation via META-INF/spring.binders:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Or we can add the binder library for RabbitMQ to the classpath by including this dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    <version>3.1.3</version>
</dependency>

If no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange queue.pretty.log.messages. Both bindings will use the binder called local_rabbit.

Note that we don’t need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange queue.log.messages, we need to enter the request in JSON format.

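
Alternatively, assuming a local broker with the default guest credentials and the default / virtual host, the same message can be published through RabbitMQ's management HTTP API; the payload below is an illustrative LogMessage in JSON:

```shell
curl -u guest:guest \
  -H "content-type: application/json" \
  -X POST http://localhost:15672/api/exchanges/%2F/queue.log.messages/publish \
  -d '{"properties": {}, "routing_key": "", "payload": "{\"message\": \"This is my message\"}", "payload_encoding": "string"}'
```

This requires a running RabbitMQ instance with the management plugin enabled, so credentials and the host/port may differ in your setup.
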
5.3. Customizing Message Conversion

Spring Cloud Stream allows us to apply message conversion for specific content types. In the above example, instead of using JSON format, we want to provide plain text.

To do this, we'll apply a custom transformation to LogMessage using a MessageConverter:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    //...

    @Bean
    public MessageConverter providesTextPlainMessageConverter() {
        return new TextPlainMessageConverter();
    }

    //...
}

public class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, 
        Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String 
          ? (String) payload 
          : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

After applying these changes, going back to the Publish Message panel, if we set the header contentType to "text/plain" and the payload to "Hello World", it should work as before.

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings.<CHANNEL>.group property to specify a group name:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
          group: logMessageConsumers
          ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it’s important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we’ve deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

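
For example, the two instances could be started like this (the jar name is illustrative):

```shell
java -jar my-logger-service.jar \
  --spring.cloud.stream.instanceCount=2 --spring.cloud.stream.instanceIndex=0

java -jar my-logger-service.jar \
  --spring.cloud.stream.instanceCount=2 --spring.cloud.stream.instanceIndex=1
```
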
These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be partitioned messages. This helps when we're scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let’s say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups
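
Putting those two properties together, a sketch of the producer configuration for the A-M / N-Z split could look like this (the SpEL expression is illustrative and assumes the payload exposes a message property):

```yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            # illustrative: key 0 covers A-M, key 1 covers N-Z
            partitionKeyExpression: "payload.message.toUpperCase().charAt(0) <= 'M' ? 0 : 1"
            partitionCount: 2
```
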

Sometimes the partitioning expression is too complex to write in a single line. For these cases, we can write a custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

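
Such a strategy implements Spring Cloud Stream's PartitionKeyExtractorStrategy interface; the key computation itself can be sketched in plain Java (the class and method names here are our own, not from the original example):

```java
public class FirstLetterPartitioner {

    // Computes a partition key from the first letter of a log message:
    // 0 for messages starting with A-M, 1 for N-Z.
    // Empty messages or non-letter first characters fall back to key 0.
    public static int keyFor(String message) {
        if (message == null || message.isEmpty()) {
            return 0;
        }
        char first = Character.toUpperCase(message.charAt(0));
        return (first >= 'N' && first <= 'Z') ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println(keyFor("Alpha"));    // partition key 0
        System.out.println(keyFor("November")); // partition key 1
    }
}
```
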
6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at http://<host>:<port>/health.

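
A sketch of the relevant configuration (the endpoint exposure setting assumes Spring Boot 2.x's actuator defaults):

```yaml
management:
  health:
    binders:
      enabled: true       # include binder health in the health endpoint
  endpoints:
    web:
      exposure:
        include: health
```
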
7. Conclusion

In this tutorial, we presented the main concepts of Spring Cloud Stream and showed how to use it through some simple examples over RabbitMQ. More info about Spring Cloud Stream can be found here.

The source code for this article can be found over on GitHub.
