Creating a Kafka Listener Using the Consumer API

Last modified: December 18, 2023

1. Overview

In this tutorial, we’ll learn how to create a Kafka listener and consume messages from a topic using Kafka’s Consumer API. After that, we’ll test our implementation using the Producer API and Testcontainers.

We’ll be focusing on setting up a KafkaConsumer without relying on Spring Boot modules.

2. Create a Custom Kafka Listener

Our custom listener will internally use the Producer and Consumer APIs from the kafka-clients library. Let’s start by adding this dependency to our pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

For the code examples in this article, we’ll create a CustomKafkaListener class that will listen to the topic named “baeldung.articles.published”. Internally, our class will wrap around a KafkaConsumer and leverage it to subscribe to the topic:

class CustomKafkaListener {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    // ...
}

2.1. Create a KafkaConsumer

To create a KafkaConsumer, we need to supply a valid configuration via a Properties object. Let’s create a simple consumer that can be used as a default when creating our CustomKafkaListener instance:

public CustomKafkaListener(String topic, String bootstrapServers) {
    this(topic, defaultKafkaConsumer(bootstrapServers));
}

static KafkaConsumer<String, String> defaultKafkaConsumer(String bootstrapServers) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_id");
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(props);
}

For this example, we hardcoded most of these properties, but, ideally, they should be loaded from a configuration file. Let’s quickly see what each of the properties means:

  • Bootstrap Servers: a list of host and port pairs used for establishing the initial connection to the Kafka cluster
  • Group ID: an ID that allows a group of consumers to jointly consume a set of topic partitions
  • Auto Offset Reset: the position in the Kafka log to start reading data when there is no initial offset
  • Key/Value Deserializers: the deserializer classes for the keys and values. For our example, we’ll use String keys and values and the following deserializer: org.apache.kafka.common.serialization.StringDeserializer
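
As noted above, these properties would ideally come from a configuration file rather than being hardcoded. A minimal sketch of that idea, using java.util.Properties directly (the file name and its contents here are assumptions for illustration, mirroring the hardcoded values above):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class PropertiesFileDemo {
    public static void main(String[] args) throws IOException {
        // hypothetical properties file mirroring the hardcoded configuration above;
        // the keys are Kafka's standard config names (bootstrap.servers, group.id, ...)
        Path file = Files.createTempFile("kafka-consumer", ".properties");
        Files.writeString(file,
          "bootstrap.servers=localhost:9092\ngroup.id=test_group_id\nauto.offset.reset=earliest\n");

        // java.util.Properties parses the standard key=value format
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        System.out.println(props.getProperty("group.id")); // test_group_id
    }
}
```

The resulting Properties object could then be passed to the KafkaConsumer constructor instead of the one built in defaultKafkaConsumer().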

With this minimal configuration, we’ll be able to subscribe to the topic and easily test our implementation. For a complete list of the available properties, consult the official documentation.

2.2. Subscribe to a Topic

Now, we need to provide a way to subscribe to the topic and start polling for messages. This can be achieved using KafkaConsumer‘s subscribe() method, followed by an infinite loop calling to the poll() method. Moreover, since this method will block the thread, we can implement the Runnable interface to provide a nice integration with CompletableFuture:

class CustomKafkaListener implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    // constructors

    @Override
    public void run() {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            consumer.poll(Duration.ofMillis(100))
              .forEach(record -> log.info("received: " + record)); 
        } 
    }
}

Now, our CustomKafkaListener can be started like this without blocking the main thread:

String topic = "baeldung.articles.published";
String bootstrapServers = "localhost:9092";

var listener = new CustomKafkaListener(topic, bootstrapServers);
CompletableFuture.runAsync(listener);
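
Note that the while (true) loop above never exits on its own. If we ever need to stop the listener, one common pattern (a sketch, not part of the article’s implementation) is to poll only while a flag is set and flip it from another thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StoppableLoopDemo {
    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger polls = new AtomicInteger();

        // stands in for the listener's poll loop; consumer.poll(...) would go here
        Runnable loop = () -> {
            while (running.get()) {
                polls.incrementAndGet();
            }
        };

        CompletableFuture<Void> future = CompletableFuture.runAsync(loop);
        running.set(false); // request shutdown from the main thread
        future.join();      // the loop observes the flag and exits
        System.out.println("stopped after " + polls.get() + " iterations");
    }
}
```

With a real KafkaConsumer, calling consumer.wakeup() from another thread can additionally interrupt a blocking poll() call.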

2.3. Consume Activity

Currently, our application only listens to the topic and logs all incoming messages. Let’s further improve it to allow more complex scenarios and make it more testable. For example, we can allow defining a Consumer<String> that will accept each new event from the topic:

class CustomKafkaListener implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;
    private Consumer<String> recordConsumer;

    CustomKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
        this.topic = topic;
        this.consumer = consumer;
        this.recordConsumer = record -> log.info("received: " + record);
    }

   // ...

    @Override
    public void run() {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            consumer.poll(Duration.ofMillis(100))
              .forEach(record -> recordConsumer.accept(record.value()));
        }
    }

    CustomKafkaListener onEach(Consumer<String> newConsumer) {
        recordConsumer = recordConsumer.andThen(newConsumer);
        return this;
    }
}

Declaring the recordConsumer as a Consumer<String> allows us to chain multiple functions using the default method andThen(). These functions will be called, one by one, for each incoming message.
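
This chaining relies on nothing Kafka-specific: it is standard java.util.function.Consumer composition. A self-contained illustration of the same mechanism:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AndThenDemo {
    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();

        // the initial handler, like the listener's default logging consumer
        Consumer<String> handler = msg -> System.out.println("received: " + msg);
        // appending another step, exactly as recordConsumer.andThen(newConsumer) does
        handler = handler.andThen(collected::add);

        handler.accept("Introduction to Kafka");
        System.out.println(collected); // [Introduction to Kafka]
    }
}
```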

3. Testing

To test our implementation, we’ll create a KafkaProducer and use it to publish some messages to the “baeldung.articles.published” topic. Then, we’ll start our CustomKafkaListener and verify it accurately processes all the activity.

3.1. Kafka Testcontainer Setup

We can utilize the Testcontainers library to spin up a Kafka container within our test environment. Firstly, we’ll need to add the Testcontainers dependencies for the JUnit5 extension and the Kafka module:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

We can now create a KafkaContainer with a specific Docker image name. Then, we’ll add the @Container and @Testcontainers annotations to allow the Testcontainers JUnit5 extension to manage the container’s lifecycle:

@Testcontainers
class CustomKafkaListenerLiveTest {

    @Container
    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    // ...
}

3.2. Create and Start the Listener

Firstly, we’ll define the topic name as a hardcoded String and extract the bootstrapServers from the KAFKA_CONTAINER. Additionally, we’ll create an ArrayList<String> that will be used for collecting the messages:

String topic = "baeldung.articles.published";
String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
List<String> consumedMessages = new ArrayList<>();

We’ll create an instance of a CustomKafkaListener using these properties and instruct it to capture new messages and add them to the consumedMessages list:

CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
listener.run();

However, it’s crucial to note that running it as is might block the thread and freeze the test. To prevent this, we’ll execute it asynchronously using a CompletableFuture:

CompletableFuture.runAsync(listener);

While not critical for testing, we can also declare the listener with var and start it right away:

var listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
CompletableFuture.runAsync(listener);

3.3. Publish Messages

To send article names to the “baeldung.articles.published” topic, we’ll set up a KafkaProducer using a Properties object, following a similar approach to what we did for the consumer:

static KafkaProducer<String, String> testKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}

This method will allow us to publish messages to test our implementation. Let’s create another test helper that will send a message for each article received as a parameter:

private void publishArticles(String topic, String... articles) {
    try (KafkaProducer<String, String> producer = testKafkaProducer()) {
        Arrays.stream(articles)
          .map(article -> new ProducerRecord<String,String>(topic, article))
          .forEach(producer::send);
    }
}

3.4. Verify

Let’s put all the pieces together and run our test. We’ve already discussed how to create a CustomKafkaListener and start publishing data:

@Test
void givenANewCustomKafkaListener_thenConsumesAllMessages() {
    // given
    String topic = "baeldung.articles.published";
    String bootstrapServers = KAFKA_CONTAINER.getBootstrapServers();
    List<String> consumedMessages = new ArrayList<>();

    // when
    CustomKafkaListener listener = new CustomKafkaListener(topic, bootstrapServers).onEach(consumedMessages::add);
    CompletableFuture.runAsync(listener);
    
    // and
    publishArticles(topic,
      "Introduction to Kafka",
      "Kotlin for Java Developers",
      "Reactive Spring Boot",
      "Deploying Spring Boot Applications",
      "Spring Security"
    );

    // then
    // ...
}

Our final task involves waiting for the asynchronous code to finish and confirming that the consumedMessages list contains the expected content. To achieve this, we’ll employ the Awaitility library, utilizing its await().untilAsserted():

// then
await().untilAsserted(() -> 
  assertThat(consumedMessages).containsExactlyInAnyOrder(
    "Introduction to Kafka",
    "Kotlin for Java Developers",
    "Reactive Spring Boot",
    "Deploying Spring Boot Applications",
    "Spring Security"
  ));

4. Conclusion

In this tutorial, we learned how to use Kafka’s Consumer and Producer API without relying on higher-level Spring modules. First, we created a consumer using CustomKafkaListener that encapsulates a KafkaConsumer. For testing, we implemented a KafkaProducer and verified our setup using Testcontainers and Awaitility.

As always, the source for the examples is available over on GitHub.
