1. Overview
1.概述
When a producer sends a message to Apache Kafka, it appends it in a log file and retains it for a configured duration.
当生产者向Apache Kafka发送消息时,它会将其附加到一个日志文件中,并在配置的时间内保留它。
In this tutorial, we’ll learn to configure time-based message retention properties for Kafka topics.
在本教程中,我们将学习为Kafka主题配置基于时间的消息保留属性。
2. Time-Based Retention
2.基于时间的保留
With retention period properties in place, messages have a TTL (time to live). Upon expiry, messages are marked for deletion, thereby freeing up the disk space.
在保留期属性到位的情况下,消息有一个TTL(生存时间)。到期后,消息被标记为删除,从而释放了磁盘空间。
The same retention period property applies to all messages within a given Kafka topic. Furthermore, we can set these properties either before topic creation or alter them at runtime for a pre-existing topic.
同样的保留期属性适用于一个给定的Kafka主题内的所有消息。此外,我们可以在创建主题之前设置这些属性,或者在运行时为预先存在的主题改变这些属性。
In the following sections, we’ll learn how to tune this through broker configuration for setting the retention period for new topics and topic-level configuration to control it at runtime.
在下面的章节中,我们将学习如何通过经纪人配置来设置新主题的保留期和主题级配置来在运行时控制它。
3. Server-Level Configuration
3.服务器级别的配置
Apache Kafka supports a server-level retention policy that we can tune by configuring exactly one of the three time-based configuration properties:
Apache Kafka支持一个服务器级的保留策略,我们可以通过配置三个基于时间的配置属性中的一个来调整。
- log.retention.hours
- log.retention.minutes
- log.retention.ms
It’s important to understand that Kafka overrides a lower-precision value with a higher one. So, log.retention.ms would take the highest precedence.
重要的是要明白,Kafka会用更高的精度值来覆盖低精度值。所以,log.retention.ms会优先考虑。
3.1. Basics
3.1.基础知识
First, let’s inspect the default value for retention by executing the grep command from the Apache Kafka directory:
首先,让我们通过执行grep命令,从Apache Kafka目录检查保留值的默认值。
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
We can notice here that the default retention time is seven days.
我们在这里可以注意到,默认的保留时间是七天。
To retain messages only for ten minutes, we can set the value of the log.retention.minutes property in the config/server.properties:
要想只保留信息十分钟,我们可以在config/server.properties中设置log.retention.minutes>属性的值。
log.retention.minutes=10
3.2. Retention Period for New Topic
3.2.新主题的保留期
The Apache Kafka package contains several shell scripts that we can use to perform administrative tasks. We’ll use them to create a helper script, functions.sh, that we’ll use during the course of this tutorial.
Apache Kafka包包含几个shell脚本,我们可以用它们来执行管理任务。我们将使用它们来创建一个辅助脚本,functions.sh,我们将在本教程的过程中使用它。
Let’s start by adding two functions in functions.sh to create a topic and describe its configuration, respectively:
让我们先在functions.sh中添加两个函数,分别用于创建一个主题和描述其配置。
function create_topic {
topic_name="$1"
bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
--partitions 1 --replication-factor 1 \
--zookeeper localhost:2181
}
function describe_topic_config {
topic_name="$1"
./bin/kafka-configs.sh --describe --all \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
Next, let’s create two standalone scripts, create-topic.sh and get-topic-retention-time.sh:
接下来,让我们创建两个独立的脚本,create-topic.sh和get-topic-retention-time.sh。
bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?
We must note that describe_topic_config will give all the properties configured for the topic. So, we used the awk one-liner to add a filter for the retention.ms property.
我们必须注意,describe_topic_config将给出为该主题配置的所有属性。因此,我们使用awk单行代码,为retention.ms属性添加一个过滤器。
Finally, let’s start the Kafka environment and verify retention period configuration for a new sample topic:
最后,让我们启动Kafka环境,并为一个新的样本主题验证保留期配置。
bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000
Once the topic is created and described, we’ll notice that retention.ms is set to 600000 (ten minutes). That’s actually derived from the log.retention.minutes property that we had earlier defined in the server.properties file.
一旦主题被创建和描述,我们会注意到retention.ms被设置为600000(10分钟)。这实际上是从我们先前在server.properties文件中定义的log.retention.minutes属性衍生出来。
4. Topic-Level Configuration
4.主题层面的配置
Once the Broker server is started, log.retention.{hours|minutes|ms} server-level properties become read-only. On the other hand, we get access to the retention.ms property, which we can tune at the topic-level.
一旦Broker服务器启动,log.retention.{hours|minutes|ms}服务器级的属性就变成了只读。另一方面,我们可以访问retention.ms属性,我们可以在主题级进行调整。
Let’s add a method in our functions.sh script to configure a property of a topic:
让我们在我们的functions.sh脚本中添加一个方法来配置主题的一个属性。
function alter_topic_config {
topic_name="$1"
config_name="$2"
config_value="$3"
./bin/kafka-configs.sh --alter \
--add-config ${config_name}=${config_value} \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
Then, we can use this within an alter-topic-config.sh script:
然后,我们可以在alter-topic-config.sh脚本中使用这个。
#!/bin/sh
. ./functions.sh
alter_topic_retention_config $1 $2 $3
exit $?
Finally, let’s set retention time to five minutes for the test-topic and verify the same:
最后,让我们为测试主题设置保留时间为5分钟,并验证一下。
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000
5. Validation
5.审定
So far, we’ve seen how we can configure the retention period of a message within a Kafka topic. It’s time to validate that a message indeed expires after the retention timeout.
到目前为止,我们已经看到了如何在Kafka主题中配置消息的保留时间。现在是时候验证一条消息是否真的在保留时间结束后过期了。
5.1. Producer-Consumer
5.1.生产者-消费者
Let’s add produce_message and consume_message functions in the functions.sh. Internally, these use the kafka-console-producer.sh and kafka-console-consumer.sh, respectively, for producing/consuming a message:
让我们在functions.sh中添加produce_message和consume_message函数。在内部,这些使用kafka-console-producer.sh和kafka-console-consumer.sh,分别用于生产/消费一个消息。
function produce_message {
topic_name="$1"
message="$2"
echo "${message}" | ./bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
function consume_message {
topic_name="$1"
timeout="$2"
./bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning \
--topic ${topic_name} \
--max-messages 1 \
--timeout-ms $timeout
}
We must note that the consumer is always reading messages from the beginning as we need a consumer that reads any available message in Kafka.
我们必须注意,消费者总是从头开始读取消息,因为我们需要一个读取Kafka中任何可用消息的消费者。
Next, let’s create a standalone message producer:
接下来,让我们创建一个独立的消息生产者。
bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"
produce_message ${topic_name} ${message}
exit $?
Finally, let’s have a standalone message consumer:
最后,让我们有一个独立的消息消费者。
bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"
consume_message ${topic_name} $timeout
exit $?
5.2. Message Expiry
5.2 信息过期
Now that we have our basic setup ready, let’s produce a single message and consume it twice instantly:
现在我们已经准备好了我们的基本设置,让我们产生一个单一的消息并即时消费它两次。
bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
So, we can see that the consumer is repeatedly consuming any available message.
因此,我们可以看到,消费者正在重复消费任何可用的消息。
Now, let’s introduce a sleep delay of five minutes and then attempt to consume the message:
现在,让我们引入一个5分钟的睡眠延迟,然后尝试消费该信息。
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
As expected, the consumer didn’t find any message to consume because the message has crossed its retention period.
正如预期的那样,消费者没有找到任何可以消费的信息,因为该信息已经超过了它的保留期。
6. Limitations
6.6.局限性
Internally, the Kafka Broker maintains another property called log.retention.check.interval.ms. This property decides the frequency at which messages are checked for expiry.
在内部,Kafka Broker维护另一个名为log.retention.check.interval.ms的属性。这个属性决定了消息被检查过期的频率。
So, to keep the retention policy effective, we must ensure that the value of the log.retention.check.interval.ms is lower than the property value of retention.ms for any given topic.
因此,为了保持保留策略的有效性,我们必须确保log.retention.check.interval.ms的值低于任何特定主题的retention.ms的属性值。
7. Conclusion
7.结语
In this tutorial, we explored Apache Kafka to understand the time-based retention policy for messages. In the process, we created simple shell scripts to simplify the administrative activities. Later, we created a standalone consumer and producer to validate the message expiry after the retention period.
在本教程中,我们探索了Apache Kafka,以了解基于时间的消息保留策略。在这个过程中,我们创建了简单的shell脚本来简化管理活动。后来,我们创建了一个独立的消费者和生产者,以验证保留期后的消息是否过期。