1. Introduction
In this tutorial, we’ll explore how to dynamically route messages in Kafka Streams. Dynamic routing is particularly useful when the destination topic for a message depends on its content, enabling us to direct messages based on specific conditions or attributes within the payload. This kind of conditional routing finds real-world applications in various domains like IoT event handling, user activity tracking, and fraud detection.
We’ll walk through the problem of consuming messages from a single Kafka topic and conditionally routing them to multiple destination topics. The primary focus will be on how to set this up in a Spring Boot application using the Kafka Streams library.
2. Kafka Streams Routing Techniques
Dynamic routing of messages in Kafka Streams isn’t confined to a single approach but rather can be achieved using multiple techniques. Each has its distinct advantages, challenges, and suitability for various scenarios:
- KStream Conditional Branching: The KStream.split().branch() method is the conventional means to segregate a stream based on predicates. While this method is easy to implement, it has limitations when it comes to scaling the number of conditions and can become less manageable.
- Branching with KafkaStreamBrancher: This feature appeared in Spring Kafka version 2.2.4. It offers a more elegant and readable way to create branches in a Kafka Stream, eliminating the need for ‘magic numbers’ and allowing more fluid chaining of stream operations.
- Dynamic Routing with TopicNameExtractor: Another method for topic routing is to use a TopicNameExtractor. This allows for a more dynamic topic selection at runtime based on the message key, value, or even the entire record context. However, it requires topics to be created in advance. This method affords more granular control over topic selection and is more adaptive to complex use cases.
- Custom Processors: For scenarios requiring complex routing logic or multiple chained operations, we can apply custom processor nodes in the Kafka Streams topology. This approach is the most flexible but also the most complex to implement.
Throughout this article, we’ll focus on implementing the first three approaches—KStream Conditional Branching, Branching with KafkaStreamBrancher, and Dynamic Routing with TopicNameExtractor.
3. Setting Up the Environment
In our scenario, we have a network of IoT sensors streaming various types of data, such as temperature, humidity, and motion, to a centralized Kafka topic named iot_sensor_data. Each incoming message contains a JSON object with a field named sensorType that indicates the type of data the sensor is sending. Our aim is to dynamically route these messages to dedicated topics for each type of sensor data.
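For illustration, an incoming record on iot_sensor_data might carry a payload like the following. The concrete values are made up; only the sensorType field matters for routing:

```json
{
    "sensorType": "temp",
    "value": "23.4",
    "sensorId": "sensor-17"
}
```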
First, let’s establish a running Kafka instance. We can set up Kafka, Zookeeper, and Kafka UI using Docker, along with Docker Compose, by creating a docker-compose.yml file:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka_ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - 8082:8080
    environment:
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
  kafka-init-topics:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
              cub kafka-ready -b kafka:29092 1 30 && \
              kafka-topics --create --topic iot_sensor_data --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092'"
Here we set all the required environment variables and dependencies between services. Furthermore, we create the iot_sensor_data topic with the kafka-topics command in the kafka-init-topics service.
Now we can run Kafka inside Docker by executing docker-compose up -d.
Next, we have to add the Kafka Streams dependencies to the pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
The first dependency is the org.apache.kafka.kafka-streams package, which provides Kafka Streams functionality. The subsequent Maven package, org.springframework.kafka.spring-kafka, facilitates the configuration and integration of Kafka with Spring Boot.
Another essential aspect is configuring the address of the Kafka broker. This is generally done by specifying the broker details in the application’s properties file. Let’s add this configuration along with other properties to our application.properties file:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=baeldung-streams
spring.kafka.consumer.group-id=baeldung-group
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
kafka.topics.iot=iot_sensor_data
Next, let’s define a sample data class IotSensorData:
public class IotSensorData {

    private String sensorType;
    private String value;
    private String sensorId;

    // standard getters and setters
}
Lastly, we need to configure Serde for the serialization and deserialization of typed messages in Kafka:
@Bean
public Serde<IotSensorData> iotSerde() {
    return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(IotSensorData.class));
}
4. Implementing Dynamic Routing in Kafka Streams
After setting up the environment and installing the required dependencies, let’s focus on implementing dynamic routing logic in Kafka Streams.
Dynamic message routing can be an essential part of an event-driven application, as it enables the system to adapt to various types of data flows and conditions without requiring code changes.
4.1. KStream Conditional Branching
Branching in Kafka Streams allows us to take a single stream of data and split it into multiple streams based on some conditions. These conditions are provided as predicates that evaluate each message as it passes through the stream.
In recent versions of Kafka Streams, the branch() method has been deprecated in favor of the newer split().branch() method, which is designed to improve the API’s overall usability and flexibility. Nevertheless, we can apply it in the same way to split a KStream into multiple streams based on certain predicates.
Here we define the configuration that utilizes the split().branch() method for dynamic topic routing:
@Bean
public KStream<String, IotSensorData> iotStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    stream.split()
        .branch((key, value) -> "temp".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_temp")))
        .branch((key, value) -> "move".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_move")))
        .branch((key, value) -> "hum".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_hum")))
        .noDefaultBranch();
    return stream;
}
In the example above, we split the initial stream from the iot_sensor_data topic into multiple streams based on the sensorType property and route them to other topics accordingly.
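To make these branching semantics concrete, here is a plain-Java sketch, not part of the Kafka Streams API, that mimics how split().branch() evaluates predicates in order and sends each record to the first matching branch; records that match no predicate are dropped, just as noDefaultBranch() does. The class and method names are ours for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

public class BranchRoutingSketch {

    // Branch predicates and target topics, checked in insertion order,
    // mirroring the three branch() calls in the example above
    static final Map<Predicate<String>, String> BRANCHES = new LinkedHashMap<>();
    static {
        BRANCHES.put("temp"::equals, "iot_sensor_data_temp");
        BRANCHES.put("move"::equals, "iot_sensor_data_move");
        BRANCHES.put("hum"::equals, "iot_sensor_data_hum");
    }

    // Returns the first matching target topic; an empty Optional corresponds to
    // noDefaultBranch() silently dropping records that match no predicate
    static Optional<String> route(String sensorType) {
        return BRANCHES.entrySet().stream()
            .filter(entry -> entry.getKey().test(sensorType))
            .map(Map.Entry::getValue)
            .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(route("temp"));  // Optional[iot_sensor_data_temp]
        System.out.println(route("light")); // Optional.empty
    }
}
```

In the real topology this evaluation happens per record inside Kafka Streams; the sketch only illustrates the first-match-wins ordering.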
If a target topic name can be generated based on the message content, we can use a lambda function within the to method for more dynamic topic routing:
@Bean
public KStream<String, IotSensorData> iotStreamDynamic(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    stream.split()
        .branch((key, value) -> value.getSensorType() != null,
            Branched.withConsumer(ks -> ks.to((key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType()))))
        .noDefaultBranch();
    return stream;
}
This approach provides greater flexibility for routing messages dynamically whenever the target topic name can be derived from the message content.
4.2. Routing With KafkaStreamBrancher
The KafkaStreamBrancher class provides a builder-style API that allows easier chaining of branching conditions, making code more readable and maintainable.
The primary benefit is the removal of the complexities associated with managing an array of branched streams, which is how the original KStream.branch method works. Instead, KafkaStreamBrancher lets us define each branch along with the operations that should happen on that branch, removing the need for magic numbers or complex indexing to identify the correct branch. Since the introduction of the split().branch() method, this approach is closely related to the one discussed in the previous section.
Let’s apply this approach to a stream:
@Bean
public KStream<String, IotSensorData> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    new KafkaStreamBrancher<String, IotSensorData>()
        .branch((key, value) -> "temp".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_temp"))
        .branch((key, value) -> "move".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_move"))
        .branch((key, value) -> "hum".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_hum"))
        .defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
        .onTopOf(stream);
    return stream;
}
We’ve applied a fluent API to route each message to a specific topic. Similarly, we can use a single branch() method call to route to multiple topics by using the message content as part of the topic name:
@Bean
public KStream<String, IotSensorData> iotBrancherStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    new KafkaStreamBrancher<String, IotSensorData>()
        .branch((key, value) -> value.getSensorType() != null, (ks) ->
            ks.to((key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType())))
        .defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
        .onTopOf(stream);
    return stream;
}
By providing a higher level of abstraction for branching logic, KafkaStreamBrancher not only makes the code cleaner but also enhances its manageability, especially for applications with complex routing requirements.
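The routing rule in the dynamic example above reduces to a simple naming function. The following standalone helper, whose names are illustrative rather than part of Spring Kafka, captures that rule together with the _unknown fallback applied by defaultBranch():

```java
public class SensorTopicNames {

    // Mirrors the "%s_%s" naming used in the branch above; a missing sensor
    // type falls back to the "<base>_unknown" topic, like defaultBranch does
    static String targetTopic(String baseTopic, String sensorType) {
        return sensorType == null
            ? "%s_unknown".formatted(baseTopic)
            : "%s_%s".formatted(baseTopic, sensorType);
    }

    public static void main(String[] args) {
        System.out.println(targetTopic("iot_sensor_data", "hum")); // iot_sensor_data_hum
        System.out.println(targetTopic("iot_sensor_data", null)); // iot_sensor_data_unknown
    }
}
```

Keeping the naming rule in one small function like this makes it easy to unit-test the routing convention independently of the Kafka topology.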
4.3. Dynamic Topic Routing With TopicNameExtractor
Another approach to managing conditional branching in Kafka Streams is to use a TopicNameExtractor, which, as the name suggests, extracts the topic name dynamically for each message in the stream. This method can be more straightforward for certain use cases compared to the previously discussed split().branch() and KafkaStreamBrancher approaches.
Here’s a sample configuration using TopicNameExtractor in a Spring Boot application:
@Bean
public KStream<String, IotSensorData> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    TopicNameExtractor<String, IotSensorData> sensorTopicExtractor = (key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType());
    stream.to(sensorTopicExtractor);
    return stream;
}
While TopicNameExtractor handles its primary function of routing records to specific topics well, it has some limitations compared to other approaches like split().branch() and KafkaStreamBrancher. Specifically, TopicNameExtractor doesn’t provide the option to perform additional transformations, like mapping or filtering, within the same routing step.
5. Conclusion
In this article, we’ve seen different approaches for dynamic topic routing using Kafka Streams and Spring Boot.
We began by exploring the modern branching mechanisms like the split().branch() method and the KafkaStreamBrancher class. Furthermore, we examined the dynamic topic routing capabilities offered by TopicNameExtractor.
Each technique presents its own advantages and challenges. For instance, split().branch() can be cumbersome when handling numerous conditions, whereas TopicNameExtractor provides a structured flow but restricts certain inline data processing. As a result, grasping the subtle differences of each approach is vital for creating an effective routing implementation.
As always, the full source code is available over on GitHub.