1. Overview
In Kafka, consumers read messages from partitions. While reading messages, there are some concerns to consider, such as determining which messages to read from a partition, or preventing duplicate reads and message loss in case of failure. The solution to these concerns is the use of offsets.
In this tutorial, we’ll learn about offsets in Kafka. We’ll see how to commit offsets to manage message consumption and discuss the available commit methods and their drawbacks.
2. What Is an Offset?
We know that Kafka stores messages in topics, and each topic can have multiple partitions. Each consumer reads messages from one partition of a topic. Here, Kafka, with the help of offsets, keeps track of the messages that consumers have read. Offsets are integers starting from zero that increment by one as each message is stored.
Let’s say one consumer has read five messages from a partition. Then, based on the configuration, Kafka marks the offsets up to 4 as committed (zero-based sequence). The next time it attempts to read messages, the consumer consumes messages from offset 5 onwards.
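To make this concrete, here’s a minimal sketch that prints the offset of each consumed record (it assumes consumer is an already configured KafkaConsumer&lt;Long, String&gt; subscribed to our topic):
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> record : records) {
    // each record carries the offset assigned when it was stored in its partition
    System.out.printf("partition=%d offset=%d value=%s%n",
      record.partition(), record.offset(), record.value());
}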
Without offsets, there is no way to avoid duplicate processing or data loss. That’s why they’re so crucial.
We can make an analogy with database storage. In a database, we commit after executing SQL statements to persist the changes. In the same way, after reading from the partition, we commit offsets to mark the position of the last processed message.
3. Ways to Commit Offsets
There are four ways to commit offsets. We’ll look at each in detail and discuss their use cases, advantages, and disadvantages.
Let’s start by adding the Kafka client API dependency to the pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>
3.1. Auto Commit
This is the simplest way to commit offsets. Kafka, by default, uses auto-commit: every five seconds, it commits the largest offset returned by the poll() method. Here, poll() returns a set of messages with a timeout of 10 seconds, as we can see in the code:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<Long, String> message : messages) {
    // process the message
}
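Under the hood, auto-commit is controlled by two consumer properties: enable.auto.commit, which is true by default, and auto.commit.interval.ms, which defaults to 5000 milliseconds. A minimal configuration sketch (the broker address, group id, and deserializers are illustrative assumptions):
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // assumed group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// both values below are the defaults, spelled out for clarity
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");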
The problem with auto-commit is that there is a very high chance of data loss in case of application failure. When poll() returns the messages, Kafka may commit the largest offset before the application has finished processing them.
Let’s say poll() returns 100 messages, and the consumer has processed 60 of them when the auto-commit happens. Then, due to some failure, the consumer crashes. When a new consumer goes live to read messages, it commences reading from message 101, resulting in the loss of messages 61 through 100.
Thus, we need other ways that don’t have this drawback. The answer is manual commit.
3.2. Manual Sync Commit
For manual commits, whether sync or async, it’s necessary to disable auto-commit by setting the enable.auto.commit property to false:
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
After disabling auto-commit, let’s now understand the use of commitSync():
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
consumer.commitSync();
This method prevents data loss by committing the offset only after processing the messages. However, it doesn’t prevent duplicate reads if the consumer crashes after processing but before committing the offset. Besides this, it also impacts application performance.
commitSync() blocks the code until it completes. Also, in case of an error, it keeps on retrying. This decreases the throughput of the application, which we don’t want. So, Kafka provides another solution, async commit, that deals with these drawbacks.
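Since commitSync() throws once the commit can no longer succeed, for example after a group rebalance, a common defensive pattern is to wrap it in a try/catch. A minimal sketch (CommitFailedException comes from the Kafka client API; log is an assumed SLF4J logger):
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    // the commit can't be retried, e.g. because the group has rebalanced;
    // the uncommitted messages will be redelivered
    log.error("Synchronous commit failed", e);
}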
3.3. Manual Async Commit
Kafka provides commitAsync() to commit offsets asynchronously. It overcomes the performance overhead of manual sync commit by sending the commit request and continuing without waiting for the broker’s response. Let’s implement an async commit to understand this:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
// process the messages
consumer.commitAsync();
The problem with async commit is that it doesn’t retry in case of failure. It relies on the next call of commitAsync(), which will commit the latest offset.
Suppose 300 is the largest offset we want to commit, but our commitAsync() fails due to some issue. Because it’s asynchronous, another call of commitAsync() might commit a larger offset of 400 before the failed one could be retried. If the failed commitAsync() then retried and committed offset 300 successfully, it would overwrite the previous commit of 400, resulting in duplicate reads. That is why commitAsync() doesn’t retry.
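If we still want visibility into failures, commitAsync() also accepts an OffsetCommitCallback, and a common pattern is to commit asynchronously inside the poll loop and fall back to one final, retried commitSync() on shutdown. A sketch of that pattern (running and log are illustrative assumptions):
try {
    while (running) {
        ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
        // process the messages
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                // log only; retrying here could overwrite a newer commit with an older one
                log.error("Async commit failed for offsets {}", offsets, exception);
            }
        });
    }
} finally {
    // one last synchronous commit before closing, so no processed offsets are lost
    consumer.commitSync();
    consumer.close();
}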
3.4. Commit Specific Offset
Sometimes, we need more control over offsets. Let’s say we’re processing messages in small batches and want to commit the offsets as soon as each batch is processed. We can use the overloaded versions of commitSync() and commitAsync() that take a map argument to commit specific offsets:
KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(KafkaConfigProperties.getTopic());
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int messageProcessed = 0;
while (true) {
    ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<Long, String> message : messages) {
        // process one message
        messageProcessed++;
        // commit the position of the next message to read, hence offset + 1
        currentOffsets.put(
          new TopicPartition(message.topic(), message.partition()),
          new OffsetAndMetadata(message.offset() + 1));
        if (messageProcessed % 50 == 0) {
            consumer.commitSync(currentOffsets);
        }
    }
}
In this code, we manage a currentOffsets map, which takes TopicPartition as the key and OffsetAndMetadata as the value. During message processing, we insert the TopicPartition and OffsetAndMetadata of each processed message into the currentOffsets map; note that we commit message.offset() + 1 because the committed offset should point to the next message to be read. When the number of processed messages reaches fifty, we call commitSync() with the currentOffsets map to mark these messages as committed.
This approach behaves the same as the sync and async commits above. The only difference is that here we decide which offsets to commit, not Kafka.
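For completeness, commitAsync() offers an equivalent overload that takes the offsets map along with an optional callback, so the same batch-wise commit can be done without blocking. A brief sketch (log is an assumed logger):
consumer.commitAsync(currentOffsets, (offsets, exception) -> {
    if (exception != null) {
        log.error("Commit of offsets {} failed", offsets, exception);
    }
});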
4. Conclusion
In this article, we learned about offsets and their importance in Kafka. Further, we explored the four ways to commit offsets, both manual and automatic. Lastly, we analyzed their respective pros and cons. We can conclude that there is no single best way to commit offsets in Kafka; rather, the right choice depends on the specific use case.
All the code examples used in this article are available over on GitHub.