1. Overview
In this tutorial, we’ll learn how to subscribe a Kafka consumer to multiple topics. This is a common requirement when the same business logic applies to messages from several topics.
2. Create a Model Class
We’ll consider a simple payment system with two Kafka topics, one for card payments and the other for bank transfers. Let’s create the model class:
import java.math.BigDecimal;
import java.util.Currency;

public class PaymentData {
    private String paymentReference;
    private String type;
    private BigDecimal amount;
    private Currency currency;

    // standard getters and setters
}
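For reference, the elided accessors could be written out as in the sketch below. The toString() method is an addition that isn't part of the original class; it's included only so that logging a PaymentData instance prints its fields. The class is declared package-private here purely to keep the sketch self-contained.

```java
import java.math.BigDecimal;
import java.util.Currency;

// Sketch of the model with its accessors written out.
// toString() is an assumption added for readable log output.
class PaymentData {
    private String paymentReference;
    private String type;
    private BigDecimal amount;
    private Currency currency;

    public String getPaymentReference() { return paymentReference; }
    public void setPaymentReference(String paymentReference) { this.paymentReference = paymentReference; }

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    public BigDecimal getAmount() { return amount; }
    public void setAmount(BigDecimal amount) { this.amount = amount; }

    public Currency getCurrency() { return currency; }
    public void setCurrency(Currency currency) { this.currency = currency; }

    @Override
    public String toString() {
        return "PaymentData{paymentReference=" + paymentReference + ", type=" + type
          + ", amount=" + amount + ", currency=" + currency + "}";
    }
}
```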
3. Subscribe to Multiple Topics Using Kafka Consumer API
The first method we’ll discuss uses the Kafka Consumer API. Let’s add the required Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>
Let’s also configure the Kafka consumer:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
kafkaConsumer = new KafkaConsumer<>(properties);
Before consuming messages, we need to subscribe kafkaConsumer to both topics using the subscribe() method:
kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
We’re now ready to test our configuration. Let’s publish one message to each of the topics:
void publishMessages() throws Exception {
    ProducerRecord<String, String> cardPayment = new ProducerRecord<>("card-payments",
      "{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}");
    kafkaProducer.send(cardPayment).get();

    ProducerRecord<String, String> bankTransfer = new ProducerRecord<>("bank-transfers",
      "{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}");
    kafkaProducer.send(bankTransfer).get();
}
Finally, we can write the integration test:
@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    publishMessages();
    kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));

    int eventsProcessed = 0;
    for (ConsumerRecord<String, String> record : kafkaConsumer.poll(Duration.ofSeconds(10))) {
        log.info("Event on topic={}, payload={}", record.topic(), record.value());
        eventsProcessed++;
    }

    assertThat(eventsProcessed).isEqualTo(2);
}
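One caveat about the test above: poll() can return as soon as some records are available, so a single call isn't guaranteed to deliver the records from both topics. A more defensive variant keeps polling until the expected number of records has arrived or a deadline passes. The sketch below shows that loop in plain Java; PollUntil, collect() and the pollOnce supplier are hypothetical names that stand in for a call to kafkaConsumer.poll(), and they aren't part of the Kafka API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: pollOnce stands in for one call to kafkaConsumer.poll(...).
class PollUntil {

    // Invokes pollOnce repeatedly until `expected` items were collected
    // or the timeout elapsed, then returns everything collected so far.
    static <T> List<T> collect(Supplier<List<T>> pollOnce, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        List<T> collected = new ArrayList<>();
        // In real use pollOnce blocks for a short interval, so this loop doesn't spin.
        while (collected.size() < expected && deadline - System.nanoTime() > 0) {
            collected.addAll(pollOnce.get());
        }
        return collected;
    }
}
```

With a real consumer, the supplier would wrap a short poll, for example one second, and copy the returned records into a list, while the test asserts on the size of the collected list.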
4. Subscribe to Multiple Topics Using Spring Kafka
The second method we’ll discuss uses Spring Kafka.
Let’s add the spring-kafka and jackson-databind dependencies to our pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
</dependency>
Let’s also define the ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans:
@Bean
public ConsumerFactory<String, PaymentData> consumerFactory() {
    // The values are a String and a Class, so the map must accept Object values
    Map<String, Object> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(
      config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> containerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
We need to subscribe to both topics using the topics attribute of the @KafkaListener annotation:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
Finally, we can create the consumer. We also include the KafkaHeaders.RECEIVED_TOPIC header to identify the topic on which each message was received:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(
  PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on topic={}, payload={}", topic, paymentData);
}
Let’s validate our configuration:
@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    doAnswer(invocation -> {
        countDownLatch.countDown();
        return null;
    }).when(paymentsConsumer)
      .handlePaymentEvents(any(), any());

    kafkaTemplate.send("card-payments", createCardPayment());
    kafkaTemplate.send("bank-transfers", createBankTransfer());

    assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}
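The test relies on a CountDownLatch because the listener is invoked on a different thread from the test, so the test has to wait for both invocations before asserting. The plain-Java sketch below isolates that pattern. LatchDemo and awaitEvents() are hypothetical names, and a single-threaded executor stands in for the listener container's thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: an executor thread plays the role of the Kafka listener thread.
class LatchDemo {

    // Waits for `expected` events while only `delivered` events are actually fired.
    // Returns true if the latch reached zero before the timeout.
    static boolean awaitEvents(int expected, int delivered, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < delivered; i++) {
                // Each simulated listener invocation counts the latch down once
                listenerThread.execute(latch::countDown);
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            listenerThread.shutdownNow();
        }
    }
}
```

When both events arrive, await() returns true well before the timeout. If one of them is missing, await() returns false once the timeout expires, which is what makes the assertion in the test fail.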
5. Subscribe to Multiple Topics Using Kafka CLI
Kafka CLI is the last method we’ll discuss.
First, let’s send a message to each topic:
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic card-payments
>{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bank-transfers
>{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
Now, we can start the Kafka CLI consumer. The --include option takes a regular expression that selects the topics to consume from, so we separate the two topic names with the | alternation operator:
$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --include "card-payments|bank-transfers"
Here’s the output when we run the previous command:
{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
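To get a feel for which topics an expression such as "card-payments|bank-transfers" selects, we can try it against a list of topic names with java.util.regex. The sketch below assumes that the expression must match the whole topic name, which is our understanding of how the consumer applies it. TopicFilterDemo, matchingTopics() and the extra topic names are hypothetical.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper that previews which topic names a regex would select.
class TopicFilterDemo {

    // Keeps only the topics whose full name matches the regular expression
    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
          .filter(topic -> pattern.matcher(topic).matches())
          .collect(Collectors.toList());
    }
}
```

For example, given the topics card-payments, bank-transfers, refunds, and card-payments-dlq, the expression from the command above selects only the first two under this full-match assumption.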
6. Conclusion
In this article, we learned three different methods of subscribing a Kafka consumer to multiple topics. This is useful when implementing the same functionality for several topics.
The first two methods are based on the Kafka Consumer API and Spring Kafka and can be integrated into an existing application. The last one uses the Kafka CLI and is handy for quickly inspecting the messages on several topics.
As always, the complete code can be found over on GitHub.