1. Overview
Apache Kafka is a scalable, high-performance, low-latency platform that allows reading and writing streams of data like a messaging system. Getting started with Kafka from Java is fairly easy.
Spark Streaming is part of the Apache Spark platform that enables scalable, high-throughput, fault-tolerant processing of data streams. Although written in Scala, Spark offers Java APIs to work with.
Apache Cassandra is a distributed, wide-column NoSQL data store. More details on Cassandra are available in our previous article.
In this tutorial, we’ll combine these to create a highly scalable and fault-tolerant data pipeline for a real-time data stream.
2. Installations
To start, we’ll need Kafka, Spark and Cassandra installed locally on our machine to run the application. We’ll see how to develop a data pipeline using these platforms as we go along.
However, we’ll keep all the default configurations (including ports) for all installations, which will help in getting the tutorial to run smoothly.
2.1. Kafka
Installing Kafka on our local machine is fairly straightforward, and the steps are covered in the official documentation. We’ll be using the 2.1.0 release of Kafka.
In addition, Kafka requires Apache Zookeeper to run, but for the purpose of this tutorial, we’ll leverage the single-node Zookeeper instance packaged with Kafka.
Once we’ve managed to start Zookeeper and Kafka locally following the official guide, we can proceed to create our topic, named “messages”:
$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic messages
Note that the above script is for the Windows platform, but similar scripts are available for Unix-like platforms as well.
2.2. Spark
Spark uses Hadoop’s client libraries for HDFS and YARN. Consequently, it can be very tricky to assemble compatible versions of all of these. However, the official download of Spark comes pre-packaged with popular versions of Hadoop. For this tutorial, we’ll be using the version 2.3.0 package “pre-built for Apache Hadoop 2.7 and later”.
Once the right package of Spark is unpacked, the available scripts can be used to submit applications. We’ll see this later when we develop our application in Spring Boot.
2.3. Cassandra
DataStax makes a community edition of Cassandra available for different platforms, including Windows. We can download and install it on our local machine very easily by following the official documentation. We’ll be using version 3.9.0.
Once we’ve managed to install and start Cassandra on our local machine, we can proceed to create our keyspace and table. This can be done using the CQL Shell which ships with our installation:
CREATE KEYSPACE vocabulary
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);
Note that we’ve created a keyspace called vocabulary and a table therein called words with two columns, word and count.
3. Dependencies
We can integrate the Kafka and Spark dependencies into our application through Maven. We’ll pull these dependencies from Maven Central and add them to our pom accordingly:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
<version>1.5.2</version>
</dependency>
Note that some of these dependencies are marked as provided in scope. This is because they will be made available by the Spark installation, to which we’ll submit the application for execution using spark-submit.
4. Spark Streaming – Kafka Integration Strategies
At this point, it is worthwhile to talk briefly about the integration strategies for Spark and Kafka.
Kafka introduced a new consumer API between versions 0.8 and 0.10. Hence, corresponding Spark Streaming packages are available for both broker versions. It’s important to choose the right package depending upon the broker available and the features desired.
4.1. Spark Streaming Kafka 0.8
The 0.8 version is the stable integration API, with the option of using either the Receiver-based or the Direct Approach. We won’t go into the details of these approaches, which can be found in the official documentation. An important point to note here is that this package is compatible with Kafka Broker versions 0.8.2.1 or higher.
4.2. Spark Streaming Kafka 0.10
This is currently in an experimental state and is compatible with Kafka Broker versions 0.10.0 or higher only. This package offers the Direct Approach only, now making use of the new Kafka consumer API. We can find more details about this in the official documentation. Importantly, it is not backward compatible with older Kafka Broker versions.
Please note that for this tutorial, we’ll make use of the 0.10 package. The dependency mentioned in the previous section refers to this package only.
5. Developing a Data Pipeline
We’ll create a simple application in Java using Spark which will integrate with the Kafka topic we created earlier. The application will read the messages as they are posted and count the frequency of words in every message. The counts will then be updated in the Cassandra table we created earlier.
Let’s quickly visualize how the data will flow: messages posted to the Kafka topic are consumed as a stream by Spark Streaming, transformed into word counts, and the results are persisted into the Cassandra table.
5.1. Getting JavaStreamingContext
First, we’ll initialize the JavaStreamingContext, which is the entry point for all Spark Streaming applications:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
JavaStreamingContext streamingContext = new JavaStreamingContext(
sparkConf, Durations.seconds(1));
5.2. Getting DStream from Kafka
Now, we can connect to the Kafka topic from the JavaStreamingContext:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");
JavaInputDStream<ConsumerRecord<String, String>> messages =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
Please note that we have to provide deserializers for the key and value here. For common data types like String, the deserializer is available by default. However, if we wish to retrieve custom data types, we’ll have to provide custom deserializers.
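For illustration, a custom value deserializer could look roughly like the following. This is only a sketch for a hypothetical PurchaseEvent POJO and assumes Jackson is on the classpath; our pipeline itself sticks to plain String values:

import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: PurchaseEvent is a hypothetical POJO, not part of this tutorial's code.
public class PurchaseEventDeserializer implements Deserializer<PurchaseEvent> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this simple deserializer
    }

    @Override
    public PurchaseEvent deserialize(String topic, byte[] data) {
        try {
            // turn the raw bytes received from Kafka into our custom type
            return data == null ? null : mapper.readValue(data, PurchaseEvent.class);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize PurchaseEvent", e);
        }
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

We would then pass PurchaseEventDeserializer.class as the value.deserializer in kafkaParams.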
Here, we’ve obtained a JavaInputDStream, which is an implementation of Discretized Streams or DStreams, the basic abstraction provided by Spark Streaming. Internally, a DStream is nothing but a continuous series of RDDs.
5.3. Processing Obtained DStream
We’ll now perform a series of operations on the JavaInputDStream to obtain word frequencies in the messages:
JavaPairDStream<String, String> results = messages
.mapToPair(
record -> new Tuple2<>(record.key(), record.value())
);
JavaDStream<String> lines = results
.map(
tuple2 -> tuple2._2()
);
JavaDStream<String> words = lines
.flatMap(
x -> Arrays.asList(x.split("\\s+")).iterator()
);
JavaPairDStream<String, Integer> wordCounts = words
.mapToPair(
s -> new Tuple2<>(s, 1)
).reduceByKey(
(i1, i2) -> i1 + i2
);
5.4. Persisting Processed DStream into Cassandra
Finally, we can iterate over the processed JavaPairDStream and insert the results into our Cassandra table:
wordCounts.foreachRDD(
javaRdd -> {
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
for (String key : wordCountMap.keySet()) {
List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList);
javaFunctions(rdd).writerBuilder(
"vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
}
}
);
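The Word class used above is a simple mapping POJO whose properties correspond to the columns of the words table. A minimal sketch of what it could look like (the actual class in the project may differ slightly):

import java.io.Serializable;

// Sketch of the mapping POJO used with mapToRow(Word.class);
// the property names match the word and count columns in Cassandra.
public class Word implements Serializable {

    private String word;
    private Integer count;

    public Word() {
    }

    public Word(String word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }
}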
5.5. Running the Application
As this is a stream processing application, we’ll want to keep it running:
streamingContext.start();
streamingContext.awaitTermination();
6. Leveraging Checkpoints
In a stream processing application, it’s often useful to retain state between batches of data being processed.
For example, in our previous attempt, we are only able to store the current frequency of the words. What if we want to store the cumulative frequency instead? Spark Streaming makes it possible through a concept called checkpoints.
We’ll now modify the pipeline we created earlier to leverage checkpoints.
Please note that we’ll be using checkpointing only to maintain state across batches of data processing. This alone does not provide fault tolerance. However, checkpointing can be used for fault tolerance as well.
There are a few changes we’ll have to make in our application to leverage checkpoints. This includes providing the JavaStreamingContext with a checkpoint location:
streamingContext.checkpoint("./.checkpoint");
Here, we are using the local filesystem to store checkpoints. However, for robustness, checkpoints should be stored in a fault-tolerant location like HDFS or S3. More on this is available in the official documentation.
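If we also wanted to use checkpointing for fault tolerance, i.e. to recover the streaming context itself after a driver failure, we could create the context with getOrCreate instead. This is only a sketch and not part of the pipeline we’re building here:

// Sketch only: recreate the context from the checkpoint directory if one exists,
// otherwise build a fresh one. All DStream setup has to happen inside the factory.
JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(
  "./.checkpoint",
  () -> {
      JavaStreamingContext context = new JavaStreamingContext(
        sparkConf, Durations.seconds(1));
      context.checkpoint("./.checkpoint");
      // define the Kafka stream and all transformations here before returning
      return context;
  });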
Next, we’ll use the checkpointed state to maintain a cumulative count of words, updating it with a mapping function as every batch is processed:
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
.mapWithState(
StateSpec.function(
(word, one, state) -> {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
)
);
Once we get the cumulative word counts, we can proceed to iterate and save them in Cassandra as before.
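For instance, the persistence step could be adapted roughly as follows; this is a sketch, and the actual code in the project may differ:

// Map each (word, cumulative count) tuple to the Word POJO and save it,
// mirroring the earlier foreachRDD snippet.
cumulativeWordCounts.foreachRDD(
  javaRdd -> {
    JavaRDD<Word> rdd = javaRdd.map(tuple -> new Word(tuple._1(), tuple._2()));
    javaFunctions(rdd).writerBuilder(
      "vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
  }
);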
Please note that while data checkpointing is useful for stateful processing, it comes with a latency cost. Hence, it’s necessary to use this wisely along with an optimal checkpointing interval.
7. Understanding Offsets
If we recall some of the Kafka parameters we set earlier:
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
These basically mean that we don’t want to auto-commit the offsets and would like to pick the latest offset every time a consumer group is initialized. Consequently, our application will only be able to consume messages posted during the period it is running.
If we want to consume all posted messages irrespective of whether the application was running, and also want to keep track of the messages already processed, we’ll have to configure the offsets appropriately and save the offset state, though this is a bit out of scope for this tutorial.
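For reference, the 0.10 integration exposes the offset ranges of each batch through HasOffsetRanges and lets us commit them back to Kafka through CanCommitOffsets once our output has succeeded. Here is a rough sketch along the lines of the official integration guide:

// Sketch based on the official Spark Streaming + Kafka integration guide:
// read the offset ranges of the batch, do the processing, then commit the offsets.
messages.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process and persist the batch here ...

    // commit the offsets back to Kafka only after the output has completed
    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});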
Managing offsets is also how Spark Streaming can offer a particular level of guarantee like “exactly once”, which basically means that each message posted on the Kafka topic is processed by Spark Streaming exactly once.
8. Deploying Application
We can deploy our application using the spark-submit script, which comes pre-packaged with the Spark installation:
$SPARK_HOME$\bin\spark-submit \
--class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \
--master local[2] \
\target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Please note that the jar we create using Maven should contain the dependencies that are not marked as provided in scope.
Once we submit this application and post some messages in the Kafka topic we created earlier, we should see the cumulative word counts being posted in the Cassandra table we created earlier.
9. Conclusion
To sum up, in this tutorial, we learned how to create a simple data pipeline using Kafka, Spark Streaming and Cassandra. We also learned how to leverage checkpoints in Spark Streaming to maintain state between batches.
As always, the code for the examples is available over on GitHub.