1. Introduction
Spring Cloud Data Flow is a toolkit for building data integration and real-time data processing pipelines.
Pipelines, in this case, are Spring Boot applications built with the Spring Cloud Stream or Spring Cloud Task frameworks.
In this tutorial, we’ll show how to use Spring Cloud Data Flow with Apache Spark.
2. Data Flow Local Server
First, we need to run the Data Flow Server to be able to deploy our jobs.
To run the Data Flow Server locally, we need to create a new project with the spring-cloud-starter-dataflow-server-local dependency:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>
After that, we need to annotate the main class in the server with @EnableDataFlowServer:
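import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}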
Once we run this application, we’ll have a local Data Flow server on port 9393.
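As a quick sanity check, we can query the server over HTTP. The sketch below assumes the server serves its REST API on the default port 9393 and exposes an `/about` endpoint (an assumption about this server version); `DataFlowServerCheck` and `aboutRequest` are hypothetical names, and only the JDK's `java.net.http` client (Java 11+) is used:

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper for checking that the local Data Flow server is up
class DataFlowServerCheck {

    // Builds a GET request for the assumed /about endpoint of the server
    static HttpRequest aboutRequest(String host, int port) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + port + "/about"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response = client.send(
                    aboutRequest("localhost", 9393), HttpResponse.BodyHandlers.ofString());
            System.out.println("Server answered with HTTP " + response.statusCode());
            System.out.println(response.body());
        } catch (IOException e) {
            // Thrown, for example, when nothing is listening on the port yet
            System.out.println("Data Flow server not reachable: " + e.getMessage());
        }
    }
}
```

Building the request separately from sending it keeps the URL logic easy to check without a running server.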
3. Creating a Project
We’ll create a Spark job as a standalone local application, so we won’t need a cluster to run it.
3.1. Dependencies
First, we’ll add the Spark dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
3.2. Creating a Job
For our job, let’s approximate pi with a Monte Carlo method: we generate random points in a square and count how many of them fall inside the inscribed circle:
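import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        // Number of partitions, taken from the first job argument (defaults to 2)
        int slices = args.length >= 1 ? Integer.parseInt(args[0]) : 2;
        // 100,000 samples per slice, capped at Integer.MAX_VALUE to avoid int overflow
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;
        // Exactly n elements; each one only stands for a single random sample
        List<Integer> xs = IntStream.range(0, n)
          .boxed()
          .collect(Collectors.toList());
        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);
        // 1 if a random point in [-1, 1) x [-1, 1) falls inside the unit circle, else 0
        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
            double x = Math.random() * 2 - 1;
            double y = Math.random() * 2 - 1;
            return (x * x + y * y) < 1 ? 1 : 0;
        });
        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);
        // The circle covers pi/4 of the square, so scale by 4 and avoid integer division
        System.out.println("The pi was estimated as: " + 4.0 * count / n);
        context.stop();
    }
}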
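To see why the job multiplies by 4, it helps to run the same estimator without Spark. The points are uniform in the square [-1, 1) × [-1, 1), whose area is 4, while the unit circle has area π, so the fraction of points inside approaches π/4. The plain-Java sketch below computes the estimate sequentially; the class name `SequentialPiEstimate` and the fixed seed are illustrative choices, not part of the original project:

```java
import java.util.Random;

// Sequential version of the Monte Carlo estimator used by the Spark job
class SequentialPiEstimate {

    // Samples points uniformly in [-1, 1) x [-1, 1) and counts those inside the unit circle
    static double estimatePi(long samples, long seed) {
        Random random = new Random(seed);
        long inside = 0;
        for (long i = 0; i < samples; i++) {
            double x = random.nextDouble() * 2 - 1;
            double y = random.nextDouble() * 2 - 1;
            if (x * x + y * y < 1) {
                inside++;
            }
        }
        // inside / samples approaches pi / 4, so scale by 4 (floating-point division)
        return 4.0 * inside / samples;
    }

    public static void main(String[] args) {
        System.out.println("Estimated pi: " + estimatePi(1_000_000, 42L));
    }
}
```

With a million samples the estimate typically lands within a few thousandths of π, which gives a rough reference for judging the Spark job’s output.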
4. Data Flow Shell
The Data Flow Shell is an application that lets us interact with the server. The shell uses DSL commands to describe data flows.
To use the Data Flow Shell, we need to create a project that runs it. First, we need the spring-cloud-dataflow-shell dependency:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>
After adding the dependency, we can create the class that’ll run our Data Flow shell:
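import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}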
5. Deploying the Project
To deploy our project, we’ll use the so-called task runner, which is available for Apache Spark in three versions: cluster, yarn, and client. We’ll proceed with the local client version.
The task runner is the component that actually runs our Spark job.
To do that, we first need to register our task using the Data Flow Shell:
app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
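The `--uri` value is a set of Maven coordinates. To make its structure explicit, the sketch below splits such a URI into its parts, assuming the `maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>` layout; `MavenAppUri` is a hypothetical helper for illustration, not part of Spring Cloud Data Flow, and its `jar` default is this sketch’s own choice:

```java
// Hypothetical helper: splits a maven:// app URI into its Maven coordinates
class MavenAppUri {
    final String groupId;
    final String artifactId;
    final String extension;
    final String classifier;
    final String version;

    private MavenAppUri(String groupId, String artifactId, String extension,
                        String classifier, String version) {
        this.groupId = groupId;
        this.artifactId = artifactId;
        this.extension = extension;
        this.classifier = classifier;
        this.version = version;
    }

    static MavenAppUri parse(String uri) {
        String prefix = "maven://";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a maven:// URI: " + uri);
        }
        String[] parts = uri.substring(prefix.length()).split(":");
        if (parts.length < 3 || parts.length > 5) {
            throw new IllegalArgumentException("Expected 3 to 5 coordinates: " + uri);
        }
        // In this sketch, an omitted extension defaults to "jar" and an omitted classifier to ""
        String extension = parts.length >= 4 ? parts[2] : "jar";
        String classifier = parts.length == 5 ? parts[3] : "";
        // The version is always the last coordinate
        return new MavenAppUri(parts[0], parts[1], extension, classifier, parts[parts.length - 1]);
    }

    public static void main(String[] args) {
        MavenAppUri app = MavenAppUri.parse(
                "maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT");
        System.out.println(app.groupId + " / " + app.artifactId + " / " + app.version);
    }
}
```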
The task accepts several parameters. Some of them are optional, but the following ones are necessary to deploy the Spark job properly:
- spark.app-class, the main class of our submitted job
- spark.app-jar, a path to the fat jar containing our job
- spark.app-name, the name that’ll be used for our job
- spark.app-args, the arguments that’ll be passed to the job
We can use the registered task spark-client to submit our job, remembering to provide the required parameters:
task create spark1 --definition "spark-client \
--spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
--spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
Note that spark.app-jar is the path to the fat-jar with our job.
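The definition passed to `task create` is the registered app name followed by `--key=value` pairs. The sketch below assembles that string from a map and refuses to build it if one of the four parameters listed earlier is missing; `SparkClientDefinition` is a hypothetical helper, and treating all four as mandatory simply mirrors the list in this article:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that builds a spark-client task definition string
class SparkClientDefinition {

    // The parameters listed in the article for a proper deployment
    static final List<String> REQUIRED = List.of(
            "spark.app-name", "spark.app-class", "spark.app-jar", "spark.app-args");

    static String build(Map<String, String> params) {
        for (String key : REQUIRED) {
            if (!params.containsKey(key)) {
                throw new IllegalArgumentException("Missing parameter: " + key);
            }
        }
        // Each entry becomes --key=value, in the map's iteration order
        return "spark-client " + params.entrySet().stream()
                .map(e -> "--" + e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output is predictable
        Map<String, String> params = new LinkedHashMap<>();
        params.put("spark.app-name", "my-test-pi");
        params.put("spark.app-class", "com.baeldung.spring.cloud.PiApproximation");
        params.put("spark.app-jar", "/apache-spark-job-0.0.1-SNAPSHOT.jar");
        params.put("spark.app-args", "10");
        System.out.println(build(params));
    }
}
```

Running `main` prints the same definition that the `task create spark1` command above passes to the shell, on a single line.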
After successful creation of the task, we can proceed to run it with the following command:
task launch spark1
This launches the task, which in turn runs our Spark job.
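The shell is a client of the server’s REST API, so the same launch can also be triggered over HTTP. The sketch below assumes the server accepts `POST /tasks/executions` with the task name as a form parameter (worth checking against the REST API documentation for your server version); `TaskLauncherClient` and `launchRequest` are hypothetical names, and only JDK classes (Java 11+) are used:

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client that launches an existing task definition over HTTP
class TaskLauncherClient {

    // Builds a form-encoded POST to the assumed /tasks/executions endpoint
    static HttpRequest launchRequest(String serverUrl, String taskName) {
        String form = "name=" + URLEncoder.encode(taskName, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder()
                .uri(URI.create(serverUrl + "/tasks/executions"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(form))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response = client.send(
                    launchRequest("http://localhost:9393", "spark1"),
                    HttpResponse.BodyHandlers.ofString());
            // Print the status code and whatever body the server returns
            System.out.println("HTTP " + response.statusCode() + ": " + response.body());
        } catch (IOException e) {
            System.out.println("Launch request failed: " + e.getMessage());
        }
    }
}
```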
6. Summary
In this tutorial, we have shown how to use the Spring Cloud Data Flow framework to process data with Apache Spark. More information on the Spring Cloud Data Flow framework can be found in the documentation.
All code samples can be found on GitHub.