Getting Started with Stream Processing with Spring Cloud Data Flow – 使用Spring Cloud Data Flow进行流处理的入门教程

最后修改: 2016年 9月 1日

1. Introduction

1.介绍

Spring Cloud Data Flow is a cloud-native programming and operating model for composable data microservices.

Spring Cloud Data Flow是一个用于可组合数据微服务的云原生编程和操作模型。

With Spring Cloud Data Flow, developers can create and orchestrate data pipelines for common use cases such as data ingest, real-time analytics, and data import/export.

通过Spring Cloud Data Flow,开发人员可以为数据摄取、实时分析和数据导入/导出等常见用例创建和协调数据管道。

This data pipelines come in two flavors, streaming and batch data pipelines.

这种数据管道有两种类型,流式和批处理数据管道。

In the first case, an unbounded amount of data is consumed or produced via messaging middleware. While in the second case the short-lived task processes a finite set of data and then terminate.

在第一种情况下,通过消息中间件消耗或产生无限制的数据量。而在第二种情况下,短命的任务处理一组有限的数据,然后终止。

This article will focus on streaming processing.

本文将重点讨论流处理。

2. Architectural Overview

2.建筑概述

The key components these type of architecture are Applications, the Data Flow Server, and the target runtime.

这些类型的架构的关键组件是应用程序数据流服务器和目标运行时。

Also in addition to these key components, we also usually have a Data Flow Shell and a message broker within the architecture.

另外除了这些关键组件外,我们在架构内通常还有一个数据流外壳和一个消息代理

Let’s see all these components in more detail.

让我们更详细地看看所有这些组件。

2.1. Applications

2.1.应用

Typically, a streaming data pipeline includes consuming events from external systems, data processing, and polyglot persistence. These phases are commonly referred to as Source, Processor, and Sink in Spring Cloud terminology:

通常情况下,流式数据管道包括从外部系统消费事件、数据处理和多角化持久化。在Spring Cloud术语中,这些阶段通常被称为SourceProcessorSink

  • Source: is the application that consumes events
  • Processor: consumes data from the Source, does some processing on it, and emits the processed data to the next application in the pipeline
  • Sink: either consumes from a Source or Processor and writes the data to the desired persistence layer

These applications can be packaged in two ways:

这些应用可以通过两种方式打包。

  • Spring Boot uber-jar that is hosted in a maven repository, file, http or any other Spring resource implementation (this method will be used in this article)
  • Docker

Many sources, processor, and sink applications for common use-cases (e.g. jdbc, hdfs, http, router) are already provided and ready to use by the Spring Cloud Data Flow team.

Spring Cloud Data Flow团队已经为常见的使用案例(如jdbc、hdfs、http、router)提供了许多源、处理器和汇的应用程序,并可随时使用。

2.2. Runtime

2.2.运行时间

Also, a runtime is needed for these applications to execute. The supported runtimes are:

另外,这些应用程序的执行需要一个运行时间。支持的运行时间有

  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • Local Server for development (wich will be used in this article)

2.3. Data Flow Server

2.3.数据流服务器

The component that is responsible for deploying applications to a runtime is the Data Flow Server. There is a Data Flow Server executable jar provided for each of the target runtimes.

负责将应用程序部署到运行时的组件是Data Flow Server。每个目标运行时都有一个Data Flow Server可执行jar。

The Data Flow Server is responsible for interpreting:

数据流服务器负责解释。

  • A stream DSL that describes the logical flow of data through multiple applications.
  • A deployment manifest that describes the mapping of applications onto the runtime.

2.4. Data Flow Shell

2.4.数据流外壳

The Data Flow Shell is a client for the Data Flow Server. The shell allows us to perform the DSL command needed to interact with the server.

Data Flow Shell是Data Flow服务器的客户端。 Shell允许我们执行与服务器交互所需的DSL命令。

As an example, the DSL to describe the flow of data from an http source to a jdbc sink would be written as “http | jdbc”. These names in the DSL are registered with the Data Flow Server and map onto application artifacts that can be hosted in Maven or Docker repositories.

例如,描述从http源到jdbc汇的数据流的DSL将被写成 “http | jdbc”。DSL中的这些名称在数据流服务器上注册,并映射到可以托管在Maven或Docker仓库的应用工件上。

