1. Overview
1.概述
Apache Spark is a fast, distributed data processing system. It does in-memory data processing and uses in-memory caching and optimized execution resulting in fast performance. It provides high-level APIs for popular programming languages like Scala, Python, Java, and R.
Apache Spark是一个快速的分布式数据处理系统。它在内存中进行数据处理,并使用内存中的缓存和优化执行,从而实现快速的性能。它为Scala、Python、Java和R等流行的编程语言提供了高级API。
In this quick tutorial, we’ll go through three of the Spark basic concepts: dataframes, datasets, and RDDs.
在这个快速教程中,我们将了解Spark的三个基本概念:数据帧、数据集和RDDs。
2. DataFrame
2.数据框架
Spark SQL introduced a tabular data abstraction called a DataFrame since Spark 1.3. Since then, it has become one of the most important features in Spark. This API is useful when we want to handle structured and semi-structured, distributed data.
Spark SQL从Spark 1.3开始引入了一个叫做DataFrame的表格数据抽象。从那时起,它已经成为Spark中最重要的功能之一。当我们想处理结构化和半结构化的分布式数据时,这个API很有用。
In section 3, we’ll discuss Resilient Distributed Datasets (RDD). DataFrames store data in a more efficient manner than RDDs, this is because they use the immutable, in-memory, resilient, distributed, and parallel capabilities of RDDs but they also apply a schema to the data. DataFrames also translate SQL code into optimized low-level RDD operations.
在第3节中,我们将讨论弹性分布式数据集(RDD)。DataFrames以比RDD更高效的方式存储数据,这是因为它们使用了RDD的不可变、内存、弹性、分布式和并行能力,但它们也对数据应用了一个模式。
We can create DataFrames in three ways:
我们可以通过三种方式创建DataFrames。
- Converting existing RDDs
- Running SQL queries
- Loading external data
Spark team introduced SparkSession in version 2.0, it unifies all different contexts assuring developers won’t need to worry about creating different contexts:
Spark团队在2.0版本中引入了SparkSession,它统一了所有不同的上下文,确保开发者不需要担心创建不同的上下文。
SparkSession session = SparkSession.builder()
.appName("TouristDataFrameExample")
.master("local[*]")
.getOrCreate();
DataFrameReader dataFrameReader = session.read();
We’ll be analyzing the Tourist.csv file:
我们将分析Tourist.csv文件。
Dataset<Row> data = dataFrameReader.option("header", "true")
.csv("data/Tourist.csv");
Since Spark 2.0 DataFrame became a Dataset of type Row, so we can use a DataFrame as an alias for a Dataset<Row>.
自Spark 2.0以来,DataFrame成为Dataset类型的Row,因此我们可以使用DataFrame作为Dataset<Row>的别名。
We can select specific columns that we are interested in. We can also filter and group by a given column:
我们可以选择我们感兴趣的特定列。我们还可以通过一个给定的列进行过滤和分组。
data.select(col("country"), col("year"), col("value"))
.show();
data.filter(col("country").equalTo("Mexico"))
.show();
data.groupBy(col("country"))
.count()
.show();
3. Datasets
3.数据集
A dataset is a set of strongly-typed, structured data. They provide the familiar object-oriented programming style plus the benefits of type safety since datasets can check syntax and catch errors at compile time.
数据集是一组强类型、结构化的数据。它们提供了熟悉的面向对象的编程风格以及类型安全的好处,因为数据集可以在编译时检查语法和捕捉错误。
Dataset is an extension of DataFrame, thus we can consider a DataFrame an untyped view of a dataset.
Dataset是DataFrame的扩展,因此我们可以认为DataFrame是数据集的非类型视图。
The Spark team released the Dataset API in Spark 1.6 and as they mentioned: “the goal of Spark Datasets is to provide an API that allows users to easily express transformations on object domains, while also providing the performance and robustness advantages of the Spark SQL execution engine”.
Spark团队在Spark 1.6中发布了Dataset API,正如他们提到的。”Spark Datasets的目标是提供一个API,允许用户轻松表达对象域的转换,同时也提供Spark SQL执行引擎的性能和稳健性优势”。
First, we’ll need to create a class of type TouristData:
首先,我们需要创建一个TouristData类型的类。
public class TouristData {
private String region;
private String country;
private String year;
private String series;
private Double value;
private String footnotes;
private String source;
// ... getters and setters
}
To map each of our records to the specified type we will need to use an Encoder. Encoders translate between Java objects and Spark’s internal binary format:
为了将我们的每条记录映射到指定的类型,我们将需要使用编码器。编码器在Java对象和Spark的内部二进制格式之间进行转换。
// SparkSession initialization and data load
Dataset<Row> responseWithSelectedColumns = data.select(col("region"),
col("country"), col("year"), col("series"), col("value").cast("double"),
col("footnotes"), col("source"));
Dataset<TouristData> typedDataset = responseWithSelectedColumns
.as(Encoders.bean(TouristData.class));
As with DataFrame, we can filter and group by specific columns:
与DataFrame一样,我们可以通过特定的列进行过滤和分组。
typedDataset.filter((FilterFunction) record -> record.getCountry()
.equals("Norway"))
.show();
typedDataset.groupBy(typedDataset.col("country"))
.count()
.show();
We can also do operations like filter by column matching a certain range or computing the sum of a specific column, to get the total value of it:
我们还可以进行一些操作,如通过与某一范围相匹配的列进行过滤,或计算某一特定列的总和,以获得它的总价值。
typedDataset.filter((FilterFunction) record -> record.getYear() != null
&& (Long.valueOf(record.getYear()) > 2010
&& Long.valueOf(record.getYear()) < 2017)).show();
typedDataset.filter((FilterFunction) record -> record.getValue() != null
&& record.getSeries()
.contains("expenditure"))
.groupBy("country")
.agg(sum("value"))
.show();
4. RDDs
4.RDDs
The Resilient Distributed Dataset or RDD is Spark’s primary programming abstraction. It represents a collection of elements that is: immutable, resilient, and distributed.
弹性分布式数据集或RDD是Spark的主要编程抽象。它代表了一个元素的集合,是。不可变的、有弹性的和分布式的。
An RDD encapsulates a large dataset, Spark will automatically distribute the data contained in RDDs across our cluster and parallelize the operations we perform on them.
一个RDD封装了一个大型数据集,Spark会自动将RDD中包含的数据分布在我们的集群中,并将我们对其进行的操作并行化。
We can create RDDs only through operations of data in stable storage or operations on other RDDs.
我们只能通过对稳定存储中的数据的操作或对其他RDD的操作来创建RDD。
Fault tolerance is essential when we deal with large sets of data and the data is distributed on cluster machines. RDDs are resilient because of Spark’s built-in fault recovery mechanics. Spark relies on the fact that RDDs memorize how they were created so that we can easily trace back the lineage to restore the partition.
当我们处理大型数据集且数据分布在集群机器上时,容错是至关重要的。由于Spark内置的故障恢复机制,RDDs是有弹性的。Spark依靠的是RDDs记住了它们是如何被创建的,这样我们就可以很容易地回溯血统,恢复分区。
There are two types of operations we can do on RDDs: Transformations and Actions.
我们可以在RDD上进行两种类型的操作。转换和行动。
4.1. Transformations
4.1.变革
We can apply Transformations to an RDD to manipulate its data. After this manipulation is performed, we’ll get a brand-new RDD, since RDDs are immutable objects.
我们可以对RDD应用Transformations来操作它的数据。在执行这一操作后,我们将得到一个全新的RDD,因为RDD是不可改变的对象。
We’ll check how to implement Map and Filter, two of the most common transformations.
我们将检查如何实现Map和Filter这两种最常见的转换。
First, we need to create a JavaSparkContext and load the data as an RDD from the Tourist.csv file:
首先,我们需要创建一个JavaSparkContext,并从Tourist.csv文件中加载数据作为RDD。
SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");
Next, let’s apply the map function to get the name of the country from each record and convert the name to uppercase. We can save this newly generated dataset as a text file on disk:
接下来,让我们应用地图函数从每条记录中获取国家的名称,并将名称转换成大写字母。我们可以把这个新生成的数据集保存为磁盘上的一个文本文件。
JavaRDD<String> upperCaseCountries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1].toUpperCase();
}).distinct();
upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");
If we want to select only a specific country, we can apply the filter function on our original tourists RDD:
如果我们想只选择一个特定的国家,我们可以在我们的原始游客RDD上应用过滤功能。
JavaRDD<String> touristsInMexico = tourists
.filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));
touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");
4.2. Actions
4.2.行动
Actions will return a final value or save the results to disc, after doing some computation on the data.
在对数据进行一些计算后,行动将返回一个最终值或将结果保存到光盘上。
Two of the recurrently used actions in Spark are Count and Reduce.
Spark中经常使用的两个动作是计数和减少。
Let’s count the total countries on our CSV file:
让我们统计一下我们的CSV文件上的国家总数。
// Spark Context initialization and data load
JavaRDD<String> countries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1];
}).distinct();
Long numberOfCountries = countries.count();
Now, we’ll calculate the total expenditure by country. We’ll need to filter the records containing expenditure in their description.
现在,我们将按国家计算总支出。我们需要过滤描述中含有支出的记录。
Instead of using a JavaRDD, we’ll use a JavaPairRDD. A pair of RDD is a type of RDD that can store key-value pairs. Let’s check it next:
我们不使用JavaRDD,而是使用JavaPairRDD。一对RDD是一种可以存储键值对的RDD类型。接下来让我们来检查一下。
JavaRDD<String> touristsExpenditure = tourists
.filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));
JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
.mapToPair(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});
List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
.reduceByKey((x, y) -> x + y)
.collect();
5. Conclusion
5.总结
To sum up, we should use DataFrames or Datasets when we need domain-specific APIs, we need high-level expressions such as aggregation, sum, or SQL queries. Or when we want type-safety at compile time.
总而言之,当我们需要特定领域的API时,我们应该使用DataFrames或Datasets,我们需要高级表达式,如聚合、求和或SQL查询。或者当我们希望在编译时有类型安全的时候。
On the other hand, we should use RDDs when data is unstructured and we don’t need to implement a specific schema or when we need low-level transformations and actions.
另一方面,当数据是非结构化的,我们不需要实现特定的模式,或者需要低层次的转换和操作时,我们应该使用RDDs。
As always, all of the code samples are available over on GitHub.
一如既往,所有的代码样本都可以在GitHub上找到。