Spring Cloud Data Flow With Apache Spark

Last modified: April 10, 2019


1. Introduction


Spring Cloud Data Flow is a toolkit for building data integration and real-time data processing pipelines.  


Pipelines, in this case, are Spring Boot applications built with the Spring Cloud Stream or Spring Cloud Task frameworks.


In this tutorial, we’ll show how to use Spring Cloud Data Flow with Apache Spark.


2. Data Flow Local Server


First, we need to run the Data Flow Server to be able to deploy our jobs.


To run the Data Flow Server locally, we need to create a new project with the spring-cloud-starter-dataflow-server-local dependency (group org.springframework.cloud).



After that, we need to annotate the main class in the server with @EnableDataFlowServer:


@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

Once we run this application, we’ll have a local Data Flow server on port 9393.
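Before moving on, it can help to confirm that something is actually listening on that port. The following is a minimal sketch using only the JDK; DataFlowServerProbe and isListening are hypothetical names chosen for illustration and are not part of Spring Cloud Data Flow. It only checks that a TCP connection can be opened, not that the process behind the port is the Data Flow server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Spring Cloud Data Flow.
final class DataFlowServerProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9393 is the local Data Flow server port mentioned in this section.
        boolean up = isListening("localhost", 9393, 1000);
        System.out.println(up
          ? "Something is listening on port 9393"
          : "Nothing is listening on port 9393 yet");
    }
}
```

If the check fails, the server is most likely still starting up or was configured with a different port.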


3. Creating a Project


We’ll create a Spark Job as a standalone local application so that we won’t need any cluster to run it.


3.1. Dependencies


First, we’ll add the Spark core dependency to the project.



3.2. Creating a Job


And for our job, let’s approximate pi:


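import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        // Number of partitions; defaults to 2 when no argument is passed
        int slices = args.length >= 1 ? Integer.parseInt(args[0]) : 2;
        // Number of random points, capped to avoid int overflow
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        // One list element per random point to generate
        List<Integer> xs = IntStream.rangeClosed(1, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        // Map each element to 1 if a random point in the square falls inside the unit circle
        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y) < 1 ? 1 : 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        // The circle covers pi/4 of the square, so scale by 4 and use floating-point division
        System.out.println("The pi was estimated as: " + 4.0 * count / n);

        context.stop();
    }
}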
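The estimate relies on a simple ratio: a circle of radius 1 covers π/4 of the square [-1, 1] x [-1, 1], so the fraction of random points that land inside the circle has to be multiplied by 4, and the division has to be done in floating point. The sketch below reproduces the same Monte Carlo idea in plain Java, without Spark, so the arithmetic can be checked locally. LocalPiEstimate and its seed parameter are illustrative assumptions, not part of the original job.

```java
import java.util.Random;
import java.util.stream.IntStream;

// Plain-Java illustration of the Monte Carlo estimate; no Spark involved.
final class LocalPiEstimate {

    // Samples n points uniformly in [-1, 1] x [-1, 1] and returns
    // 4 * (number of points inside the unit circle) / n.
    static double estimate(int n, long seed) {
        Random random = new Random(seed);
        long inside = IntStream.range(0, n)
          .filter(i -> {
              double x = random.nextDouble() * 2 - 1;
              double y = random.nextDouble() * 2 - 1;
              return x * x + y * y < 1;
          })
          .count();
        // 4.0 forces floating-point division; integer division would truncate to 0.
        return 4.0 * inside / n;
    }

    public static void main(String[] args) {
        System.out.println("Local estimate of pi: " + estimate(1_000_000, 42L));
    }
}
```

The accuracy improves slowly with the sample size, which is why the Spark job scales the number of points with the number of slices.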


4. Data Flow Shell


The Data Flow Shell is an application that lets us interact with the server. The shell uses DSL commands to describe data flows.


To use the Data Flow Shell, we need to create a project that’ll allow us to run it. First, we need the spring-cloud-dataflow-shell dependency (group org.springframework.cloud).



After adding the dependency, we can create the class that’ll run our Data Flow shell:


@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

5. Deploying the Project


To deploy our project, we’ll use the so-called task runner that is available for Apache Spark in three versions: cluster, yarn, and client. We’re going to proceed with the local client version.


The task runner is what runs our Spark job.


To do that, we first need to register our task using Data Flow Shell:


app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
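The value passed to --uri is a Maven coordinate: a group ID, an artifact ID, and a version separated by colons. As a rough illustration of that structure, the sketch below splits the three-part form used in the command above. MavenCoordinates is a hypothetical class written for this example, not something Data Flow provides, and it deliberately ignores longer forms that carry an extension or classifier.

```java
// Hypothetical helper for illustration; handles only groupId:artifactId:version.
final class MavenCoordinates {
    final String groupId;
    final String artifactId;
    final String version;

    private MavenCoordinates(String groupId, String artifactId, String version) {
        this.groupId = groupId;
        this.artifactId = artifactId;
        this.version = version;
    }

    // Parses a URI of the form maven://groupId:artifactId:version.
    static MavenCoordinates parse(String uri) {
        String prefix = "maven://";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a maven:// URI: " + uri);
        }
        String[] parts = uri.substring(prefix.length()).split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected groupId:artifactId:version in " + uri);
        }
        return new MavenCoordinates(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        MavenCoordinates c = parse(
          "maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT");
        System.out.println("groupId    = " + c.groupId);
        System.out.println("artifactId = " + c.artifactId);
        System.out.println("version    = " + c.version);
    }
}
```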

The task accepts many parameters. Some of them are optional, but some are necessary to deploy the Spark job properly:


  • spark.app-class, the main class of our submitted job
  • spark.app-jar, a path to the fat jar containing our job
  • spark.app-name, the name that’ll be used for our job
  • spark.app-args, the arguments that’ll be passed to the job

We can use the registered task spark-client to submit our job, remembering to provide the required parameters:


task create spark1 --definition "spark-client \
  --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
  --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

Note that spark.app-jar is the path to the fat-jar with our job.
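The definition string is just the registered app name followed by --key=value pairs. When the values come from a build script or configuration, it can be convenient to assemble the string programmatically. The following is a small sketch under that assumption; TaskDefinitionBuilder is a hypothetical helper, not a Data Flow API, and it performs no quoting or escaping of values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Spring Cloud Data Flow.
final class TaskDefinitionBuilder {

    // Joins the app name and its properties into "app --key=value --key=value".
    static String definition(String appName, Map<String, String> properties) {
        String options = properties.entrySet().stream()
          .map(e -> "--" + e.getKey() + "=" + e.getValue())
          .collect(Collectors.joining(" "));
        return options.isEmpty() ? appName : appName + " " + options;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("spark.app-name", "my-test-pi");
        properties.put("spark.app-class", "com.baeldung.spring.cloud.PiApproximation");
        properties.put("spark.app-jar", "/apache-spark-job-0.0.1-SNAPSHOT.jar");
        properties.put("spark.app-args", "10");

        String definition = definition("spark-client", properties);
        // Prints a command that can be pasted into the Data Flow Shell.
        System.out.println("task create spark1 --definition \"" + definition + "\"");
    }
}
```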


After successful creation of the task, we can proceed to run it with the following command:


task launch spark1

This will invoke the execution of our task.


6. Summary


In this tutorial, we have shown how to use the Spring Cloud Data Flow framework to process data with Apache Spark. More information on the Spring Cloud Data Flow framework can be found in the documentation.


All code samples can be found on GitHub.
