Monitor the Consumer Lag in Apache Kafka

Last modified: June 27, 2021

1. Overview

Kafka consumer group lag is a key performance indicator of any Kafka-based event-driven system.

In this tutorial, we’ll build an analyzer application to monitor Kafka consumer lag.

2. Consumer Lag

Consumer lag is simply the delta between the consumer’s last committed offset and the producer’s end offset in the log. In other words, the consumer lag measures the delay between producing and consuming messages in any producer-consumer system.

In this section, let’s understand how we can determine the offset values.

2.1. Kafka AdminClient

To inspect the offset values of a consumer group, we’ll need the administrative Kafka client. So, let’s write a method in the LagAnalyzerService class to create an instance of the AdminClient class:

private AdminClient getAdminClient(String bootstrapServerConfig) {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    return AdminClient.create(config);
}

We must note the use of the @Value annotation to retrieve the bootstrap server list from the property file. In the same way, we'll use this annotation to get other values such as groupId and topicName.

2.2. Consumer Group Offset

First, we can use the listConsumerGroupOffsets() method of the AdminClient class to fetch the offset information of a specific consumer group id.

Next, our focus is mainly on the offset values, so we can invoke the partitionsToOffsetAndMetadata() method to get a map of TopicPartition vs. OffsetAndMetadata values:

private Map<TopicPartition, Long> getConsumerGrpOffsets(String groupId) 
  throws ExecutionException, InterruptedException {
    ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
    Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = info.partitionsToOffsetAndMetadata().get();

    Map<TopicPartition, Long> groupOffset = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
        TopicPartition key = entry.getKey();
        OffsetAndMetadata metadata = entry.getValue();
        groupOffset.putIfAbsent(new TopicPartition(key.topic(), key.partition()), metadata.offset());
    }
    return groupOffset;
}

Lastly, we can notice the iteration over the topicPartitionOffsetAndMetadataMap to limit our fetched results to the offset values for each topic and partition.

2.3. Producer Offset

The only thing left for finding the consumer group lag is a way of getting the end offset values. For this, we can use the endOffsets() method of the KafkaConsumer class.

Let’s start by creating an instance of the KafkaConsumer class in the LagAnalyzerService class:

private KafkaConsumer<String, String> getKafkaConsumer(String bootstrapServerConfig) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(properties);
}

Next, let’s aggregate all the relevant TopicPartition values from the consumer group offsets for which we need to compute the lag so that we provide it as an argument to the endOffsets() method:

private Map<TopicPartition, Long> getProducerOffsets(Map<TopicPartition, Long> consumerGrpOffset) {
    List<TopicPartition> topicPartitions = new LinkedList<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffset.entrySet()) {
        TopicPartition key = entry.getKey();
        topicPartitions.add(new TopicPartition(key.topic(), key.partition()));
    }
    return kafkaConsumer.endOffsets(topicPartitions);
}

Finally, let’s write a method that uses consumer offsets and producer’s endoffsets to generate the lag for each TopicPartition:

private Map<TopicPartition, Long> computeLags(
  Map<TopicPartition, Long> consumerGrpOffsets,
  Map<TopicPartition, Long> producerOffsets) {
    Map<TopicPartition, Long> lags = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : consumerGrpOffsets.entrySet()) {
        Long producerOffset = producerOffsets.get(entry.getKey());
        Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
        long lag = Math.abs(producerOffset - consumerOffset);
        lags.putIfAbsent(entry.getKey(), lag);
    }
    return lags;
}

3. Lag Analyzer

Now, let’s orchestrate the lag analysis by writing the analyzeLag() method in the LagAnalyzerService class:

public void analyzeLag(String groupId) throws ExecutionException, InterruptedException {
    Map<TopicPartition, Long> consumerGrpOffsets = getConsumerGrpOffsets(groupId);
    Map<TopicPartition, Long> producerOffsets = getProducerOffsets(consumerGrpOffsets);
    Map<TopicPartition, Long> lags = computeLags(consumerGrpOffsets, producerOffsets);
    for (Map.Entry<TopicPartition, Long> lagEntry : lags.entrySet()) {
        String topic = lagEntry.getKey().topic();
        int partition = lagEntry.getKey().partition();
        Long lag = lagEntry.getValue();
        System.out.printf("Time=%s | Lag for topic = %s, partition = %s is %d\n",
          MonitoringUtil.time(),
          topic,
          partition,
          lag);
    }
}

However, when it comes to monitoring the lag metric, we'd need a near-real-time value of the lag so that we can take administrative action to recover system performance.

One straightforward way of achieving this is to poll the lag value at regular intervals. So, let's create a LiveLagAnalyzerService service that will invoke the analyzeLag() method of the LagAnalyzerService:

