1. Overview
In this article, we’ll explore a few strategies to purge data from an Apache Kafka topic.
2. Clean-Up Scenario
Before we learn the strategies to clean up the data, let’s acquaint ourselves with a simple scenario that demands a purging activity.
2.1. Scenario
Messages in Apache Kafka automatically expire after a configured retention time. Nonetheless, in a few cases, we might want the message deletion to happen immediately.
Let’s imagine that a defect has been introduced in the application code that is producing messages in a Kafka topic. By the time a bug-fix is integrated, we already have many corrupt messages in the Kafka topic that are ready for consumption.
Such issues are most common in a development environment, and we want quick results. So, bulk deletion of messages is a rational thing to do.
2.2. Simulation
To simulate the scenario, let’s start by creating a purge-scenario topic from the Kafka installation directory:
$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181
Next, let’s use the shuf command to generate random data and feed it to the kafka-console-producer.sh script:
$ /usr/bin/shuf -i 1-100000 -n 50000000 \
| tee -a /tmp/kafka-random-data \
| bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
We must note that we’ve used the tee command to save the simulation data for later use.
Finally, let’s verify that a consumer can consume messages from the topic:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 3
76696
49425
1744
Processed a total of 3 messages
3. Message Expiry
The messages produced in the purge-scenario topic have the default retention period of seven days. To purge them, we can temporarily override the retention.ms topic-level property to ten seconds and wait for the messages to expire:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=10000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario \
&& sleep 10
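Optionally, we can confirm that the override took effect. A minimal sketch, reusing the same flag style as the alter command above, on the assumption that our Kafka version's kafka-configs.sh accepts --describe with the --topic shorthand:

```shell
# Describe the dynamic configs currently set on the topic;
# retention.ms=10000 should appear among them after the override.
$ bin/kafka-configs.sh --describe \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
```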
Next, let’s verify that the messages have expired from the topic:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
Finally, we can restore the original retention period of seven days for the topic:
$ bin/kafka-configs.sh --alter \
--add-config retention.ms=604800000 \
--bootstrap-server=0.0.0.0:9092 \
--topic purge-scenario
With this approach, Kafka will purge messages across all the partitions for the purge-scenario topic.
4. Selective Record Deletion
At times, we might want to selectively delete records within one or more partitions of a specific topic. We can satisfy such requirements using the kafka-delete-records.sh script.
First, we need to specify the partition-level offset in the delete-config.json configuration file.
Let’s purge all messages from partition=1 by using offset=-1:
{
"partitions": [
{
"topic": "purge-scenario",
"partition": 1,
"offset": -1
}
],
"version": 1
}
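Here, offset=-1 is a sentinel meaning the current high watermark, so every record in the partition is deleted. To trim only part of the log instead, we could supply a concrete cut-off: Kafka removes all records whose offset is below the given value. A sketch with a hypothetical cut-off of 100:

```json
{
  "partitions": [
    {
      "topic": "purge-scenario",
      "partition": 1,
      "offset": 100
    }
  ],
  "version": 1
}
```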
Next, let’s proceed with record deletion:
$ bin/kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file delete-config.json
We can verify that we’re still able to read from partition=0:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario --partition=0 \
--max-messages 1 --timeout-ms 1000
44017
Processed a total of 1 messages
However, when we read from partition=1, there will be no records to process:
$ bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning --topic purge-scenario \
--partition=1 \
--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
5. Delete and Recreate the Topic
Another workaround to purge all messages of a Kafka topic is to delete and recreate it. However, this is only possible if we set the delete.topic.enable property to true while starting the Kafka server:
$ bin/kafka-server-start.sh config/server.properties \
--override delete.topic.enable=true
To delete the topic, we can use the kafka-topics.sh script:
$ bin/kafka-topics.sh \
--delete --topic purge-scenario \
--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Let’s verify it by listing the topic:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
After confirming that the topic is no longer listed, we can now go ahead and recreate it.
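The recreate step simply mirrors the create command from section 2.2, assuming the same ZooKeeper instance at localhost:2181:

```shell
# Recreate the topic with the original partition and replication settings
$ bin/kafka-topics.sh \
--create --topic purge-scenario --if-not-exists \
--partitions 2 --replication-factor 1 \
--zookeeper localhost:2181
```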
6. Conclusion
In this tutorial, we simulated a scenario where we’d need to purge an Apache Kafka topic. Moreover, we explored multiple strategies to purge it completely or selectively across partitions.