1. Overview
1.概述
Spring Cloud Data Flow is a cloud-native toolkit for building real-time data pipelines and batch processes. Spring Cloud Data Flow is ready to be used for a range of data processing use cases like simple import/export, ETL processing, event streaming, and predictive analytics.
Spring Cloud Data Flow是一个用于构建实时数据管道和批处理过程的云原生工具箱。Spring Cloud Data Flow可随时用于一系列数据处理用例,如简单的导入/导出、ETL处理、事件流和预测分析。
In this tutorial, we’ll learn an example of real-time Extract Transform and Load (ETL) using a stream pipeline that extracts data from a JDBC database, transforms it to simple POJOs and loads it into a MongoDB.
在本教程中,我们将学习一个使用流管道的实时提取、转换和加载(ETL)的例子,该管道从JDBC数据库中提取数据,将其转换为简单的POJOs并加载到MongoDB。
2. ETL and Event-Stream Processing
2.ETL和事件流处理
ETL – extract, transform and load – was commonly referred to as a process that batch-loads data from several databases and systems into a common data warehouse. In this data warehouse, it’s possible to do heavy data analysis processing without compromising the overall performance of the system.
ETL–提取、转换和加载–通常指的是将数据从几个数据库和系统中批量加载到一个共同的数据仓库的过程。在这个数据仓库中,可以在不影响系统整体性能的情况下进行大量的数据分析处理。
However, new trends are changing the way how this is done. ETL still has a role in transferring data to data warehouses and data lakes.
然而,新的趋势正在改变这种方式。ETL在将数据传输到数据仓库和数据湖方面仍有一定的作用。
Nowadays this can be done with streams in an event-stream architecture with the help of Spring Cloud Data Flow.
如今,在Spring Cloud Data Flow的帮助下,可以用事件流架构中的streams来完成。
3. Spring Cloud Data Flow
3.Spring云数据流
With Spring Cloud Data Flow (SCDF), developers can create data pipelines in two flavors:
通过Spring Cloud Data Flow(SCDF),开发人员可以创建两种风格的数据管道。
- Long-lived real-time stream applications using Spring Cloud Stream
- Short-lived batched task applications using Spring Cloud Task
In this article, we’ll cover the first, a long-lived streaming application based on Spring Cloud Stream.
在这篇文章中,我们将介绍第一个,一个基于Spring Cloud Stream的长寿流应用。
3.1. Spring Cloud Stream Applications
3.1.Spring云流应用
The SCDF Stream pipelines are composed of steps, where each step is an application built in Spring Boot style using the Spring Cloud Stream micro-framework. These applications are integrated by a messaging middleware like Apache Kafka or RabbitMQ.
SCDF流管道由多个步骤组成,其中每个步骤都是使用Spring Cloud Stream微框架以Spring Boot风格构建的应用程序。这些应用程序由Apache Kafka或RabbitMQ等消息中间件集成。
These applications are classified into sources, processors, and sinks. Comparing to the ETL process, we could say that the source is the “extract”, the processor is the “transformer” and the sink is the “load” part.
这些应用被划分为源、处理器和汇。与ETL过程相比,我们可以说源是 “提取”,处理器是 “转化器”,汇是 “加载 “部分。
In some cases, we can use an application starter in one or more steps of the pipeline. This means that we wouldn’t need to implement a new application for a step, but instead, configure an existing application starter already implemented.
在某些情况下,我们可以在管道的一个或多个步骤中使用一个应用程序启动器。这意味着我们不需要为一个步骤实现一个新的应用程序,而是配置一个已经实现的现有应用程序启动器。
A list of application starters could be found here.
可以在这里找到申请启动器的清单。。
3.2. Spring Cloud Data Flow Server
3.2.Spring Cloud数据流服务器
The last piece of the architecture is the Spring Cloud Data Flow Server. The SCDF Server does the deployment of the applications and the pipeline stream using the Spring Cloud Deployer Specification. This specification supports the SCDF cloud-native flavor by deploying to a range of modern runtimes, such as Kubernetes, Apache Mesos, Yarn, and Cloud Foundry.
架构的最后一块是Spring Cloud Data Flow Server。SCDF服务器使用Spring Cloud Deployer规范进行应用程序和管道流的部署。该规范通过部署到一系列现代运行时(如Kubernetes、Apache Mesos、Yarn和Cloud Foundry)来支持SCDF的云原生风味。
Also, we can run the stream as a local deployment.
另外,我们可以把流作为本地部署来运行。
More information about the SCDF architecture can be found here.
关于SCDF架构的更多信息可以在这里找到。
4. Environment Setup
4.环境设置
Before we start, we need to choose the pieces of this complex deployment. The first piece to define is the SCDF Server.
在我们开始之前,我们需要选择这个复杂部署的部分。第一个要定义的部分是SCDF服务器。
For testing, we’ll use SCDF Server Local for local development. For the production deployment, we can later choose a cloud-native runtime, like SCDF Server Kubernetes. We can find the list of server runtimes here.
对于测试,我们将使用SCDF Server Local进行本地开发。对于生产部署,我们以后可以选择一个云原生的运行时,例如SCDF Server Kubernetes。我们可以在这里找到服务器运行时的列表。
Now, let’s check the system requirements to run this server.
现在,让我们检查一下运行这个服务器的系统要求。
4.1. System Requirements
4.1.系统要求
To run the SCDF Server, we’ll have to define and set up two dependencies:
为了运行SCDF服务器,我们必须定义和设置两个依赖项。
- the messaging middleware, and
- the RDBMS.
For the messaging middleware, we’ll work with RabbitMQ, and we choose PostgreSQL as an RDBMS for storing our pipeline stream definitions.
对于消息传递中间件,我们将使用RabbitMQ,我们选择PostgreSQL作为RDBMS来存储我们的管道流定义。
For running RabbitMQ, download the latest version here and start a RabbitMQ instance using the default configuration or run the following Docker command:
对于运行 RabbitMQ,请下载最新版本这里并使用默认配置启动 RabbitMQ 实例或运行以下 Docker 命令。
docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
As the last setup step, install and run the PostgreSQL RDBMS on the default port 5432. After this, create a database where SCDF can store its stream definitions using the following script:
作为最后的设置步骤,在默认的5432端口安装并运行PostgreSQL RDBMS。在这之后,使用下面的脚本创建一个数据库,SCDF可以在其中存储其流定义。
CREATE DATABASE dataflow;
4.2. Spring Cloud Data Flow Server Local
4.2.Spring Cloud数据流服务器本地
For running the SCDF Server Local, we can choose to start the server using docker-compose, or we can start it as a Java application.
对于运行SCDF服务器本地,我们可以选择启动服务器使用docker-compose,或者我们可以把它作为一个Java应用程序来启动。
Here, we’ll run the SCDF Server Local as a Java application. For configuring the application, we have to define the configuration as Java application parameters. We’ll need Java 8 in the System path.
在这里,我们将把SCDF Server Local作为一个Java应用程序来运行。为了配置该应用程序,我们必须把配置定义为Java应用程序参数。我们在系统路径中需要Java 8。
To host the jars and dependencies, we need to create a home folder for our SCDF Server and download the SCDF Server Local distribution into this folder. You can download the most recent distribution of SCDF Server Local here.
为了承载jars和依赖项,我们需要为我们的SCDF服务器创建一个主文件夹,并将SCDF服务器本地版下载到这个文件夹中。你可以下载最新的SCDF Server Local发行版这里。
Also, we need to create a lib folder and put a JDBC driver there. The latest version of the PostgreSQL driver is available here.
另外,我们需要创建一个lib文件夹,并在那里放置一个JDBC驱动程序。最新版本的PostgreSQL驱动可以在这里获得。
Finally, let’s run the SCDF local server:
最后,让我们运行SCDF本地服务器。
$java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \
--spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \
--spring.datasource.username=postgres_username \
--spring.datasource.password=postgres_password \
--spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.rabbitmq.host=127.0.0.1 \
--spring.rabbitmq.port=5672 \
--spring.rabbitmq.username=guest \
--spring.rabbitmq.password=guest
We can check if it’s running by looking at this URL:
我们可以通过查看这个URL来检查它是否正在运行。
http://localhost:9393/dashboard
http://localhost:9393/dashboard
4.3. Spring Cloud Data Flow Shell
4.3.Spring Cloud数据流外壳
The SCDF Shell is a command line tool that makes it easy to compose and deploy our applications and pipelines. These Shell commands run over the Spring Cloud Data Flow Server REST API.
SCDF Shell是一个命令行工具,使我们能够轻松地组成和部署我们的应用程序和管道。这些 Shell 命令通过 Spring Cloud Data Flow Server REST API 运行。
Download the latest version of the jar into your SCDF home folder, available here. Once it is done, run the following command (update the version as needed):
将最新版本的jar下载到你的SCDF主文件夹中,可用这里。一旦完成,运行以下命令(根据需要更新版本)。
$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar
____ ____ _ __
/ ___| _ __ _ __(_)_ __ __ _ / ___| | ___ _ _ __| |
\___ \| '_ \| '__| | '_ \ / _` | | | | |/ _ \| | | |/ _` |
___) | |_) | | | | | | | (_| | | |___| | (_) | |_| | (_| |
|____/| .__/|_| |_|_| |_|\__, | \____|_|\___/ \__,_|\__,_|
____ |_| _ __|___/ __________
| _ \ __ _| |_ __ _ | ___| | _____ __ \ \ \ \ \ \
| | | |/ _` | __/ _` | | |_ | |/ _ \ \ /\ / / \ \ \ \ \ \
| |_| | (_| | || (_| | | _| | | (_) \ V V / / / / / / /
|____/ \__,_|\__\__,_| |_| |_|\___/ \_/\_/ /_/_/_/_/_/
Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".
dataflow:>
If instead of “dataflow:>” you get “server-unknown:>” in the last line, you are not running the SCDF Server at localhost. In this case, run the following command to connect to another host:
如果在最后一行得到的不是”dataflow:>”,而是”server-unknown:>”,说明你没有在本地主机上运行SCDF服务器。在这种情况下,请运行下面的命令,连接到另一个主机。
server-unknown:>dataflow config server http://{host}
Now, Shell is connected to the SCDF Server, and we can run our commands.
现在,Shell已经连接到SCDF服务器,我们可以运行我们的命令。
The first thing we need to do in Shell is to import the application starters. Find the latest version here for RabbitMQ+Maven in Spring Boot 2.0.x, and run the following command (again, update the version, here “Darwin-SR1“, as needed):
我们在 Shell 中需要做的第一件事是导入应用程序启动器。在 Spring Boot 2.0.x 中找到 RabbitMQ+Maven 的最新版本此处,然后运行以下命令(再次根据需要更新版本,此处为”Darwin-SR1“)。
$ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven
For checking the installed applications run the following Shell command:
为了检查已安装的应用程序,运行以下Shell命令。
$ dataflow:> app list
As a result, we should see a table containing all the installed applications.
结果是,我们应该看到一个包含所有已安装应用程序的表格。
Also, SCDF offers a graphical interface, named Flo, that we can access by this address: http://localhost:9393/dashboard. However, its use isn’t in the scope of this article.
另外,SCDF提供了一个图形界面,名为Flo,我们可以通过这个地址访问。http://localhost:9393/dashboard。 然而,它的使用并不在本文的范围之内。
5. Composing an ETL Pipeline
5.组成一个ETL管道
Let’s now create our stream pipeline. For doing this, we’ll use the JDBC Source application starter to extract information from our relational database.
现在让我们来创建我们的流管道。为了做到这一点,我们将使用JDBC源程序启动器,从我们的关系数据库中提取信息。
Also, we’ll create a custom processor for transforming the information structure and a custom sink to load our data into a MongoDB.
此外,我们将创建一个自定义处理器来转换信息结构,并创建一个自定义水槽来将我们的数据加载到MongoDB。
5.1. Extract – Preparing a Relational Database for Extraction
5.1.提取–为提取准备一个关系型数据库
Let’s create a database with the name of crm and a table with the name of customer:
让我们创建一个名称为crm的数据库和一个名称为customer的表。
CREATE DATABASE crm;
CREATE TABLE customer (
id bigint NOT NULL,
imported boolean DEFAULT false,
customer_name character varying(50),
PRIMARY KEY(id)
)
Note that we’re using a flag imported, which will store which record has already been imported. We could also store this information in another table, if necessary.
注意,我们使用的是一个标志imported,它将存储哪条记录已经被导入。如果有必要,我们也可以在另一个表中存储这一信息。
Now, let’s insert some data:
现在,让我们插入一些数据。
INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);
5.2. Transform – Mapping JDBC Fields to the MongoDB Fields Structure
5.2.转换–将JDBC字段映射为MongoDB字段结构
For the transformation step, we’ll do a simple translation of the field customer_name from the source table, to a new field name. Other transformations could be done here, but let’s keep the example short.
对于转换步骤,我们将对源表中的字段customer_name做一个简单的转换,转为一个新的字段name。这里还可以做其他的转换,但让我们保持这个例子的简短。
To do this, we’ll create a new project with the name customer-transform. The easiest way to do this is by using the Spring Initializr site to create the project. After reaching the website, choose a Group and an Artifact name. We’ll use com.customer and customer-transform, respectively.
为此,我们将创建一个名为customer-transform的新项目。最简单的方法是使用Spring Initializr网站来创建该项目。到达网站后,选择一个组和一个工件名称。我们将分别使用com.customer和customer-transform,。
Once this is done, click on the button “Generate Project” to download the project. Then, unzip the project and import it into your favorite IDE, and add the following dependency to the pom.xml:
完成后,点击 “生成项目 “按钮,下载项目。然后,解压该项目并将其导入你最喜欢的IDE,并在pom.xml中添加以下依赖关系。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Now we’re set to start coding the field name conversion. To do this, we’ll create the Customer class to act as an adapter. This class will receive the customer_name via the setName() method and will output its value via getName method.
现在我们准备开始编写字段名转换的代码。为此,我们将创建Customer类,作为一个适配器。这个类将通过setName()方法接收customer_name,并通过getName方法输出其值。
The @JsonProperty annotations will do the transformation while deserializing from JSON to Java:
@JsonProperty 注释将在从JSON到Java的反序列化过程中进行转换。
public class Customer {
private Long id;
private String name;
@JsonProperty("customer_name")
public void setName(String name) {
this.name = name;
}
@JsonProperty("name")
public String getName() {
return name;
}
// Getters and Setters
}
The processor needs to receive data from an input, do the transformation and bind the outcome to an output channel. Let’s create a class to do this:
处理器需要从输入端接收数据,进行转换并将结果绑定到输出通道。让我们创建一个类来做这件事。
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
@EnableBinding(Processor.class)
public class CustomerProcessorConfiguration {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Customer convertToPojo(Customer payload) {
return payload;
}
}
In the above code, we can observe that the transformation occurs automatically. The input receives the data as JSON and Jackson deserialize it into a Customer object using the set methods.
在上面的代码中,我们可以观察到转换是自动发生的。输入端接收JSON格式的数据,Jackson使用set方法将其反序列化为Customer对象。
The opposite is for the output, the data is serialized to JSON using the get methods.
相反,对于输出,数据被序列化为JSON,使用get方法。
5.3. Load – Sink in MongoDB
5.3.负载–MongoDB中的水槽
Similarly to the transform step, we’ll create another maven project, now with the name customer-mongodb-sink. Again, access the Spring Initializr, for the Group choose com.customer, and for the Artifact choose customer-mongodb-sink. Then, type “MongoDB“ in the dependencies search box and download the project.
与转换步骤类似,我们将创建另一个maven项目,现在名称为customer-mongodb-sink。再次访问Spring Initializr,在组中选择com.customer,在工件中选择customer-mongodb-sink。然后,在依赖项搜索框中输入“MongoDB“并下载该项目。
Next, unzip and import it to your favorite IDE.
接下来,解压缩并将其导入到你最喜欢的IDE。
Then, add the same extra dependency as in the customer-transform project.
然后,添加与customer-transform项目中相同的额外依赖。
Now we’ll create another Customer class, for receiving input in this step:
现在我们将创建另一个Customer类,用于接收这一步的输入。
import org.springframework.data.mongodb.core.mapping.Document;
@Document(collection="customer")
public class Customer {
private Long id;
private String name;
// Getters and Setters
}
For sinking the Customer, we’ll create a Listener class that will save the customer entity using the CustomerRepository:
对于沉入Customer,我们将创建一个Listener类,它将使用CustomerRepository保存客户实体。
@EnableBinding(Sink.class)
public class CustomerListener {
@Autowired
private CustomerRepository repository;
@StreamListener(Sink.INPUT)
public void save(Customer customer) {
repository.save(customer);
}
}
And the CustomerRepository, in this case, is a MongoRepository from Spring Data:
而本例中的CustomerRepository,是来自Spring Data的MongoRepository。
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface CustomerRepository extends MongoRepository<Customer, Long> {
}
5.4. Stream Definition
5.4.流的定义
Now, both custom applications are ready to be registered on SCDF Server. To accomplish this, compile both projects using the Maven command mvn install.
现在,两个自定义应用程序已经准备好在SCDF服务器上注册了。为了完成这个任务,使用Maven命令mvn install编译两个项目。
We then register them using the Spring Cloud Data Flow Shell:
然后我们使用Spring Cloud Data Flow Shell注册它们。
app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT
app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT
Finally, let’s check if the applications are stored at SCDF, run the application list command in the shell:
最后,让我们检查一下应用程序是否存储在SCDF,在shell中运行应用程序列表命令。
app list
As a result, we should see both applications in the resulting table.
因此,我们应该在结果表中看到这两个应用程序。
5.4.1. Stream Pipeline Domain-Specific Language – DSL
5.4.1.流管道特定领域语言 – DSL
A DSL defines the configuration and data flow between the applications. The SCDF DSL is simple. In the first word, we define the name of the application, followed by the configurations.
DSL定义了应用程序之间的配置和数据流。SCDF DSL很简单。在第一个词中,我们定义了应用程序的名称,然后是配置。
Also, the syntax is a Unix-inspired Pipeline syntax, that uses vertical bars, also known as “pipes”, to connect multiple applications:
另外,该语法是一种受Unix启发的管道语法,它使用垂直条,也称为 “管道”,来连接多个应用程序。
http --port=8181 | log
This creates an HTTP application served in port 8181 which sends any received body payload to a log.
这创建了一个HTTP应用程序,在8181端口服务,将任何收到的主体有效载荷发送到一个日志。
Now, let’s see how to create the DSL stream definition of the JDBC Source.
现在,让我们看看如何创建JDBC源的DSL流定义。
5.4.2. JDBC Source Stream Definition
5.4.2.JDBC源流的定义
The key configurations for the JDBC Source are query and update. query will select unread records while update will change a flag to prevent the current records from being reread.
JDBC源的关键配置是query和update。query将选择未读记录,而update将改变一个标志,以防止当前记录被重新读取。
Also, we’ll define the JDBC Source to poll in a fixed delay of 30 seconds and polling maximum 1000 rows. Finally, we’ll define the configurations of connection, like driver, username, password and connection URL:
另外,我们将定义JDBC源,以30秒的固定延迟进行轮询,最多可轮询1000行。最后,我们将定义连接的配置,如驱动程序、用户名、密码和连接URL。
jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported = false'
--update='UPDATE public.customer SET imported = true WHERE id in (:id)'
--max-rows-per-poll=1000
--fixed-delay=30 --time-unit=SECONDS
--driver-class-name=org.postgresql.Driver
--url=jdbc:postgresql://localhost:5432/crm
--username=postgres
--password=postgres
More JDBC Source configuration properties can be found here.
更多的JDBC源配置属性可以在这里找到。
5.4.3. Customer MongoDB Sink Stream Definition
5.4.3.客户MongoDB汇流定义
As we didn’t define the connection configurations in application.properties of customer-mongodb-sink, we’ll configure through DSL parameters.
由于我们没有在application.properties的customer-mongodb-sink中定义连接配置,我们将通过DSL参数进行配置。
Our application is fully based on the MongoDataAutoConfiguration. You can check out the other possible configurations here. Basically, we’ll define the spring.data.mongodb.uri:
我们的应用程序完全基于MongoDataAutoConfiguration。你可以查看其他可能的配置这里。基本上,我们将定义spring.data.mongodb.uri。
customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main
5.4.4. Create and Deploy the Stream
5.4.4.创建和部署流
First, to create the final stream definition, go back to the Shell and execute the following command (without line breaks, they have just been inserted for readability):
首先,为了创建最终的流定义,回到Shell,执行以下命令(没有换行,只是为了方便阅读而插入的)。
stream create --name jdbc-to-mongodb
--definition "jdbc
--query='SELECT id, customer_name FROM public.customer WHERE imported=false'
--fixed-delay=30
--max-rows-per-poll=1000
--update='UPDATE customer SET imported=true WHERE id in (:id)'
--time-unit=SECONDS
--password=postgres
--driver-class-name=org.postgresql.Driver
--username=postgres
--url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink
--spring.data.mongodb.uri=mongodb://localhost/main"
This stream DSL defines a stream named jdbc-to-mongodb. Next, we’ll deploy the stream by its name:
这个流DSL定义了一个名为jdbc-tomongodb的流。接下来,我们将通过它的名字来部署这个流。
stream deploy --name jdbc-to-mongodb
Finally, we should see the locations of all available logs in the log output:
最后,我们应该在日志输出中看到所有可用日志的位置。
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform
Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc
6. Conclusion
6.结语
In this article, we’ve seen a full example of an ETL data pipeline using Spring Cloud Data Flow.
在这篇文章中,我们已经看到了一个使用Spring Cloud Data Flow的ETL数据管道的完整例子。
Most noteworthy, we saw the configurations of an application starter, created an ETL stream pipeline using the Spring Cloud Data Flow Shell and implemented custom applications for our reading, transforming and writing data.
最值得一提的是,我们看到了一个应用程序启动器的配置,使用Spring Cloud Data Flow Shell创建了一个ETL流管道,并为我们的读取、转换和写入数据实现了自定义应用程序。
As always, the example code can be found in the GitHub project.
一如既往,示例代码可以在GitHub项目中找到。。