Kafka Streams vs. Kafka Consumer

Last modified: June 4, 2021

1. Introduction

Apache Kafka is the most popular open-source distributed and fault-tolerant stream processing system. Kafka Consumer provides the basic functionalities to handle messages. Kafka Streams also provides real-time stream processing on top of the Kafka Consumer client.

In this tutorial, we’ll explain the features of Kafka Streams that make the stream processing experience simple and easy.

2. Difference Between Streams and Consumer APIs

2.1. Kafka Consumer API

In a nutshell, the Kafka Consumer API allows applications to process messages from topics. It provides the basic components to interact with them, including the following capabilities (a minimal consumer sketch follows the list):

  • Separation of responsibility between consumers and producers
  • Single-record processing
  • Batch processing support
  • Stateless support only: the client does not keep the previous state and evaluates each record in the stream individually
  • Writing an application requires a lot of code
  • No use of threading or parallelism
  • It is possible to write to several Kafka clusters
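
To make these points concrete, here is a minimal sketch of a consumer poll loop. The broker address, group id, and topic name (localhost:9092, example-group, input-topic) are assumptions for illustration:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // assumed group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("input-topic")); // assumed topic
    while (true) {
        // Each polled record is evaluated individually, with no previous state kept
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + " -> " + record.value());
        }
    }
}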

2.2. Kafka Streams API

Kafka Streams greatly simplifies the stream processing from topics. Built on top of Kafka client libraries, it provides data parallelism, distributed coordination, fault tolerance, and scalability. It deals with messages as an unbounded, continuous, and real-time flow of records, with the following characteristics:

  • A single Kafka Streams client to consume and produce
  • Performs complex processing
  • Does not support batch processing
  • Supports stateless and stateful operations
  • Writing an application requires only a few lines of code
  • Threading and parallelism
  • Interacts with a single Kafka cluster only
  • Stream partitions and tasks as logical units for storing and transporting messages

Kafka Streams uses the concepts of partitions and tasks as logical units strongly linked to the topic partitions. Besides, it uses threads to parallelize processing within an application instance. Another important capability is state stores, which Kafka Streams uses to store and query data coming from the topics. Finally, the Kafka Streams API interacts with the cluster, but it does not run directly on top of it.
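
As a minimal sketch of how such an application is wired together, we only declare a topology and start it, while partitions, tasks, and threads are managed for us. The application id, broker address, topic names, and thread count below are assumptions:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example"); // assumed id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
// Two threads share the stream tasks of this application instance
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
  .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Close the instance cleanly on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));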

In the coming sections, we’ll focus on four aspects that set Kafka Streams apart from the basic Kafka clients: stream-table duality, the Kafka Streams Domain-Specific Language (DSL), Exactly-Once processing Semantics (EOS), and interactive queries.

2.3. Dependencies

To implement the examples, we’ll simply add the Kafka Consumer API and Kafka Streams API dependencies to our pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
</dependency>

3. Stream-Table Duality

Kafka Streams supports streams but also tables that can be bidirectionally transformed. This is the so-called stream-table duality. Tables are a set of evolving facts: each new event overwrites the old one, whereas streams are a collection of immutable facts.

Streams handle the complete flow of data from the topic. Tables store the state by aggregating information from the streams. Let’s imagine playing a chess game as described in Kafka Data Modelling. The stream of continuous moves is aggregated to a table, and we can transition from one state to another:
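
To see the duality in code, here is a minimal sketch under the chess analogy; the chess-moves topic name is an assumption:

// Table view: one row per game, where the latest move overwrites the previous state
KTable<String, String> gameState = builder.table("chess-moves");

// Stream view of the same data: every table update becomes an immutable record again
KStream<String, String> gameUpdates = gameState.toStream();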

3.1. KStream, KTable and GlobalKTable

Kafka Streams provides two abstractions for Streams and Tables. KStream handles the stream of records. On the other hand, KTable manages the changelog stream with the latest state of a given key. Each data record represents an update.

There is another abstraction for non-partitioned tables. We can use GlobalKTables to broadcast information to all tasks or to do joins without re-partitioning the input data.

We can read and deserialize a topic as a stream:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = 
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

It is also possible to read a topic to track the latest words received as a table:

KTable<String, String> textLinesTable = 
  builder.table(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

Finally, we are able to read a topic using a global table:

GlobalKTable<String, String> textLinesGlobalTable = 
  builder.globalTable(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
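
As a sketch of why a global table is useful, we can join a stream against it without re-partitioning, since every task holds a full copy of the table. The key selector and value joiner below are illustrative assumptions:

KStream<String, String> enrichedLines = textLines.join(textLinesGlobalTable,
  (key, value) -> key, // derive the lookup key into the global table
  (value, globalValue) -> value + " | " + globalValue); // combine both values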

4. Kafka Streams DSL

The Kafka Streams DSL supports a declarative and functional programming style. It is built on top of the Streams Processor API. The language provides the built-in abstractions for streams and tables mentioned in the previous section.

Furthermore, it also supports stateless (map, filter, etc.) and stateful transformations (aggregations, joins, and windowing). Thus, it is possible to implement stream processing operations with just a few lines of code.

4.1. Stateless Transformations

Stateless transformations don’t require state for processing, so no state store is needed in the stream processor. Example operations include filter, map, flatMap, and groupBy.

Let’s now see how to map the values to uppercase, filter them from the topic, and store them as a stream:

KStream<String, String> textLinesUpperCase =
  textLines
    .map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
    .filter((key, value) -> value.contains("FILTER"));
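
To complete the pipeline, the transformed stream can be written back to a topic; the outputTopic name is an assumption here:

textLinesUpperCase.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));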

4.2. Stateful Transformations

Stateful transformations depend on the state to fulfil the processing operations: the processing of a message depends on the processing of other messages, and this shared state is kept in a state store. Any table or state store can be restored from the changelog topic.

An example of stateful transformation is the word count algorithm:

KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(value
    .toLowerCase(Locale.getDefault()).split("\\W+")))
  .groupBy((key, word) -> word)
  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

We’ll send these two strings to the topic:

String TEXT_EXAMPLE_1 = "test test and test";
String TEXT_EXAMPLE_2 = "test filter filter this sentence";

The result is:

Word: and -> 1
Word: test -> 4
Word: filter -> 2
Word: this -> 1
Word: sentence -> 1

The DSL covers several transformation features. We can join, or merge, two input streams/tables with the same key to produce a new stream/table. We are also able to aggregate, or combine, multiple records from streams/tables into one single record in a new table. Finally, it is possible to apply windowing to group records with the same key in join or aggregation functions.

As an example, a join with a 5-second window will merge records grouped by key from two streams into one stream:

KStream<String, String> leftRightSource = leftSource
  .outerJoin(rightSource,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    JoinWindows.of(Duration.ofSeconds(5)))
  .groupByKey()
  .reduce((key, lastValue) -> lastValue)
  .toStream();

So we’ll put value=left with key=1 into the left stream, and value=right with key=2 into the right stream. The result is the following:

(key=1) -> (left=left, right=null)
(key=2) -> (left=null, right=right)

For the aggregation example, we’ll compute the word count algorithm, but using the first two letters of each word as the key:

KTable<String, Long> aggregated = input
  .groupBy((key, value) -> (value != null && value.length() > 0)
    ? value.substring(0, 2).toLowerCase() : "",
    Grouped.with(Serdes.String(), Serdes.String()))
  .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
    Materialized.with(Serdes.String(), Serdes.Long()));

With the following entries:

"one", "two", "three", "four", "five"

The output is:

Word: on -> 3
Word: tw -> 3
Word: th -> 5
Word: fo -> 4
Word: fi -> 4

5. Exactly-Once Processing Semantics (EOS)

There are occasions when we need to ensure that the consumer reads a message exactly once. Kafka introduced the capability of including messages in transactions with the Transactional API to implement EOS. The same feature has been covered by Kafka Streams since version 0.11.0.
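
For comparison, achieving the same guarantee with the plain clients means driving the Transactional API by hand. Here is a hedged sketch of a transactional producer; the transactional id and topic name are assumptions:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-example"); // assumed id
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("output-topic", "key", "value"));
    // Either everything in the transaction becomes visible to consumers, or nothing does
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction();
}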

To configure EOS in Kafka Streams, we’ll include the following property:

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
  StreamsConfig.EXACTLY_ONCE);

6. Interactive Queries

Interactive queries allow consulting the state of the application in distributed environments. This means the capability of extracting information not only from the local stores but also from the remote stores on multiple instances. Basically, we’ll gather all the stores and group them together to get the complete state of the application.

Let’s see an example using interactive queries. Firstly, we’ll define the processing topology, in our case, the word count algorithm:

KStream<String, String> textLines = 
  builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

final KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

Next, we’ll create a state store (key-value) for all the computed word counts:

groupedByWord
  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("WordCountsStore")
  .withValueSerde(Serdes.Long()));

Then, we can query the key-value store:

ReadOnlyKeyValueStore<String, Long> keyValueStore =
  streams.store(StoreQueryParameters.fromNameAndType(
    "WordCountsStore", QueryableStoreTypes.keyValueStore()));

KeyValueIterator<String, Long> range = keyValueStore.all();
while (range.hasNext()) {
    KeyValue<String, Long> next = range.next();
    System.out.println("count for " + next.key + ": " + next.value);
}

The output of the example is the following:

Count for and: 1
Count for filter: 2
Count for sentence: 1
Count for test: 4
Count for this: 1
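
The query above only reads the local state store. As a hedged sketch of the distributed side, we can use the Streams metadata API to discover which application instances host partitions of the same WordCountsStore, so that each of them can be queried remotely, for example over REST:

// Find every application instance that hosts a partition of the store
Collection<StreamsMetadata> hosts = streams.allMetadataForStore("WordCountsStore");
for (StreamsMetadata metadata : hosts) {
    System.out.println("Instance: " + metadata.host() + ":" + metadata.port());
}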

7. Conclusion

In this tutorial, we showed how Kafka Streams simplifies the processing operations when retrieving messages from Kafka topics. It strongly eases the implementation when dealing with streams in Kafka, not only for stateless processing but also for stateful transformations.

Of course, it is perfectly possible to build a consumer application without using Kafka Streams. But we would need to manually implement the bunch of extra features that we get for free.

As always, the code is available over on GitHub.
