Read Multiple Messages with Apache Kafka

Last modified: January 7, 2024


1. Overview

In this tutorial, we’ll explore how the Kafka Consumer retrieves messages from the broker. We’ll learn the configurable properties that can directly impact how many messages the Kafka Consumer reads at once. Finally, we’ll explore how adjusting these settings affects the Consumer’s behavior.

2. Setting up the Environment

Kafka Consumers fetch records for a given partition in batches of configurable size. We cannot configure the exact number of records to be fetched in one batch, but we can configure the size of these batches, measured in bytes.

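The two settings we’ll adjust in the following sections are standard KafkaConsumer properties. As a quick reference, this is how they could be set explicitly to their usual defaults (a sketch only; the values below are the documented defaults in recent kafka-clients versions, assuming a Properties instance named props):

props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");                 // fetch.min.bytes, default: 1 byte
props.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // max.partition.fetch.bytes, default: 1 MiB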

For the code snippets in this article, we’ll need a simple Spring application that uses the kafka-clients library to interact with the Kafka broker. We’ll create a Java class that internally uses a KafkaConsumer to subscribe to a topic and log the incoming messages. If you want to dive deeper, feel free to read through our article dedicated to the Kafka Consumer API and follow along.

One of the key differences in our example will be the logging: Instead of logging one message at a time, let’s collect them and log the whole batch. This way, we’ll be able to see exactly how many messages are fetched for each poll(). Additionally, let’s enrich the log by incorporating details like the initial and final offsets of the batch along with the consumer’s groupId:

class VariableFetchSizeKafkaListener implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;
    
    // constructor
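    // A minimal sketch of the elided constructor: it simply stores the topic and the consumer.
    // (We also assume an SLF4J Logger named "log" backs the log.info() call below.)
    public VariableFetchSizeKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
        this.topic = topic;
        this.consumer = consumer;
    }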

    @Override
    public void run() {
        consumer.subscribe(singletonList(topic));
        int pollCount = 1;

        while (true) {
            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            for (var record : consumer.poll(ofMillis(500))) {
                records.add(record);
            }
            if (!records.isEmpty()) {
                String batchOffsets = String.format("%s -> %s", records.get(0).offset(), records.get(records.size() - 1).offset());
                String groupId = consumer.groupMetadata().groupId();
                log.info("groupId: {}, poll: #{}, fetched: #{} records, offsets: {}", groupId, pollCount++, records.size(), batchOffsets);
            }
        }
    }
}

The Testcontainers library will help us set up the test environment by spinning up a Docker container with a running Kafka broker. If you want to learn more about setting up the Testcontainers Kafka module, check out how we configured the test environment here and follow along.

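For reference, a minimal sketch of such a setup, using the Testcontainers Kafka and JUnit Jupiter modules, could look like the snippet below. The class name and the image tag are illustrative assumptions, not the exact configuration from the linked article:

@Testcontainers
class KafkaConsumerFetchLiveTest {

    // Testcontainers starts this broker once for the test class; any recent cp-kafka image works the same way
    @Container
    private static final KafkaContainer KAFKA_CONTAINER =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    // KAFKA_CONTAINER.getBootstrapServers() supplies the bootstrap.servers value used in the consumer configuration
}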

In our particular case, we can define an additional method that publishes several messages on a given topic. For instance, let’s assume we are streaming values read by a temperature sensor to a topic named “engine.sensors.temperature”:

void publishTestData(int recordsCount, String topic) {
    List<ProducerRecord<String, String>> records = IntStream.range(0, recordsCount)
      .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
      .collect(toList());
    // publish all to kafka
}

As we can see, we have used the same key for all the messages. As a result, all records will be sent to the same partition. For payload, we’ve used a short, fixed text depicting a temperature measurement.

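The elided publishing step can be implemented with a plain KafkaProducer. Below is a minimal sketch, under the assumption of a producer field configured with a StringSerializer for both key and value and pointed at the container’s bootstrap servers; the field and its producerProperties() helper are illustrative, not part of the article’s code:

// hypothetical producer field, e.g. configured with StringSerializer for key and value
private final KafkaProducer<String, String> testProducer = new KafkaProducer<>(producerProperties());

void publishTestData(int recordsCount, String topic) {
    IntStream.range(0, recordsCount)
      .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
      .forEach(testProducer::send);
    testProducer.flush(); // make sure every record reaches the broker before the listeners start polling
}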

3. Testing the Default Behaviour

Let’s start by creating a Kafka listener using the default consumer configuration. Then, we’ll publish a few messages to see how many batches our listener consumes. As we can see, our custom listener uses the Consumer API internally. As a result, to instantiate VariableFetchSizeKafkaListener, we’ll have to configure and create a KafkaConsumer first:

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

For now, we’ll use KafkaConsumer’s default values for the minimum and maximum fetch sizes. Based on this consumer, we can instantiate our listener and run it asynchronously to avoid blocking the main thread:

CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
);

Finally, let’s block the test thread for a few seconds, giving the listener some time to consume the messages. The goal of this article is to start the listeners and observe how they perform. We’ll use JUnit 5 tests as a convenient way of setting up and exploring their behavior, but for simplicity, we won’t include any specific assertions. As a result, this will be our starting @Test:

@Test
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
    );

    Thread.sleep(5_000L);
}

Now, let’s run the test and inspect the logs to see how many records will be fetched in a single batch:

10:48:46.958 [ForkJoinPool.commonPool-worker-2] INFO  c.b.k.c.VariableFetchSizeKafkaListener - groupId: default_config, poll: #1, fetched: #300 records, offsets: 0 -> 299

As we can see, we fetched all 300 records in a single batch because our messages are small. Both the key and the body are short strings: the key is four characters, and the body is 16 characters long. That’s a total of 20 bytes, plus some extra for the record’s metadata, so even 300 such records add up to only a few kilobytes. On the other hand, the default value for the maximum batch size is one mebibyte (1,024 x 1,024 bytes), or simply 1,048,576 bytes.

4. Configuring Maximum Partition Fetch Size

The “max.partition.fetch.bytes” property in Kafka determines the largest amount of data that a consumer can fetch from a single partition in a single request. Consequently, even for a small number of short messages, we can force our listeners to fetch the records in multiple batches by changing this property.

To observe this, let’s create two more VariableFetchSizeKafkaListeners and configure them by setting this property to 500 B and 5 KB, respectively. First, let’s extract all the common consumer Properties into a dedicated method to avoid code duplication:

Properties commonConsumerProperties() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
}

Then, let’s create the first listener and run it asynchronously:

Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
);

As we can see, we are setting different consumer group IDs for the various listeners. This will allow them to consume the same test data. Now, let’s proceed with the second listener and complete the test:

@Test
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic);
    
    Properties fetchSize_500B = commonConsumerProperties();
    fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
    fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
    );

    Properties fetchSize_5KB = commonConsumerProperties();
    fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
    fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
    CompletableFuture.runAsync(
      new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
    );

    Thread.sleep(10_000L);
}

If we run this test, we can expect the first consumer to fetch batches roughly ten times smaller than the second one, since its maximum partition fetch size is ten times smaller. Let’s analyze the logs:

[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #1, fetched: #56 records, offsets: 0 -> 55
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #1, fetched: #5 records, offsets: 0 -> 4
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #2, fetched: #5 records, offsets: 5 -> 9
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #2, fetched: #56 records, offsets: 56 -> 111
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #3, fetched: #56 records, offsets: 112 -> 167
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[worker-3] INFO - groupId: max_fetch_size_5KB, poll: #4, fetched: #51 records, offsets: 168 -> 218
[worker-2] INFO - groupId: max_fetch_size_500B, poll: #5, fetched: #5 records, offsets: 20 -> 24
[...]

As expected, one of the listeners indeed fetches batches of data that are almost ten times larger than the other. Moreover, it’s important to understand that the number of records within a batch depends on the size of these records and their metadata. To highlight this, we can observe that the consumer with groupId “max_fetch_size_5KB” fetched fewer records on its fourth poll.

5. Configuring Minimum Fetch Size

The Consumer API also allows customizing the minimum fetch size through the “fetch.min.bytes” property. We can change this property to specify the minimum amount of data a broker needs to return in response to a fetch request. If this minimum is not met, the broker waits longer before sending a response to the consumer’s fetch request. To emphasize this, we can add a delay to our test publisher within our test helper method. As a result, the producer will wait a specific number of milliseconds between sending each message:

@Test
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
    String topic = "engine.sensors.temperature";
    publishTestData(300, topic, 100L);  
    // ...
}

void publishTestData(int measurementsCount, String topic, long delayInMillis) {
    // ...
}
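
One simple way to introduce that delay is to sleep between sends; here is a minimal sketch of the overloaded helper, reusing the same hypothetical testProducer field assumed earlier:

void publishTestData(int measurementsCount, String topic, long delayInMillis) throws InterruptedException {
    for (int i = 0; i < measurementsCount; i++) {
        testProducer.send(new ProducerRecord<>(topic, "key1", "temperature=255F"));
        testProducer.flush();        // push the record out immediately
        Thread.sleep(delayInMillis); // simulate a sensor emitting one reading per interval
    }
}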

Let’s start by creating a VariableFetchSizeKafkaListener that will use the default configuration, having “fetch.min.bytes” equal to one byte. Similar to the previous examples, we’ll run this consumer asynchronously within a CompletableFuture:

// fetch.min.bytes = 1 byte (default)
Properties minFetchSize_1B = commonConsumerProperties();
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
);

With this setup, and due to the delay we introduced, we can expect each record to be retrieved individually, one after the other. In other words, we can expect many batches containing a single record each. Also, we expect these batches to be consumed at roughly the same rate at which our KafkaProducer publishes the data, which in our case is every 100 milliseconds. Let’s run the test and analyze the logs:

14:23:22.368 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #1, fetched: #1 records, offsets: 0 -> 0
14:23:22.472 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #2, fetched: #1 records, offsets: 1 -> 1
14:23:22.582 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #3, fetched: #1 records, offsets: 2 -> 2
14:23:22.689 [worker-2] INFO - groupId: min_fetch_size_1B, poll: #4, fetched: #1 records, offsets: 3 -> 3
[...]

Moreover, we can force the consumer to wait for more data to accumulate by adjusting the “fetch.min.bytes” value to a larger size:

// fetch.min.bytes = 500 bytes
Properties minFetchSize_500B = commonConsumerProperties();
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B");
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
CompletableFuture.runAsync(
  new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
);

With the property set to 500 bytes, we can expect the consumer to wait longer and fetch more data in each batch. Let’s run this example as well and observe the outcome:

14:24:49.303 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #1, fetched: #6 records, offsets: 0 -> 5
14:24:49.812 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #2, fetched: #4 records, offsets: 6 -> 9
14:24:50.315 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #3, fetched: #5 records, offsets: 10 -> 14
14:24:50.819 [worker-3] INFO - groupId: mim_fetch_size_500B, poll: #4, fetched: #5 records, offsets: 15 -> 19
[...]

6. Conclusion

In this article, we discussed the way a KafkaConsumer fetches data from the broker. We learned that, by default, the consumer fetches data as soon as there is at least one new record. On the other hand, if the new data for the partition exceeds 1,048,576 bytes, it is split into multiple batches of at most that size. We discovered that customizing the “fetch.min.bytes” and “max.partition.fetch.bytes” properties allows us to tailor Kafka’s behavior to suit our specific requirements.

As always, the source for the examples is available over on GitHub.