@Scheduled(fixedDelay = 5000L)
public void liveLagAnalysis() throws ExecutionException, InterruptedException {
    lagAnalyzerService.analyzeLag(groupId);
}

For our purposes, we've set the poll frequency to 5 seconds using the @Scheduled annotation. However, for real-time monitoring, we'd probably need to make this accessible via JMX.

4. Simulation

In this section, we’ll simulate Kafka producer and consumer for a local Kafka setup so that we can see LagAnalyzer in action without depending on an external Kafka producer and consumer.

4.1. Simulation Mode

Since simulation mode is required only for demonstration purposes, we should have a mechanism to turn it off when we want to run the Lag Analyzer application for a real scenario.

We can keep this as a configurable property in the application.properties resource file:

monitor.producer.simulate=true
monitor.consumer.simulate=true

We’ll plug these properties into the Kafka producer and consumer and control their behavior.

Additionally, let’s define producer startTime, endTime, and a helper method time() to get the current time during the monitoring:

public static final Date startTime = new Date();
public static final Date endTime = new Date(startTime.getTime() + 30 * 1000);

public static String time() {
    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    LocalDateTime now = LocalDateTime.now();
    String date = dtf.format(now);
    return date;
}

4.2. Producer-Consumer Configurations

We’ll need to define few core configuration values for instantiating the instances for our Kafka consumer and producer simulators.

First, let’s define the configuration for the consumer simulator in the KafkaConsumerConfig class:

public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    if (enabled) {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    } else {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
    }
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    if (enabled) {
        factory.setConsumerFactory(consumerFactory(groupId));
    } else {
        factory.setConsumerFactory(consumerFactory(simulateGroupId));
    }
    return factory;
}

Next, we can define the configuration for the producer simulator in the KafkaProducerConfig class:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Further, let’s use the @KafkaListener annotation to specify the target listener, which is, of course, enabled only when monitor.consumer.simulate is set to true:

@KafkaListener(
  topics = "${monitor.topic.name}",
  containerFactory = "kafkaListenerContainerFactory",
  autoStartup = "${monitor.consumer.simulate}")
public void listen(String message) throws InterruptedException {
    Thread.sleep(10L);
}

As such, we add a sleep of 10 milliseconds per consumed message to create an artificial consumer lag.

Finally, let’s write a sendMessage() method to simulate the producer:

@Scheduled(fixedDelay = 1L, initialDelay = 5L)
public void sendMessage() throws ExecutionException, InterruptedException {
    if (enabled) {
        if (endTime.after(new Date())) {
            String message = "msg-" + time();
            SendResult<String, String> result = kafkaTemplate.send(topicName, message).get();
        }
    }
}

We can notice that the producer will generate messages at a rate of 1 message/ms. Moreover, it'll stop producing messages once endTime is reached, 30 seconds after the simulation's startTime.

4.3. Live Monitoring

Now, let’s run the main method in our LagAnalyzerApplication:

public static void main(String[] args) {
    SpringApplication.run(LagAnalyzerApplication.class, args);
    while (true) ;
}

We’ll see the current lag on each partition of the topic after every 30 seconds:

Time=2021/06/06 11:07:24 | Lag for topic = baeldungTopic, partition = 0 is 93
Time=2021/06/06 11:07:29 | Lag for topic = baeldungTopic, partition = 0 is 290
Time=2021/06/06 11:07:34 | Lag for topic = baeldungTopic, partition = 0 is 776
Time=2021/06/06 11:07:39 | Lag for topic = baeldungTopic, partition = 0 is 1159
Time=2021/06/06 11:07:44 | Lag for topic = baeldungTopic, partition = 0 is 1559
Time=2021/06/06 11:07:49 | Lag for topic = baeldungTopic, partition = 0 is 2015
Time=2021/06/06 11:07:54 | Lag for topic = baeldungTopic, partition = 0 is 1231
Time=2021/06/06 11:07:59 | Lag for topic = baeldungTopic, partition = 0 is 731
Time=2021/06/06 11:08:04 | Lag for topic = baeldungTopic, partition = 0 is 231
Time=2021/06/06 11:08:09 | Lag for topic = baeldungTopic, partition = 0 is 0

As such, the producer generates messages at 1 message/ms, faster than the consumer can consume them. So, lag builds up for the first 30 seconds, after which the producer stops producing, and the lag gradually declines to 0.

5. Conclusion

In this tutorial, we developed an understanding of how to find the consumer lag on a Kafka topic. Additionally, we used that knowledge to create a LagAnalyzer application in spring that shows the lag in almost real-time.

As always, the complete source code for the tutorial is available over on GitHub.
