1. Overview
1.概述
Apache Spark is an open-source and distributed analytics and processing system that enables data engineering and data science at scale. It simplifies the development of analytics-oriented applications by offering a unified API for data transfer, massive transformations, and distribution.
Apache Spark是一个开源的分布式分析和处理系统,可实现大规模的数据工程和数据科学。它通过为数据传输、大规模转换和分发提供统一的API,简化了面向分析的应用程序的开发。
The DataFrame is an important and essential component of Spark API. In this tutorial, we’ll look into some of the Spark DataFrame APIs using a simple customer data example.
DataFrame是Spark API的一个重要的基本组成部分。在本教程中,我们将使用一个简单的客户数据例子来研究Spark的一些DataFrame API。
2. DataFrame in Spark
2.Spark中的DataFrame</em
Logically, a DataFrame is an immutable set of records organized into named columns. It shares similarities with a table in RDBMS or a ResultSet in Java.
在逻辑上,DataFrame是一个不可改变的记录集,被组织成命名的列。它与RDBMS中的表或Java中的ResultSet有相似之处。
As an API, the DataFrame provides unified access to multiple Spark libraries including Spark SQL, Spark Streaming, MLib, and GraphX.
作为一个 API,DataFrame提供了对多个 Spark 库的统一访问,包括Spark SQL、Spark Streaming、MLib 和 GraphX。
In Java, we use Dataset<Row> to represent a DataFrame.
在Java中,我们使用Dataset<Row>来表示一个DataFrame。
Essentially, a Row uses efficient storage called Tungsten, which highly optimizes Spark operations in comparison with its predecessors.
从本质上讲,Row使用名为Tungsten的高效存储,与前辈相比,它高度优化了Spark操作。
3. Maven Dependencies
3.Maven的依赖性
Let’s start by adding the spark-core and spark-sql dependencies to our pom.xml:
让我们先把spark-core和spark-sql依赖项添加到我们的pom.xml中。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.8</version>
</dependency>
4. DataFrame and Schema
4. 数据框架和模式
Essentially, a DataFrame is an RDD with a schema. The schema can either be inferred or defined as a StructType.
基本上,DataFrame是一个带有模式的RDD。该模式可以是推断出来的,也可以定义为StructType。
StructType is a built-in data type in Spark SQL that we use to represent a collection of StructField objects.
StructType是Spark SQL中的一个内置数据类型,我们用它来表示StructField对象的集合。
Let’s define a sample Customer schema StructType:
让我们定义一个样本Customer模式StructType。
public static StructType minimumCustomerDataSchema() {
return DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("gender", DataTypes.StringType, true),
DataTypes.createStructField("transaction_amount", DataTypes.IntegerType, true) }
);
}
Here, each StructField has a name that represents the DataFrame column name, type, and boolean value that represents whether it’s nullable.
在这里,每个StructField都有一个代表DataFrame列名、类型和代表它是否为空的boolean值的名称。
5. Constructing DataFrames
5.构建DataFrames
The first operation for every Spark application is to get a SparkSession via master.
每个Spark应用程序的第一个操作是通过主程序获得一个SparkSession。
It provides us with an entry point to access the DataFrames. Let’s start by creating the SparkSession:
它为我们提供了一个访问DataFrames的入口点。让我们从创建SparkSession开始。
public static SparkSession getSparkSession() {
return SparkSession.builder()
.appName("Customer Aggregation pipeline")
.master("local")
.getOrCreate();
}
Note here that we’re connecting to Spark using the local master. If we were to connect to the cluster, we would give the cluster address instead.
请注意,我们是用本地主站连接到Spark的。如果我们要连接到集群,我们会给出集群的地址。
Once we have a SparkSession, we can create a DataFrame using various methods. Let’s briefly look at some of them.
一旦我们有了一个SparkSession,我们就可以使用各种方法创建一个DataFrame。让我们简单地看一下其中的一些。
5.1. DataFrame from List<POJO>
5.1.DataFrame来自List<POJO>
Let’s build a List<Customer> first:
让我们先建立一个List<Customer>:。
List<Customer> customers = Arrays.asList(
aCustomerWith("01", "jo", "Female", 2000),
aCustomerWith("02", "jack", "Male", 1200)
);
Next, let’s construct the DataFrame from the List<Customer> using createDataFrame:
接下来,让我们使用 createDataFrame从List<Customer> 构建DataFrame。
Dataset<Row> df = SPARK_SESSION
.createDataFrame(customerList, Customer.class);
5.2. DataFrame from Dataset
5.2.来自数据集的数据框架
If we have a Dataset, we can easily convert it to a DataFrame by calling toDF on the Dataset.
如果我们有一个数据集,我们可以通过对数据集调用toDF,轻松将其转换为数据框架。
Let’s create a Dataset<Customer> first, using createDataset, that takes org.apache.spark.sql.Encoders:
让我们先创建一个Dataset<Customer>,使用createDataset,它需要org.apache.spark.sql.Encoders。
Dataset<Customer> customerPOJODataSet = SPARK_SESSION
.createDataset(CUSTOMERS, Encoders.bean(Customer.class));
Next, let’s convert it to DataFrame:
接下来,让我们把它转换为DataFrame。
Dataset<Row> df = customerPOJODataSet.toDF();
5.3. Row from a POJO Using RowFactory
5.3.使用RowFactory从一个POJO中获取Row
Since DataFrame is essentially a Dataset<Row>, let’s see how we can create a Row from a Customer POJO.
由于DataFrame本质上是一个Dataset<Row>,让我们看看如何从Customer POJO中创建一个Row。
Basically, by implementing MapFunction<Customer, Row> and overriding the call method, we can map each Customer to a Row using RowFactory.create:
基本上,通过实现MapFunction<Customer, Row> 和覆盖 call方法,我们可以使用RowFactory.create将每个Customer映射到A Row。
public class CustomerToRowMapper implements MapFunction<Customer, Row> {
@Override
public Row call(Customer customer) throws Exception {
Row row = RowFactory.create(
customer.getId(),
customer.getName().toUpperCase(),
StringUtils.substring(customer.getGender(),0, 1),
customer.getTransaction_amount()
);
return row;
}
}
We should note that we can manipulate the Customer data here before converting it into a Row.
我们应该注意到,在将Customer数据转换为Row之前,我们可以在这里对其进行操作。
5.4. DataFrame from List<Row>
5.4.DataFrame来自List<Row>
We can also create a DataFrame from a list of Row objects:
我们还可以从一个Row对象的列表中创建一个DataFrame。
List<Row> rows = customer.stream()
.map(c -> new CustomerToRowMapper().call(c))
.collect(Collectors.toList());
Now, let’s give this List<Row> to SparkSession along with the StructType schema:
现在,让我们把这个List<Row>与StructType模式一起交给SparkSession。
Dataset<Row> df = SparkDriver.getSparkSession()
.createDataFrame(rows, SchemaFactory.minimumCustomerDataSchema());
Note here that the List<Row> will be converted to DataFrame based on the schema definition. Any field not present in the schema will not be part of the DataFrame.
请注意,List<Row>将根据模式定义转换为DataFrame。任何不存在于模式中的字段将不会成为DataFrame的一部分。
5.5. DataFrame from Structured Files and Database
5.5.数据框架来自结构化文件和数据库
DataFrames can store columnar information, like a CSV file, and nested fields and arrays, like a JSON file.
DataFrames可以存储列式信息,如CSV文件,以及嵌套字段和数组,如JSON文件。
The DataFrame API remains the same regardless of whether we’re working with CSV files, JSON files, or other formats as well as Databases.
无论我们是使用CSV文件、JSON文件还是其他格式以及数据库,DataFrame API都保持不变。
Let’s create the DataFrame from multiline JSON data:
让我们从多行JSON数据中创建DataFrame。
Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/minCustomerData.json");
Similarly, in the case of reading from the database, we’ll have:
类似地,在从数据库中读取数据的情况下,我们会有。
Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.option("url", "jdbc:postgresql://localhost:5432/customerdb")
.option("dbtable", "customer")
.option("user", "user")
.option("password", "password")
.option("serverTimezone", "EST")
.format("jdbc")
.load();
6. Converting DataFrame to Dataset
6.将DataFrame转换为Dataset
Now, let’s see how we can convert our DataFrame into a Dataset. This conversion is useful if we want to manipulate our existing POJOs and the extended API that apply to only the DataFrame.
现在,让我们看看如何将我们的DataFrame转换成Dataset。如果我们想操作我们现有的POJO和仅适用于DataFrame的扩展API,这种转换是很有用的。
We’ll continue with the DataFrame created from JSON in the previous section.
我们将继续讨论上一节中从JSON创建的DataFrame。
Let’s call a mapper function that takes each row of the Dataset<Row> and converts it into a Customer object:
让我们调用一个映射函数,该函数接收Dataset<Row>的每一行,并将其转换为Customer对象。
Dataset<Customer> ds = df.map(
new CustomerMapper(),
Encoders.bean(Customer.class)
);
Here, the CustomerMapper implements MapFunction<Row, Customer>:
这里,CustomerMapper实现了MapFunction<Row, Customer>/em>。
public class CustomerMapper implements MapFunction<Row, Customer> {
@Override
public Customer call(Row row) {
Customer customer = new Customer();
customer.setId(row.getAs("id"));
customer.setName(row.getAs("name"));
customer.setGender(row.getAs("gender"));
customer.setTransaction_amount(Math.toIntExact(row.getAs("transaction_amount")));
return customer;
}
}
We should note that the MapFunction<Row, Customer> is instantiated only once, whatever the number of records we have to process.
我们应该注意,MapFunction<Row, Customer>只被实例化一次,无论我们要处理多少条记录。
7. DataFrame Operations and Transformations
7.数据框架操作和转换
Now, let’s build a simple pipeline using a customer data example. We want to ingest customer data as DataFrames from two disparate file sources, normalize them, and then perform some transformations on the data.
现在,让我们使用一个客户数据的例子来构建一个简单的管道。我们希望将客户数据作为DataFrames从两个不同的文件源中摄取,对其进行标准化处理,然后对数据进行一些转换。
Finally, we’ll write the transformed data to a database.
最后,我们将把转换后的数据写到数据库中。
The purpose of these transformations is to find out the yearly spending, ordered by gender and source.
这些转换的目的是为了找出按性别和来源排序的年度支出。
7.1. Ingesting Data
7.1.摄取数据
Firstly, let’s ingest data from a couple of sources using SparkSession‘s read method starting with JSON data:
首先,让我们使用SparkSession的read方法从几个来源摄取数据,从JSON数据开始。
Dataset<Row> jsonDataToDF = SPARK_SESSION.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/customerData.json");
Now, let’s do the same with our CSV source:
现在,让我们对我们的CSV源做同样的处理。
Dataset<Row> csvDataToDF = SPARK_SESSION.read()
.format("csv")
.option("header", "true")
.schema(SchemaFactory.customerSchema())
.option("dateFormat", "m/d/YYYY")
.load("data/customerData.csv");
csvDataToDF.show();
csvDataToDF.printSchema();
return csvData;
Importantly, to read this CSV data, we’re providing a StructType schema that determines the column data types.
重要的是,为了读取这些CSV数据,我们要提供一个 StructType模式,以确定列的数据类型。
Once we’ve ingested the data, we can inspect the contents of the DataFrame using the show method.
一旦我们摄取了数据,我们就可以使用show method检查DataFrame的内容。
Additionally, we can also limit the rows by providing the size in the show method. And, we can use printSchema to inspect the schemas of the newly created DataFrames.
此外,我们还可以通过在show method中提供大小来限制行。而且,我们可以使用printSchema 来检查新创建的DataFrames的模式。
We’ll notice that the two schemas have some differences. Therefore, we need to normalize the schema before we can do any transformations.
我们会注意到,这两个模式有一些差异。因此,在我们进行任何转换之前,我们需要对模式进行规范化处理。
7.2. Normalizing DataFrames
7.2.规范化DataFrames
Next, we’ll normalize the raw DataFrames representing the CSV and JSON data.
接下来,我们将对代表CSV和JSON数据的原始DataFrames进行标准化。
Here, let’s see some of the transformations performed:
在这里,让我们看看所进行的一些转换。
private Dataset<Row> normalizeCustomerDataFromEbay(Dataset<Row> rawDataset) {
Dataset<Row> transformedDF = rawDataset
.withColumn("id", concat(rawDataset.col("zoneId"),lit("-"), rawDataset.col("customerId")))
.drop(column("customerId"))
.withColumn("source", lit("ebay"))
.withColumn("city", rawDataset.col("contact.customer_city"))
.drop(column("contact"))
.drop(column("zoneId"))
.withColumn("year", functions.year(col("transaction_date")))
.drop("transaction_date")
.withColumn("firstName", functions.split(column("name"), " ")
.getItem(0))
.withColumn("lastName", functions.split(column("name"), " ")
.getItem(1))
.drop(column("name"));
return transformedDF;
}
Some important operations on DataFrame in the above example are:
上述例子中对DataFrame的一些重要操作是。
- concat to join data from multiple columns and literals to make a new id column
- lit static function returns a column with a literal value
- functions. year to extract the year from transactionDate
- function.split to split name into firstname and lastname columns
- drop method removes a column in the data frame
- col method returns a dataset’s column based on its name
- withColumnRenamed returns a column with renamed value
Importantly, we can see that the DataFrame is immutable. Hence, whenever anything needs to change, we must create a new DataFrame.
重要的是,我们可以看到,DataFrame是不可改变的。因此,只要有什么需要改变,我们必须创建一个新的DataFrame。
Eventually, both data frames are normalized to the same schema as below:
最终,两个数据框架都被规范化为如下的相同模式。
root
|-- gender: string (nullable = true)
|-- transaction_amount: long (nullable = true)
|-- id: string (nullable = true)
|-- source: string (nullable = false)
|-- city: string (nullable = true)
|-- year: integer (nullable = true)
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
7.3. Combining DataFrames
7.3.结合数据框架
Let’s combine the normalized DataFrames next:
接下来让我们结合规范化的DataFrames。
Dataset<Row> combineDataframes(Dataset<Row> df1, Dataset<Row> df2) {
return df1.unionByName(df2);
}
Importantly, we should note that:
重要的是,我们应该注意到。
- If we care about column names when we union two DataFrames, we should use unionByName
- If we don’t care about column names when we union two DataFrames, we should use union
7.4. Aggregating DataFrames
7.4.聚合数据帧
Next, let’s group the combined DataFrames to find out the yearly spending by year, source, and gender.
接下来,让我们对合并的数据框架进行分组,找出按年份、来源和性别划分的年度支出。
We’ll then sort the aggregated data by columns year ascending and yearly spent in descending order:
然后,我们将按年升序排列汇总数据,年花费降序排列。
Dataset<Row> aggDF = dataset
.groupBy(column("year"), column("source"), column("gender"))
.sum("transactionAmount")
.withColumnRenamed("sum(transaction_amount)", "yearly spent")
.orderBy(col("year").asc(), col("yearly spent").desc());
Some important operations on DataFrame in the above example are:
上述例子中对DataFrame的一些重要操作是。
- groupBy is used to arrange identical data into groups on DataFrame and then perform aggregate functions similar to SQL “GROUP BY” clause
- sum applies aggregation function on the column transactionAmount after grouping
- orderBy sorts the DataFrame by one or more columns
- asc and desc functions from Column class can be used to specify the sorting order
Finally, let’s use the show method to see what the data frame looks like after transformation:
最后,让我们使用 show方法来看看数据框在转换后的样子。
+----+------+------+---------------+
|year|source|gender|annual_spending|
+----+------+------+---------------+
|2018|amazon| Male| 10600|
|2018|amazon|Female| 6200|
|2018| ebay| Male| 5500|
|2021| ebay|Female| 16000|
|2021| ebay| Male| 13500|
|2021|amazon| Male| 4000|
|2021|amazon|Female| 2000|
+----+------+------+---------------+
Consequently, the schema after the final transformation should be:
因此,最终转换后的模式应该是。
root
|-- source: string (nullable = false)
|-- gender: string (nullable = true)
|-- year: integer (nullable = true)
|-- yearly spent: long (nullable = true)
7.5. Writing from DataFrame to a Relational Database
7.5.从DataFrame写到关系数据库
Finally, let’s finish off by writing the transformed DataFrame as a table in a relational database:
最后,让我们把转换后的DataFrame写成关系数据库中的一个表来结束。
Properties dbProps = new Properties();
dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");
Next, we can use the Spark session to write to the database:
接下来,我们可以使用Spark会话来向数据库写数据。
String connectionURL = dbProperties.getProperty("connectionURL");
dataset.write()
.mode(SaveMode.Overwrite)
.jdbc(connectionURL, "customer", dbProperties);
8. Testing
8.测试
Now, we can test the pipeline end-to-end using the two ingestion sources, with postgres and pgAdmin Docker images:
现在,我们可以使用两个摄取源,用postgres和pgAdmin Docker镜像来测试管道的端到端。
@Test
void givenCSVAndJSON_whenRun_thenStoresAggregatedDataFrameInDB() throws Exception {
Properties dbProps = new Properties();
dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");
pipeline = new CustomerDataAggregationPipeline(dbProps);
pipeline.run();
String allCustomersSql = "Select count(*) from customer";
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(allCustomersSql);
resultSet.next();
int count = resultSet.getInt(1);
assertEquals(7, count);
}
After running this, we can verify that a table exists with the columns and rows corresponding to the DataFrame. Finally, we can also observe this output via the pgAdmin4 client:
运行后,我们可以验证存在一个表,其列和行与DataFrame对应。最后,我们还可以通过pgAdmin4客户端观察这个输出。
We should note a couple of important points here:
我们应该注意到这里的几个重要问题。
- The customer table is created automatically as a result of the write operation.
- The mode used is SaveMode.Overwrite. Consequently, this will overwrite anything already existing in the table. Other options available are Append, Ignore, and ErrorIfExists.
In addition, we can also use write to export DataFrame data as CSV, JSON, or parquet, among other formats.
此外,我们还可以使用write将DataFrame数据导出为CSV、JSON或parquet等其他格式。
9. Conclusion
9.结语
In this tutorial, we looked at how to use DataFrames to perform data manipulation and aggregation in Apache Spark.
在本教程中,我们研究了如何使用DataFrames在Apache Spark中进行数据操作和聚合。
First, we created the DataFrames from various input sources. Then we used some of the API methods to normalize, combine, and then aggregate the data.
首先,我们从各种输入源创建了DataFrames。然后,我们使用一些API方法来规范化、组合,然后汇总数据。
Finally, we exported the DataFrame as a table in a relational database.
最后,我们将DataFrame导出为关系型数据库中的一个表。
As always, the full source code is available over on GitHub.
一如既往,完整的源代码可在GitHub上获得,。