1. Overview
1.概述
Apache Kafka is a distributed and fault-tolerant stream processing system.
Apache Kafka是一个分布式和容错的流处理系统。
In this tutorial, we’ll cover Spring support for Kafka and the level of abstractions it provides over native Kafka Java client APIs.
在本教程中,我们将介绍Spring对Kafka的支持,以及它对原生Kafka Java客户端API的抽象程度。
Spring Kafka brings the simple and typical Spring template programming model with a KafkaTemplate and Message-driven POJOs via @KafkaListener annotation.
Spring Kafka通过KafkaTemplate和@KafkaListener注解的Message-driven POJOs带来了简单而典型的Spring模板编程模型。
2. Installation and Setup
2.安装和设置
To download and install Kafka, please refer to the official guide here.
要下载和安装Kafka,请参考官方指南这里。
We also need to add the spring-kafka dependency to our pom.xml:
我们还需要将spring-kafka依赖性添加到我们的pom.xml。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
The latest version of this artifact can be found here.
该工件的最新版本可以在这里找到。
Our example application will be a Spring Boot application.
我们的示例应用程序将是一个Spring Boot应用程序。
This article assumes that the server is started using the default configuration and that no server ports are changed.
本文假设服务器是使用默认配置启动的,并且没有改变服务器端口。
3. Configuring Topics
3.配置主题
Previously, we ran command-line tools to create topics in Kafka:
在此之前,我们运行命令行工具来创建Kafka中的主题。
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic mytopic
But with the introduction of AdminClient in Kafka, we can now create topics programmatically.
但随着Kafka中AdminClient的引入,我们现在可以以编程方式创建主题。
We need to add the KafkaAdmin Spring bean, which will automatically add topics for all beans of type NewTopic:
我们需要添加KafkaAdmin Spring Bean,它将自动为所有NewTopic类型的bean添加主题。
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("baeldung", 1, (short) 1);
}
}
4. Producing Messages
4.制作信息
To create messages, we first need to configure a ProducerFactory. This sets the strategy for creating Kafka Producer instances.
为了创建消息,我们首先需要配置一个ProducerFactory。这就为创建Kafka Producer实例设定了策略。
Then we need a KafkaTemplate, which wraps a Producer instance and provides convenience methods for sending messages to Kafka topics.
然后我们需要一个KafkaTemplate,它包装了一个Producer实例,并提供了向Kafka主题发送消息的便利方法。。
Producer instances are thread safe. So, using a single instance throughout an application context will give higher performance. Consequently, KakfaTemplate instances are also thread safe, and use of one instance is recommended.
Producer实例是线程安全的。因此,在整个应用环境中使用一个实例将获得更高的性能。因此,KakfaTemplate实例也是线程安全的,建议使用一个实例。
4.1. Producer Configuration
4.1.生产者配置
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4.2. Publishing Messages
4.2.发布消息
We can send messages using the KafkaTemplate class:
我们可以使用KafkaTemplate类来发送消息。
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
The send API returns a ListenableFuture object. If we want to block the sending thread and get the result about the sent message, we can call the get API of the ListenableFuture object. The thread will wait for the result, but it will slow down the producer.
send API返回一个ListenableFuture对象。如果我们想阻止发送线程并获得关于发送消息的结果,我们可以调用ListenableFuture对象的getAPI。该线程将等待结果,但它会减慢生产者的速度。
Kafka is a fast stream processing platform. Therefore, it’s better to handle the results asynchronously so that the subsequent messages do not wait for the result of the previous message.
Kafka是一个快速的流处理平台。因此,最好是异步处理结果,这样后续的消息就不会等待前一个消息的结果了。
We can do this through a callback:
我们可以通过回调来实现这一目标。
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}
5. Consuming Messages
5.消耗信息
5.1. Consumer Configuration
5.1.消费者配置
For consuming messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory. Once these beans are available in the Spring bean factory, POJO-based consumers can be configured using @KafkaListener annotation.
对于消费消息,我们需要配置一个ConsumerFactory和一个KafkaListenerContainerFactory。一旦这些Bean在SpringBean工厂中可用,就可以使用@KafkaListener注解来配置基于POJO的消费者。
@EnableKafka annotation is required on the configuration class to enable detection of @KafkaListener annotation on spring-managed beans:
@EnableKafka注解在配置类上是必需的,以便在spring管理的Bean上实现对@KafkaListener注解的检测。
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
5.2. Consuming Messages
5.2.消耗信息
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group foo: " + message);
}
We can implement multiple listeners for a topic, each with a different group Id. Furthermore, one consumer can listen for messages from various topics:
我们可以为一个主题实现多个监听器,每个监听器都有一个不同的组ID。此外,一个消费者可以监听来自不同主题的消息。
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
Spring also supports retrieval of one or more message headers using the @Header annotation in the listener:
Spring还支持使用监听器中的@Header注解来检索一个或多个消息头。
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
5.3. Consuming Messages from a Specific Partition
5.3.从一个特定的分区消耗信息
Notice that we created the topic baeldung with only one partition.
注意,我们创建的主题baeldung只有一个分区。
For a topic with multiple partitions, however, a @KafkaListener can explicitly subscribe to a particular partition of a topic with an initial offset:
然而,对于一个有多个分区的主题,@KafkaListener可以显式地订阅一个有初始偏移量的主题的特定分区。
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
Since the initialOffset has been set to 0 in this listener, all the previously consumed messages from partitions 0 and 3 will be re-consumed every time this listener is initialized.
由于initialOffset在这个监听器中被设置为0,所以每次初始化这个监听器时,所有以前消耗的0和3分区的消息都将被重新消耗。
If we don’t need to set the offset, we can use the partitions property of @TopicPartition annotation to set only the partitions without the offset:
如果我们不需要设置偏移量,我们可以使用@TopicPartition注解的partitions属性来只设置没有偏移量的分区。
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
5.4. Adding Message Filter for Listeners
5.4.为监听器添加消息过滤器
We can configure listeners to consume specific message content by adding a custom filter. This can be done by setting a RecordFilterStrategy to the KafkaListenerContainerFactory:
我们可以通过添加一个自定义过滤器来配置监听器,以消费特定的消息内容。这可以通过向KafkaListenerContainerFactory设置RecordFilterStrategy来实现。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
record -> record.value().contains("World"));
return factory;
}
We can then configure a listener to use this container factory:
然后我们可以配置一个监听器来使用这个容器工厂。
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener: " + message);
}
In this listener, all the messages matching the filter will be discarded.
在这个监听器中,所有符合过滤器的邮件都将被丢弃。
6. Custom Message Converters
6.自定义信息转换器
So far, we have only covered sending and receiving Strings as messages. However, we can also send and receive custom Java objects. This requires configuring appropriate serializer in ProducerFactory and deserializer in ConsumerFactory.
到目前为止,我们只涵盖了发送和接收字符串的消息。然而,我们也可以发送和接收自定义的Java对象。这需要在ProducerFactory中配置适当的序列化器,在ConsumerFactory中配置反序列化器。
Let’s look at a simple bean class, which we will send as messages:
让我们看看一个简单的bean类,,我们将把它作为消息发送。
public class Greeting {
private String msg;
private String name;
// standard getters, setters and constructor
}
6.1. Producing Custom Messages
6.1.制作自定义信息
In this example, we will use JsonSerializer.
在这个例子中,我们将使用JsonSerializer。
Let’s look at the code for ProducerFactory and KafkaTemplate:
我们来看看ProducerFactory和KafkaTemplate的代码。
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
// ...
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
We can use this new KafkaTemplate to send the Greeting message:
我们可以使用这个新的KafkaTemplate来发送Greeting消息。
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
6.2. Consuming Custom Messages
6.2.消耗自定义消息
Similarly, let’s modify the ConsumerFactory and KafkaListenerContainerFactory to deserialize the Greeting message correctly:
同样,让我们修改ConsumerFactory和KafkaListenerContainerFactory以正确地反序列化Greeting消息。
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
The spring-kafka JSON serializer and deserializer uses the Jackson library, which is also an optional Maven dependency for the spring-kafka project.
spring-kafka JSON 串行器和反串行器使用Jackson库,它也是 spring-kafka 项目的可选 Maven 依赖项。
So, let’s add it to our pom.xml:
所以,让我们把它添加到我们的pom.xml。
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>
Instead of using the latest version of Jackson, it’s recommended to use the version that is added to the pom.xml of spring-kafka.
与其使用最新版本的Jackson,不如使用spring-kafka的pom.xml中加入的版本。
Finally, we need to write a listener to consume Greeting messages:
最后,我们需要写一个监听器来消费Greeting消息。
@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
// process greeting message
}
7. Multi-Method Listeners
7.多方法听众
Let’s now see how we can configure our application to send various kinds of objects to the same topic and then consume them.
现在让我们看看如何配置我们的应用程序,使其向同一主题发送各种类型的对象,然后消费它们。
First, we’ll add a new class, Farewell:
首先,我们将添加一个新的类,Farewell。
public class Farewell {
private String message;
private Integer remainingMinutes;
// standard getters, setters and constructor
}
We’ll need some extra configuration to be able to send both Greeting and Farewell objects to the same topic.
我们需要一些额外的配置,以便能够将Greeting和Farewell对象同时发送到同一个主题。
7.1. Set Mapping Types in the Producer
7.1.在生产者中设置映射类型
In the producer, we have to configure the JSON type mapping:
在生产者中,我们必须配置JSON>类型映射。
configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
This way, the library will fill in the type header with the corresponding class name.
这样一来,库就会在类型头里填上相应的类名。
As a result, the ProducerFactory and KafkaTemplate look like this:
因此,ProducerFactory和KafkaTemplate看起来像这样。
@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.TYPE_MAPPINGS,
"greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
return new KafkaTemplate<>(multiTypeProducerFactory());
}
We can use this KafkaTemplate to send a Greeting, Farewell, or any Object to the topic:
我们可以使用这个KafkaTemplate来发送问候、告别或任何对象到主题。
multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");
7.2. Use a Custom MessageConverter in the Consumer
7.2.在消费者中使用自定义MessageConverter
To be able to deserialize the incoming message, we’ll need to provide our Consumer with a custom MessageConverter.
为了能够对传入的消息进行反序列化,我们需要为我们的Consumer提供一个自定义的MessageConverter。
Behind the scene, the MessageConverter relies on a Jackson2JavaTypeMapper. By default, the mapper infers the type of the received objects: on the contrary, we need to tell it explicitly to use the type header to determine the target class for deserialization:
在幕后,MessageConverter依赖于一个Jackson2JavaTypeMapper。默认情况下,映射器会推断出接收对象的类型:相反,我们需要明确告诉它使用类型头来确定反序列化的目标类。
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
We also need to provide the reverse mapping information. Finding “greeting” in the type header identifies a Greeting object, whereas “farewell” corresponds to a Farewell object:
我们还需要提供反向映射信息。在类型头中找到“greeting”可以识别一个Greeting对象,而“farewell”对应于一个Farewell对象。
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);
Lastly, we need to configure the packages trusted by the mapper. We have to make sure that it contains the location of the target classes:
最后,我们需要配置映射器所信任的包。我们必须确保它包含目标类的位置。
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
As a result, here is the final definition of this MessageConverter:
因此,这里是这个MessageConverter的最终定义。
@Bean
public RecordMessageConverter multiTypeConverter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}
We now need to tell our ConcurrentKafkaListenerContainerFactory to use the MessageConverter and a rather basic ConsumerFactory:
我们现在需要告诉我们的ConcurrentKafkaListenerContainerFactory来使用MessageConverter和一个相当基本的ConsumerFactory。
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setMessageConverter(multiTypeConverter());
return factory;
}
7.3. Use @KafkaHandler in the Listener
7.3.在监听器中使用@KafkaHandler
Last but not least, in our KafkaListener, we’ll create a handler method to retrieve every possible kind of object. Each handler will need to be annotated with @KafkaHandler.
最后但同样重要的是,在我们的KafkaListener中,我们将创建一个处理方法来检索每一种可能的对象。每个处理方法都需要用@KafkaHandler来注释。
As a final note, let’s point out that we can also define a default handler for objects that can’t be bound to one of the Greeting or Farewell class:
最后,让我们指出,我们也可以为那些不能被绑定到Greeting或Farewell类中的对象定义一个默认处理程序。
@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {
@KafkaHandler
public void handleGreeting(Greeting greeting) {
System.out.println("Greeting received: " + greeting);
}
@KafkaHandler
public void handleF(Farewell farewell) {
System.out.println("Farewell received: " + farewell);
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
System.out.println("Unkown type received: " + object);
}
}
8. Conclusion
8.结论
In this article, we covered the basics of Spring support for Apache Kafka. We took a brief look at the classes used for sending and receiving messages.
在这篇文章中,我们介绍了Spring支持Apache Kafka的基本情况。我们简单看了一下用于发送和接收消息的类。
Complete source code for this article can be found over on GitHub.
本文的完整源代码可以在GitHub上找到over。
Before running the code, please make sure that Kafka server is running and that the topics are created manually.
在运行代码之前,请确保Kafka服务器正在运行,并且主题是手动创建的。