Spring also offer a graphical interface, named Flo, for creating and monitoring streaming data pipelines. However, its use is outside the discussion of this article.

Spring还提供了一个图形界面,名为Flo,用于创建和监控流式数据管道。然而,它的使用不在本文的讨论范围内。

2.5. Message Broker

2.5.消息代理

As we’ve seen in the example of the previous section, we have used the pipe symbol into the definition of the flow of data. The pipe symbol represents the communication between the two applications via messaging middleware.

正如我们所看到的 在上一节的例子中,我们已经将管道符号用于数据流的定义。管道符号代表了两个应用程序之间通过消息传递中间件进行的通信。

This means that we need a message broker up and running in the target environment.

这意味着我们需要一个消息代理在目标环境中启动和运行。

The two messaging middleware brokers that are supported are:

支持的两个消息传递中间件经纪商是。

  • Apache Kafka
  • RabbitMQ

And so, now that we have an overview of the architectural components – it’s time to build our first stream processing pipeline.

因此,现在我们对架构组件有了一个概述–是时候建立我们的第一个流处理管道了。

3. Install a Message Broker

3.安装一个消息代理

As we have seen, the applications in the pipeline need a messaging middleware to communicate. For the purpose of this article, we’ll go with RabbitMQ.

正如我们所看到的,管道中的应用程序需要一个消息传递中间件来进行通信。为了本文的目的,我们将使用RabbitMQ

For the full details of the installation, you can follow the instruction on the official site.

关于安装的全部细节,你可以按照官方网站上的指示进行。

4. The Local Data Flow Server

4.本地数据流服务器

To speed up the process of generating our applications, we’ll use Spring Initializr; with its help, we can obtain our Spring Boot applications in a few minutes.

为了加快生成应用程序的过程,我们将使用Spring Initializr;在其帮助下,我们可以在几分钟内获得我们的Spring Boot应用程序。

After reaching the website, simply choose a Group and an Artifact name.

到达网站后,只需选择一个和一个艺术品名称。

Once this is done, click on the button Generate Project to start the download of the Maven artifact.

完成后,点击Generate Project按钮,开始下载Maven构件。

init-1024x491

After the download is completed, unzip the project and import it as a Maven project in your IDE of choice.

下载完成后,解压该项目并将其作为Maven项目导入你选择的IDE中。

Let’s add a Maven dependency to the project. As we’ll need Dataflow Local Server libraries, let’s add the spring-cloud-starter-dataflow-server-local dependency:

让我们为项目添加一个Maven依赖项。由于我们需要Dataflow本地服务器库,让我们添加spring-cloud-starter-dataflow-server-local>依赖项。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>

Now we need to annotate the Spring Boot main class with @EnableDataFlowServer annotation:

现在我们需要用@EnableDataFlowServer注解来注释Spring Boot主类。

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

That’s all. Our Local Data Flow Server is ready to be executed:

这就是全部。我们的本地数据流服务器已经准备好被执行。

mvn spring-boot:run

The application will boot up on port 9393.

该应用程序将在9393端口启动。

5. The Data Flow Shell

5.数据流外壳

Again, go to the Spring Initializr and choose a Group and Artifact name.

再次,进入Spring Initializr,选择一个GroupArtifact名称。

Once we’ve downloaded and imported the project, let’s add a spring-cloud-dataflow-shell dependency:

一旦我们下载并导入该项目,让我们添加一个spring-cloud-dataflow-shell依赖项。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>

Now we need to add the @EnableDataFlowShell annotation to the Spring Boot main class:

现在我们需要将@EnableDataFlowShell注解添加到Spring Boot主类。

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

We can now run the shell:

我们现在可以运行shell了。

mvn spring-boot:run

After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.

在shell运行后,我们可以在提示符下输入help命令来查看我们可以执行的完整命令列表。

6. The Source Application

6.源应用程序</strong

Similarly, on Initializr, we’ll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:

同样,在Initializr上,我们现在将创建一个简单的应用程序,并添加一个Stream Rabbit依赖,称为spring-cloud-starter-stream-rabbit:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

We’ll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

然后我们将把@EnableBinding(Source.class)注解添加到Spring Boot主类。

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).

现在我们需要定义必须处理的数据源。这个来源可以是任何潜在的无尽的工作负载(物联网传感器数据、24/7事件处理、在线交易数据摄取)。

In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.

