1. Overview
In this article, we’ll be looking at the KafkaStreams library.
KafkaStreams is engineered by the creators of Apache Kafka. The primary goal of this library is to allow programmers to create efficient, real-time streaming applications that can work as microservices.
KafkaStreams enables us to consume from Kafka topics, analyze or transform data, and potentially send it to another Kafka topic.
To demonstrate KafkaStreams, we’ll create a simple application that reads sentences from a topic, counts occurrences of words and prints the count per word.
It's important to note that the KafkaStreams library isn't reactive and has no support for async operations or backpressure handling.
2. Maven Dependency
To start writing stream processing logic using KafkaStreams, we need to add dependencies on kafka-streams and kafka-clients:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
We also need to have Apache Kafka installed and started because we’ll be using a Kafka topic. This topic will be the data source for our streaming job.
We can download Kafka and other required dependencies from the official website.
3. Configuring KafkaStreams Input
The first thing we'll do is define the input Kafka topic.
We can use the Confluent tool that we downloaded; it contains a Kafka server. It also contains the kafka-console-producer that we can use to publish messages to Kafka.
To get started, let's run our Kafka cluster:
./confluent start
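If automatic topic creation is disabled on the broker, the input topic may need to be created up front. A minimal sketch using the kafka-topics tool that ships with Kafka, assuming the broker listens on localhost:9092 (older versions of the tool expect --zookeeper localhost:2181 instead of --bootstrap-server):
./kafka-topics --create --topic inputTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1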
Once Kafka starts, we can define our data source and the name of our application using APPLICATION_ID_CONFIG:
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(
StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-live-test");
A crucial configuration parameter is BOOTSTRAP_SERVERS_CONFIG. This is the URL of the local Kafka instance that we just started:
private String bootstrapServers = "localhost:9092";
streamsConfiguration.put(
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
Next, we need to pass the types of the key and the value of messages that will be consumed from inputTopic:
streamsConfiguration.put(
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
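As a side note, instead of relying on these defaults, the serdes could also be declared per stream when we create it from the builder introduced in the next section. A minimal sketch using the Consumed helper (not part of this example):
// org.apache.kafka.streams.kstream.Consumed
// explicit serdes for this stream instead of the configured defaults
KStream<String, String> textLines = builder.stream(
  inputTopic, Consumed.with(Serdes.String(), Serdes.String()));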
Stream processing is often stateful. When we want to save intermediate results, we need to specify the STATE_DIR_CONFIG parameter.
In our test, we’re using a local file system:
this.stateDirectory = Files.createTempDirectory("kafka-streams");
streamsConfiguration.put(
StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString());
4. Building a Streaming Topology
Once we've defined our input topic, we can create a streaming topology, that is, a definition of how events should be handled and transformed.
In our example, we'd like to implement a word counter. For every sentence sent to inputTopic, we want to split it into words and count the occurrences of every word.
We can use an instance of the StreamsBuilder class to start constructing our topology:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
To implement the word count, we first need to split the values using the regular expression.
The split method returns an array. We're using flatMapValues() to flatten it. Otherwise, we'd end up with a list of arrays, and it would be inconvenient to write code using such a structure.
Finally, we’re aggregating the values for every word and calling the count() that will calculate occurrences of a specific word.
最后,我们要汇总每个词的值,并调用count(),计算出特定词的出现次数。
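To see what the split step produces for one of the messages we'll send later, here is a small illustration (the input string is just an example):
// requires java.util.regex.Pattern
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
String[] words = pattern.split("\"this is a pony\"".toLowerCase());
// words == ["", "this", "is", "a", "pony"]
// the leading empty string comes from the opening quote character in the message,
// which is why an empty word also shows up in the counts printed later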
5. Handling Results
We've already calculated the word count of our input messages. Now let's print the results on the standard output using the foreach() method:
wordCounts.toStream()
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
In production, such a streaming job would often publish its output to another Kafka topic.
We could do this using the to() method:
String outputTopic = "outputTopic";
wordCounts.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
The Serde class gives us preconfigured serializers for Java types that will be used to serialize objects to an array of bytes. The array of bytes will then be sent to the Kafka topic.
We’re using String as a key to our topic and Long as a value for the actual count. The to() method will save the resulting data to outputTopic.
我们使用String作为我们主题的键,Long作为实际计数的值。to() 方法将把结果数据保存到outputTopic。
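To peek at what lands in outputTopic, we could use the kafka-console-consumer that ships with Kafka. A minimal sketch, assuming the broker runs on localhost:9092 and deserializing the Long values (flags may differ slightly between versions):
./kafka-console-consumer --topic outputTopic --bootstrap-server localhost:9092 \
  --from-beginning --property print.key=true \
  --value-deserializer org.apache.kafka.common.serialization.LongDeserializer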
6. Starting the KafkaStreams Job
Up to this point, we've built a topology that can be executed. However, the job hasn't started yet.
We need to start our job explicitly by calling the start() method on the KafkaStreams instance:
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.start();
Thread.sleep(30000);
streams.close();
Note that we are waiting 30 seconds for the job to finish. In a real-world scenario, that job would be running all the time, processing events from Kafka as they arrive.
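For such a long-running setup, a common approach (a sketch, not part of this example) is to close the streams instance from a JVM shutdown hook instead of sleeping:
// close the job gracefully when the JVM is asked to stop (e.g. Ctrl+C)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();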
We can test our job by publishing some events to our Kafka topic.
Let’s start a kafka-console-producer and manually send some events to our inputTopic:
./kafka-console-producer --topic inputTopic --broker-list localhost:9092
>"this is a pony"
>"this is a horse and pony"
This way, we published two events to Kafka. Our application will consume those events and will print the following output:
word: -> 1
word: this -> 1
word: is -> 1
word: a -> 1
word: pony -> 1
word: -> 2
word: this -> 2
word: is -> 2
word: a -> 2
word: horse -> 1
word: and -> 1
word: pony -> 2
We can see that when the first message arrived, the word pony occurred only once. But when we sent the second message, the word pony occurred for the second time, printing: "word: pony -> 2". The empty word in the output comes from the quote characters surrounding the sentences, which the regular expression splits away, leaving a leading empty token.
7. Conclusion
This article discussed how to create a basic stream processing application using Apache Kafka as a data source and the KafkaStreams library as the stream processing library.
All these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.