Getting Started with Stream Processing with Spring Cloud Data Flow

Last modified: September 1, 2016


1. Introduction


Spring Cloud Data Flow is a cloud-native programming and operating model for composable data microservices.

With Spring Cloud Data Flow, developers can create and orchestrate data pipelines for common use cases such as data ingest, real-time analytics, and data import/export.

These data pipelines come in two flavors: streaming and batch data pipelines.


In the first case, an unbounded amount of data is consumed or produced via messaging middleware, while in the second case a short-lived task processes a finite set of data and then terminates.


This article will focus on stream processing.


2. Architectural Overview


The key components of this type of architecture are Applications, the Data Flow Server, and the target runtime.


In addition to these key components, we usually also have a Data Flow Shell and a message broker within the architecture.


Let’s see all these components in more detail.


2.1. Applications


Typically, a streaming data pipeline includes consuming events from external systems, data processing, and polyglot persistence. These phases are commonly referred to as Source, Processor, and Sink in Spring Cloud terminology:

  • Source: the application that consumes events from external systems
  • Processor: consumes data from the Source, does some processing on it, and emits the processed data to the next application in the pipeline
  • Sink: consumes from either a Source or a Processor and writes the data to the desired persistence layer
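Conceptually, the three roles line up with Java’s standard functional interfaces: a Source behaves like a Supplier, a Processor like a Function, and a Sink like a Consumer. The following is a minimal plain-Java sketch of that idea only; it is not how Spring Cloud Data Flow runs applications (there, each role is a separate Spring Boot application and the stages communicate through a message broker). The class PipelineRoles and its run method are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java illustration only: real Data Flow apps are separate Spring Boot
// processes connected by a broker, not in-process function calls.
final class PipelineRoles {

    // Pushes `count` events through source -> processor -> sink, in process.
    static <I, O> void run(Supplier<I> source,
                           Function<I, O> processor,
                           Consumer<O> sink,
                           int count) {
        for (int i = 0; i < count; i++) {
            I event = source.get();            // Source: produce an event
            O result = processor.apply(event); // Processor: transform it
            sink.accept(result);               // Sink: persist or log it
        }
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>(); // stand-in for a persistence layer
        int[] counter = {0};
        run(() -> ++counter[0],  // source: emits 1, 2, 3
            n -> "event-" + n,   // processor: formats the number
            store::add,          // sink: "persists" into a list
            3);
        System.out.println(store); // [event-1, event-2, event-3]
    }
}
```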

These applications can be packaged in two ways:


  • A Spring Boot uber-jar hosted in a Maven repository, a file, an HTTP location, or any other Spring resource implementation (this method will be used in this article)
  • A Docker image

Many source, processor, and sink applications for common use cases (e.g. jdbc, hdfs, http, router) are already provided by the Spring Cloud Data Flow team and ready to use.

2.2. Runtime


Also, a runtime is needed for these applications to execute. The supported runtimes are:


  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • Local Server for development (which will be used in this article)

2.3. Data Flow Server


The component that is responsible for deploying applications to a runtime is the Data Flow Server. There is a Data Flow Server executable jar provided for each of the target runtimes.

The Data Flow Server is responsible for interpreting:


  • A stream DSL that describes the logical flow of data through multiple applications.
  • A deployment manifest that describes the mapping of applications onto the runtime.

2.4. Data Flow Shell


The Data Flow Shell is a client for the Data Flow Server. The shell allows us to execute the DSL commands needed to interact with the server.

As an example, the DSL to describe the flow of data from an http source to a jdbc sink would be written as “http | jdbc”. These names in the DSL are registered with the Data Flow Server and map onto application artifacts that can be hosted in Maven or Docker repositories.
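To make the name-to-artifact mapping concrete, here is a hypothetical sketch that splits a linear DSL definition on the pipe symbol and looks each name up in a registry map. It assumes only the simple “a | b | c” form; the real Data Flow Server parser supports much more (app properties, labels, named destinations), and the class name SimpleStreamDsl and the com.example coordinates are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, NOT the Data Flow Server's parser: it handles only the
// linear form "app1 | app2 | ..." and ignores the rest of the stream DSL.
final class SimpleStreamDsl {

    // Splits "http | jdbc" into [http, jdbc]; rejects empty segments such as "http |".
    static List<String> parse(String definition) {
        List<String> apps = Arrays.stream(definition.split("\\|", -1))
                .map(String::trim)
                .collect(Collectors.toList());
        if (apps.stream().anyMatch(String::isEmpty)) {
            throw new IllegalArgumentException("Empty app name in: " + definition);
        }
        return apps;
    }

    // Looks each app name up in a registry of name -> artifact URI.
    static List<String> resolve(String definition, Map<String, String> registry) {
        List<String> uris = new ArrayList<>();
        for (String app : parse(definition)) {
            String uri = registry.get(app);
            if (uri == null) {
                throw new IllegalArgumentException("App not registered: " + app);
            }
            uris.add(uri);
        }
        return uris;
    }

    public static void main(String[] args) {
        // Placeholder coordinates, for illustration only
        Map<String, String> registry = new LinkedHashMap<>();
        registry.put("http", "maven://com.example:http-source:1.0.0");
        registry.put("jdbc", "maven://com.example:jdbc-sink:1.0.0");

        System.out.println(parse("http | jdbc")); // [http, jdbc]
        System.out.println(resolve("http | jdbc", registry));
    }
}
```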

Spring also offers a graphical interface, named Flo, for creating and monitoring streaming data pipelines; however, its use is outside the scope of this article.


2.5. Message Broker


As we’ve seen in the example in the previous section, we used the pipe symbol in the definition of the flow of data. The pipe symbol represents the communication between the two applications via messaging middleware.

This means that we need a message broker up and running in the target environment.


The two messaging middleware brokers that are supported are:


  • Apache Kafka
  • RabbitMQ
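The practical effect of putting a broker between two applications is that producer and consumer are decoupled: the left-hand app publishes to a destination without knowing who reads from it, and the right-hand app blocks until a message arrives. The sketch below imitates that with a JDK BlockingQueue standing in for a RabbitMQ or Kafka destination. It is an in-process illustration under that assumption only, and QueueBrokerDemo and its END marker are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: an in-memory queue stands in for a broker destination.
// With Spring Cloud Data Flow the two sides are separate processes talking to
// RabbitMQ or Kafka. END is a hypothetical end-of-stream marker for the demo.
final class QueueBrokerDemo {

    static final String END = "__END__";

    static List<String> pipe(List<String> messages) {
        BlockingQueue<String> destination = new LinkedBlockingQueue<>();

        // "Left-hand" app: publishes without knowing who consumes.
        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) {
                    destination.put(m);
                }
                destination.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // "Right-hand" app: blocks until the next message arrives.
        List<String> received = new ArrayList<>();
        try {
            for (String m = destination.take(); !END.equals(m); m = destination.take()) {
                received.add(m);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(pipe(List.of("a", "b", "c"))); // [a, b, c]
    }
}
```

Because the queue is the only thing the two sides share, either one could be replaced without the other changing, which is the property the pipe symbol gives a stream definition.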

So, now that we have an overview of the architectural components, it’s time to build our first stream processing pipeline.


3. Install a Message Broker


As we have seen, the applications in the pipeline need a messaging middleware to communicate. For the purpose of this article, we’ll go with RabbitMQ.


For the full details of the installation, you can follow the instructions on the official site.


4. The Local Data Flow Server


To speed up the process of generating our applications, we’ll use Spring Initializr; with its help, we can obtain our Spring Boot applications in a few minutes.

After reaching the website, simply choose a Group and an Artifact name.


Once this is done, click the Generate Project button to start downloading the Maven project.

After the download is completed, unzip the project and import it as a Maven project in your IDE of choice.


Let’s add a Maven dependency to the project. Since we’ll need the Data Flow Local Server libraries, let’s add the spring-cloud-starter-dataflow-server-local dependency:



Now we need to annotate the Spring Boot main class with the @EnableDataFlowServer annotation:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

That’s all. Our Local Data Flow Server is ready to be executed:


mvn spring-boot:run

The application will boot up on port 9393.


5. The Data Flow Shell


Again, go to the Spring Initializr and choose a Group and Artifact name.

Once we’ve downloaded and imported the project, let’s add a spring-cloud-dataflow-shell dependency:



Now we need to add the @EnableDataFlowShell annotation to the Spring Boot main class:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

We can now run the shell:


mvn spring-boot:run

After the shell is running, we can type the help command at the prompt to see a complete list of the commands we can run.


6. The Source Application


Similarly, on Initializr, we’ll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:


We’ll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).


In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.


The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:


@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT, 
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

Our data source is ready.
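The poller settings above mean that each poll emits at most one message and that the next poll starts a fixed delay (10 seconds) after the previous one finished. As a rough JDK analogy only (this is not how Spring Integration implements pollers), ScheduledExecutorService.scheduleWithFixedDelay has the same fixed-delay semantics. The sketch below collects a given number of timestamps that way; the class FixedDelayPollerDemo and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Rough JDK analogue of @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"):
// each poll emits one timestamp, and the next poll starts a fixed delay after
// the previous one finished. Not Spring Integration's actual implementation.
final class FixedDelayPollerDemo {

    static List<Long> poll(int count, long delayMillis) {
        List<Long> emitted = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                if (done.getCount() > 0) {
                    emitted.add(System.currentTimeMillis()); // like new Date().getTime()
                    done.countDown();                        // one message per poll
                }
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while polling", e);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        // 100 ms instead of 10 s so the demo finishes quickly
        System.out.println(poll(3, 100));
    }
}
```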


7. The Processor Application


Next, we’ll create an application and add a Stream Rabbit dependency.

We’ll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

Next, we need to define a method to process the data coming from the source application.


To define a transformer, we need to annotate this method with the @Transformer annotation:


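@Transformer(inputChannel = Processor.INPUT, 
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    // "HH" = hour of day (0-23) and "ss" = seconds; a "yy" in the seconds
    // position would print the two-digit year instead
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    String date = dateFormat.format(timestamp);
    return date;
}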

It converts a timestamp from the ‘input’ channel to a formatted date which will be sent to the ‘output’ channel.
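If you prefer the java.time API over SimpleDateFormat (which is not thread-safe), the same formatting step can be written with DateTimeFormatter, which is immutable and thread-safe. This is a swapped-in alternative, not the article’s processor code; the class name TimestampFormatter is hypothetical, and taking the ZoneId as an explicit parameter is a choice of this sketch (SimpleDateFormat uses the JVM’s default time zone).

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// java.time alternative to the processor's SimpleDateFormat step (illustration).
final class TimestampFormatter {

    // Immutable and thread-safe, so one shared instance is fine.
    private static final DateTimeFormatter PATTERN =
            DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");

    // Formats epoch milliseconds (the Source's payload) in the given zone.
    static String format(long epochMillis, ZoneId zone) {
        return PATTERN.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        System.out.println(format(0L, ZoneId.of("UTC"))); // 1970/01/01 00:00:00
    }
}
```

Making the zone explicit keeps the output independent of where each app happens to run.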

8. The Sink Application


The last application to create is the Sink application.


Again, go to the Spring Initializr and choose a Group and an Artifact name. After downloading the project, let’s add a Stream Rabbit dependency.

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

Now we need a method to intercept the messages coming from the processor application.


To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:


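// logger: assumed to be an SLF4J Logger field of this class
@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}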

The method simply logs each formatted date it receives; with the local runtime, this output ends up in the sink application’s log file.


9. Register a Stream App


The Spring Cloud Data Flow Shell allows us to register a stream app with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source”, “processor”, or “sink”.


When providing a URI with the maven scheme, the format should conform to the following:

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

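To show how a maven:// URI breaks down into Maven coordinates (groupId, artifactId, optional extension and classifier, version), here is a hypothetical parser. It is an illustration only, since Data Flow resolves these URIs itself; MavenUri is a made-up class and the com.example coordinates are placeholders.

```java
// Hypothetical helper (not part of Spring Cloud Data Flow) that splits
//   maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
// into its parts. Absent optional parts are null.
final class MavenUri {

    private static final String SCHEME = "maven://";

    final String groupId;
    final String artifactId;
    final String extension;  // e.g. "jar", or null
    final String classifier; // or null
    final String version;

    private MavenUri(String groupId, String artifactId, String extension,
                     String classifier, String version) {
        this.groupId = groupId;
        this.artifactId = artifactId;
        this.extension = extension;
        this.classifier = classifier;
        this.version = version;
    }

    static MavenUri parse(String uri) {
        if (!uri.startsWith(SCHEME)) {
            throw new IllegalArgumentException("Expected a maven:// URI: " + uri);
        }
        String[] p = uri.substring(SCHEME.length()).split(":", -1);
        for (String part : p) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty coordinate in: " + uri);
            }
        }
        switch (p.length) {
            case 3: return new MavenUri(p[0], p[1], null, null, p[2]);
            case 4: return new MavenUri(p[0], p[1], p[2], null, p[3]);
            case 5: return new MavenUri(p[0], p[1], p[2], p[3], p[4]);
            default: throw new IllegalArgumentException("Expected 3 to 5 coordinates: " + uri);
        }
    }

    public static void main(String[] args) {
        // Made-up coordinates, for illustration only
        MavenUri u = parse("maven://com.example:time-source:jar:1.0.0");
        System.out.println(u.groupId + " " + u.artifactId + " " + u.extension + " " + u.version);
    }
}
```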
To register the Source, Processor, and Sink applications we created earlier, go to the Spring Cloud Data Flow Shell and issue the following commands at the prompt:

app register --name time-source --type source 
  --uri maven://

app register --name time-processor --type processor 
  --uri maven://

app register --name logging-sink --type sink 
  --uri maven://

10. Create and Deploy the Stream


To create a new stream definition, go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log 
  --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink’.

Then, to deploy the stream, execute the following shell command:


stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.


If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:


2016-08-24 12:29:10.516  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-processor instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-source instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result


In this example, the source simply sends the current timestamp as a message every 10 seconds, the processor formats it, and the log sink outputs the formatted timestamp using the logging framework.


The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion


In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

We also saw the role of the Source, Processor, and Sink applications inside the stream, and how to plug these modules into a Data Flow Server and tie them together using the Data Flow Shell.

The example code can be found in the GitHub project.
