1. Introduction
Apache Kafka is a distributed streaming platform that allows us to publish and subscribe to streams of records, often referred to as messages. Additionally, Kafka headers provide a way to attach metadata to Kafka messages, enabling additional context and flexibility in message processing.
In this tutorial, we’ll delve into commonly used Kafka headers and learn how to view and extract them using Java.
2. Overview of Kafka Headers
Kafka headers represent key-value pairs attached to Kafka messages, offering a means to include supplementary metadata alongside the primary message content.
For example, Kafka headers facilitate message routing by providing data for directing messages to specific processing pipelines or consumers. Moreover, headers are versatile in carrying custom application metadata tailored to the application’s processing logic.
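To make the routing idea concrete, here is a minimal sketch that needs no broker. In the Kafka client API, a record header pairs a String key with a byte[] value; the sketch models that with a plain Map, which is a simplification because a real record may carry several headers with the same key. The header name "pipeline" and the class HeaderRoutingSketch are hypothetical names chosen for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class HeaderRoutingSketch {

    // Hypothetical header name for this sketch; not a Kafka or Spring constant.
    static final String PIPELINE_HEADER = "pipeline";

    // Header values travel as raw bytes, so text is encoded explicitly.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    // Picks a processing pipeline from the header, or "default" when it is absent.
    static String route(Map<String, byte[]> headers) {
        byte[] raw = headers.get(PIPELINE_HEADER);
        return raw == null ? "default" : decode(raw);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put(PIPELINE_HEADER, encode("billing"));

        System.out.println(route(headers));   // header present
        System.out.println(route(Map.of()));  // header missing
    }
}
```

Because the value is just bytes, the producer and the consumer have to agree on the encoding; UTF-8 is used here as an assumption.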
3. Kafka Default Headers
Spring for Apache Kafka exposes message metadata such as the topic, key, partition, and timestamp through constants in its KafkaHeaders class. Although they’re often called default headers, they belong to Spring’s message abstraction rather than to the native headers of a Kafka record. In this section, we’ll look at a few commonly used ones and what they mean for message processing.
3.1. Producer Headers
When we send a message through Spring Kafka, the following KafkaHeaders constants describe the outgoing record:
- KafkaHeaders.TOPIC – This header contains the name of the topic to which the message is sent.
- KafkaHeaders.KEY – This header carries the key of the message, if the message is produced with one.
- KafkaHeaders.PARTITION – This header indicates the ID of the partition to which the message is sent.
- KafkaHeaders.TIMESTAMP – This header holds the timestamp of the record, by default the time at which the producer created it.
3.2. Consumer Headers
Headers prefixed with RECEIVED_ are populated by Spring Kafka on the consumer side from the record that was consumed, and describe where that record came from:
- KafkaHeaders.RECEIVED_TOPIC – This header contains the name of the topic from which the message was received.
- KafkaHeaders.RECEIVED_KEY – This header allows consumers to access the key associated with the message.
- KafkaHeaders.RECEIVED_PARTITION – This header indicates the ID of the partition from which the message was received.
- KafkaHeaders.RECEIVED_TIMESTAMP – This header carries the timestamp of the record itself, typically the time the producer created it or the broker appended it to the log, not the time at which the consumer received it.
- KafkaHeaders.OFFSET – The offset indicates the position of the message in the partition’s log.
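The timestamp value is a plain number, so it usually needs converting before it’s useful. The following sketch assumes the value arrives as a long holding epoch milliseconds, as Kafka record timestamps do, and turns it into an Instant and an age. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

final class TimestampHeaderSketch {

    // Converts an epoch-millisecond timestamp into an Instant.
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Computes how old a record is relative to the supplied reference time.
    static Duration age(long epochMillis, Instant now) {
        return Duration.between(toInstant(epochMillis), now);
    }

    public static void main(String[] args) {
        long receivedTimestamp = 1_000L; // made-up sample value
        Instant now = Instant.ofEpochMilli(4_000L);

        System.out.println(toInstant(receivedTimestamp));
        System.out.println(age(receivedTimestamp, now).toMillis() + " ms old");
    }
}
```

Passing the reference time in as a parameter, rather than calling Instant.now() inside the method, keeps the calculation easy to test.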
4. Consuming Messages With Headers
To consume messages, we need a Kafka consumer that subscribes to a topic and fetches the records published on it. With Spring Kafka, we don’t instantiate the KafkaConsumer ourselves: a method annotated with @KafkaListener declares the topic, and the listener container creates the consumer, subscribes it, and polls for records.
Each record fetched from the subscribed topic arrives together with its associated headers, which are passed to the listener method.
Here’s a code example demonstrating how to consume messages with headers:
@KafkaListener(topics = "my-topic")
public void listen(String message, @Headers Map<String, Object> headers) {
    System.out.println("Received message: " + message);
    System.out.println("Headers:");
    headers.forEach((key, value) -> System.out.println(key + ": " + value));
}
The listener container invokes the listen() method whenever a message arrives from the specified topic, here “my-topic”. The @Headers annotation indicates that the annotated Map parameter should be populated with all headers of the received message.
Below is an example output:
Received message: Hello Baeldung!
Headers:
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: my-topic
kafka_offset: 123
... // other headers
To access a specific header, we can call the get() method of the headers map with the key of the desired header. Since the map holds values of type Object, we cast the result. Below is an example that reads the topic name:
String topicName = (String) headers.get(KafkaHeaders.RECEIVED_TOPIC);
Here, topicName holds my-topic.
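Reading from a Map<String, Object> involves a cast, and a header may be missing or null, as the message key is in the sample output above. One way to handle both cases is a small typed accessor. This is only a sketch: the helper header() and the class name are hypothetical, the keys are copied from the sample output, and the values are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class TypedHeaderAccessSketch {

    // Returns the header only if it is present, non-null, and of the expected type.
    static <T> Optional<T> header(Map<String, Object> headers, String name, Class<T> type) {
        Object value = headers.get(name);
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-in for the map a listener would receive.
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_receivedTopic", "my-topic");
        headers.put("kafka_receivedPartitionId", 0);
        headers.put("kafka_receivedMessageKey", null);

        System.out.println(header(headers, "kafka_receivedTopic", String.class).orElse("unknown"));
        System.out.println(header(headers, "kafka_receivedPartitionId", Integer.class).orElse(-1));
        System.out.println(header(headers, "kafka_receivedMessageKey", String.class).orElse("no key"));
    }
}
```

Returning an Optional forces the caller to decide what happens when the header is absent or has an unexpected type, instead of failing later with a ClassCastException or NullPointerException.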
Additionally, when consuming messages, we can directly extract the headers needed for processing as method parameters if we already know them. This approach offers a more concise and targeted way to access specific header values without iterating through all headers.
Here’s a code example demonstrating how to consume messages with headers, directly extracting specific headers as method parameters:
@KafkaListener(topics = "my-topic")
// RECEIVED_PARTITION is the Spring Kafka 3.x name; 2.x versions call it RECEIVED_PARTITION_ID
public void listen(String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
    System.out.println("Received message: " + message);
    System.out.println("Partition: " + partition);
}
In the listen() method, we extract the received partition header directly using the @Header annotation. The annotation names the header we want, and the parameter type determines the type of the injected value. Spring then passes the header value as the method argument (in this case, partition), so we can use it in the method body without looking it up in a map.
Below is the output:
Received message: Hello Baeldung!
Partition: 0
5. Conclusion
In this article, we’ve explored the role of Kafka headers in message processing. We’ve looked at the default headers available on the producer and consumer sides. Additionally, we’ve learned how to extract and work with these headers.
As always, the code for the examples is available over on GitHub.