Get the Number of Messages in an Apache Kafka Topic

Last modified: August 8, 2022

1. Overview

Apache Kafka is an open-source distributed event streaming platform.

In this quick tutorial, we'll learn techniques for getting the number of messages in a Kafka topic. We'll demonstrate a programmatic technique as well as techniques based on Kafka's native commands.

2. Programmatic Technique

A Kafka topic may have multiple partitions. Our technique should make sure we’ve counted the number of messages from every partition.

We have to go through each partition and check its latest offset. For this, we'll introduce a consumer:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

The second step is to get all the partitions from this consumer:

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream().map(p -> new TopicPartition(topic, p.partition()))
    .collect(Collectors.toList());

The third step is to seek the consumer to the end of each partition and record the resulting position in a map keyed by partition:

consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream().collect(Collectors.toMap(Function.identity(), consumer::position));

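The final step is to sum the end positions of all partitions. As long as every partition still starts at offset 0, that is, no records have been removed by retention or compaction, this sum is the number of messages in the topic: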

long numberOfMessages = partitions.stream().mapToLong(p -> endPartitions.get(p)).sum();
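
When a partition no longer starts at offset 0, the count per partition is the end offset minus the beginning offset. The following is a minimal sketch of that arithmetic only, not part of the original example. It assumes the two maps have already been obtained, for instance from KafkaConsumer's beginningOffsets() and endOffsets() methods, and it uses plain Integer partition ids in place of TopicPartition so that it runs without the Kafka client library. The class name, method name, and sample offsets are hypothetical.

```java
import java.util.Map;

// Sketch: count messages per partition as (end offset - beginning offset).
// The partition ids and offsets used in main are hypothetical sample values.
class OffsetRangeCount {

    // Sums (end - beginning) over every partition present in endOffsets.
    // A partition missing from beginningOffsets is assumed to start at 0.
    static long countMessages(Map<Integer, Long> beginningOffsets,
                              Map<Integer, Long> endOffsets) {
        return endOffsets.entrySet().stream()
            .mapToLong(e -> e.getValue() - beginningOffsets.getOrDefault(e.getKey(), 0L))
            .sum();
    }

    public static void main(String[] args) {
        // Partition 1 has lost its first five records to retention.
        Map<Integer, Long> beginning = Map.of(0, 0L, 1, 5L);
        Map<Integer, Long> end = Map.of(0, 2L, 1, 6L);
        System.out.println(countMessages(beginning, end)); // (2 - 0) + (6 - 5) = 3
    }
}
```

With all beginning offsets at 0, this reduces to the plain sum of end positions shown above.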

3. Kafka Native Commands

Programmatic techniques are useful when we want to automate tasks based on the number of messages in a Kafka topic. However, if we only need the count for ad hoc analysis, creating such a service and running it on a machine is unnecessary overhead. A more straightforward option is to use Kafka's native commands, which give quick results.

3.1. Using the GetOffsetShell Command

Before executing native commands, we have to navigate to Kafka's root folder on the machine. The following command returns the number of messages published to the topic baeldung:

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
  --topic baeldung | awk -F ":" '{sum += $3} END {print "Result: "sum}'
Result: 3
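
GetOffsetShell prints one line per partition in the form topic:partition:offset, and the awk script adds up the third field. If the same summation is needed inside a Java program, it could be sketched as follows. This is only an illustration under the assumption that the command's output has already been captured as a list of lines. The class name, method name, and sample lines are hypothetical.

```java
import java.util.List;

// Sketch: sum the offset field of GetOffsetShell-style output lines.
// Each line is expected to look like "topic:partition:offset".
class GetOffsetShellOutput {

    static long sumOffsets(List<String> lines) {
        long sum = 0;
        for (String line : lines) {
            String[] fields = line.trim().split(":");
            if (fields.length < 3) {
                continue; // skip blank or unexpected lines
            }
            sum += Long.parseLong(fields[2]); // third field, like $3 in awk
        }
        return sum;
    }

    public static void main(String[] args) {
        // Hypothetical output for a topic with three partitions.
        List<String> output = List.of("baeldung:0:1", "baeldung:1:2", "baeldung:2:0");
        System.out.println("Result: " + sumOffsets(output)); // 1 + 2 + 0 = 3
    }
}
```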

3.2. Using the Console Consumer

As discussed earlier, we'll navigate to Kafka's root folder before executing any commands. The following command returns the number of messages published to the topic baeldung:

$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 \
  --property print.key=true --property print.value=false --property print.partition \
  --topic baeldung --timeout-ms 5000 | tail -n 10 | grep "Processed a total of"
Processed a total of 3 messages

4. Conclusion

In this article, we’ve looked into techniques to get the number of messages in a Kafka topic. We learned a programmatic technique that assigns all partitions to a consumer and checks the latest offset.

We also saw two techniques based on native Kafka commands. One was the GetOffsetShell command from Kafka tools. The other was running a console consumer that reads the topic from the beginning and reports the total number of messages processed.

As always, the source code of this article can be found over on GitHub.
