Guide to Purging an Apache Kafka Topic

Last modified: March 6, 2021

1. Overview

In this article, we’ll explore a few strategies to purge data from an Apache Kafka topic.

2. Clean-Up Scenario

Before we learn the strategies to clean up the data, let’s acquaint ourselves with a simple scenario that demands a purging activity.

2.1. Scenario

Messages in Apache Kafka automatically expire after a configured retention time. Nonetheless, in a few cases, we might want the message deletion to happen immediately.

Let’s imagine that a defect has been introduced in the application code that is producing messages in a Kafka topic. By the time a bug-fix is integrated, we already have many corrupt messages in the Kafka topic that are ready for consumption.

Such issues are most common in a development environment, where we want quick results. So, bulk deletion of messages is a reasonable approach.

2.2. Simulation

To simulate the scenario, let’s start by creating a purge-scenario topic from the Kafka installation directory:

$ bin/kafka-topics.sh \
  --create --topic purge-scenario --if-not-exists \
  --partitions 2 --replication-factor 1 \
  --zookeeper localhost:2181

Next, let’s use the shuf command to generate random data and feed it to the kafka-console-producer.sh script:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \
  | tee -a /tmp/kafka-random-data \
  | bin/kafka-console-producer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

We must note that we’ve used the tee command to save the simulation data for later use.

Finally, let’s verify that a consumer can consume messages from the topic:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 3
76696
49425
1744
Processed a total of 3 messages

3. Message Expiry

The messages produced in the purge-scenario topic will have a default retention period of seven days. To purge messages, we can temporarily reset the retention.ms topic-level property to ten seconds and wait for messages to expire:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=10000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario \
  && sleep 10

Next, let’s verify that the messages have expired from the topic:

$ bin/kafka-console-consumer.sh  \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

Finally, we can restore the original retention period of seven days for the topic:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=604800000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario
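
The 604800000 above is simply seven days expressed in milliseconds, which we can verify with shell arithmetic:

```shell
# retention.ms is measured in milliseconds: 7 days x 24 h x 60 min x 60 s x 1000 ms
echo $(( 7 * 24 * 60 * 60 * 1000 ))
# 604800000
```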

With this approach, Kafka will purge messages across all the partitions for the purge-scenario topic.

4. Selective Record Deletion

At times, we might want to delete records selectively within one or more partitions from a specific topic. We can satisfy such requirements by using the kafka-delete-records.sh script.

First, we need to specify the partition-level offset in the delete-config.json configuration file.

Let’s purge all messages from partition=1 by using offset=-1, which the tool interprets as the partition’s high watermark, i.e., everything currently in the partition:

{
  "partitions": [
    {
      "topic": "purge-scenario",
      "partition": 1,
      "offset": -1
    }
  ],
  "version": 1
}
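
If we’re working entirely from the shell, one way to create the file is a heredoc (any editor works just as well):

```shell
# Write the offset specification that kafka-delete-records.sh expects
cat > delete-config.json <<'EOF'
{
  "partitions": [
    { "topic": "purge-scenario", "partition": 1, "offset": -1 }
  ],
  "version": 1
}
EOF
```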

Next, let’s proceed with record deletion:

$ bin/kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file delete-config.json

We can verify that we’re still able to read from partition=0:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario --partition=0 \
  --max-messages 1 --timeout-ms 1000
  44017
  Processed a total of 1 messages

However, when we read from partition=1, there will be no records to process:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --partition=1 \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

5. Delete and Recreate the Topic

Another workaround to purge all messages of a Kafka topic is to delete and recreate it. However, this is only possible if we set the delete.topic.enable property to true while starting the Kafka server:

$ bin/kafka-server-start.sh config/server.properties \
  --override delete.topic.enable=true

To delete the topic, we can use the kafka-topics.sh script:

$ bin/kafka-topics.sh \
  --delete --topic purge-scenario \
  --zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Let’s verify it by listing the topic:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

After confirming that the topic is no longer listed, we can now go ahead and recreate it.
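
For instance, we can recreate it with the same settings we used for the simulation (a sketch assuming the same local, single-broker setup):

```shell
# Recreate the purged topic with the original partition and replication settings
bin/kafka-topics.sh \
  --create --topic purge-scenario --if-not-exists \
  --partitions 2 --replication-factor 1 \
  --zookeeper localhost:2181
```

Since the topic is brand new, a consumer reading from the beginning will see no messages until producers start writing again.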

6. Conclusion

In this tutorial, we simulated a scenario where we’d need to purge an Apache Kafka topic. Moreover, we explored multiple strategies to purge it completely or selectively across partitions.