在我们的示例应用程序中,我们用一个Poller每10秒产生一个事件(为简单起见,一个新的时间戳)。

The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:

@InboundChannelAdapter注解向源的输出通道发送一个消息,使用返回值作为消息的有效载荷。

@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT, 
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

Our data source is ready.

我们的数据源已经准备好了。

7. The Processor Application

7.处理器的应用

Next- we’ll create an application and add a Stream Rabbit dependency.

接下来,我们将创建一个应用程序并添加一个Stream Rabbit依赖。

We’ll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

然后我们将把@EnableBinding(Processor.class)注解添加到Spring Boot主类。

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

Next, we need to define a method to process the data that coming from the source application.

接下来,我们需要定义一个方法来处理来自源应用程序的数据。

To define a transformer, we need to annotate this method with @Transformer annotation:

为了定义一个转化器,我们需要用@Transformer注解来注释这个方法。

@Transformer(inputChannel = Processor.INPUT, 
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

It converts a timestamp from the ‘input’ channel to a formatted date which will be sent to the ‘output’ channel.

它将 “输入 “通道的时间戳转换为格式化的日期,将被发送到 “输出 “通道。

8. The Sink Application

8.水槽的应用

The last application to create is the Sink application.

最后一个要创建的应用程序是水槽应用程序

Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let’s add a Stream Rabbit dependency.

再次,进入Spring Initializr并选择一个Group,一个Artifact名称。下载项目后,让我们添加一个Stream Rabbit依赖。

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

然后将 @EnableBinding(Sink.class)注解添加到Spring Boot主类。

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
	SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

Now we need a method to intercept the messages coming from the processor application.

现在我们需要一个方法来拦截来自处理器应用程序的消息。

To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:

要做到这一点,我们需要在我们的方法中添加@StreamListener(Sink.INPUT)注解。

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

The method simply prints the timestamp transformed in a formatted date to a log file.

该方法只是将转化为格式化日期的时间戳打印到一个日志文件中。

9. Register a Stream App

9.注册一个流媒体应用程序

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

Spring Cloud Data Flow Shell允许我们使用app register命令在App Registry注册一个流应用。

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

我们必须提供一个唯一的名称、应用程序类型和一个可以解析到应用程序工件的URI。对于类型,指定”source“、”processor“、或”sink“。

When providing a URI with the maven scheme, the format should conform to the following:

在提供带有maven方案的URI时,其格式应符合以下要求。

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

To register the Source, Processor and Sink applications previously created, go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

要注册SourceProcessorSink应用程序先前创建的,进入Spring Cloud Data Flow Shell并在提示下发出以下命令。

app register --name time-source --type source 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT

app register --name time-processor --type processor 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT

app register --name logging-sink --type sink 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT

10. Create and Deploy the Stream

10.创建和部署流

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

要创建一个新的流定义,请进入Spring Cloud Data Flow Shell并执行以下shell命令。

stream create --name time-to-log 
  --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink’.

这定义了一个名为time-to-log的流,基于DSL表达式‘time-source | time-processor | logging-sink’

Then to deploy the stream execute the following shell command:

然后,为了部署流,执行以下shell命令。

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

数据流服务器时间源时间处理器日志汇解析为maven坐标,并使用这些坐标来启动流的时间源时间处理器日志汇应用程序。

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

如果流正确的部署,你会在数据流服务器日志中看到,模块已经被启动并绑定在一起。

2016-08-24 12:29:10.516  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-processor instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-source instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

11.审查结果

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

在这个例子中,源头只是每秒将当前的时间戳作为消息发送,处理器将其格式化,而日志汇则使用日志框架输出格式化的时间戳。

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

日志文件位于Data Flow Server的日志输出中显示的目录内,如上图所示。为了看到结果,我们可以对日志进行跟踪。

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

12.结论

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

在这篇文章中,我们已经看到了如何通过使用Spring Cloud Data Flow来构建一个用于流处理的数据管道。

Also, we saw the role of Source, Processor and Sink applications inside the stream and how to plug and tie this module inside a Data Flow Server through the use of Data Flow Shell.

此外,我们还看到了SourceProcessorSink应用程序在流中的作用,以及如何通过使用Data Flow ShellData Flow Server中插入并绑定该模块。

The example code can be found in the GitHub project.

示例代码可以在GitHub项目中找到。