1. Introduction
1.导言
Consumer groups help to create more scalable Kafka applications by allowing more than one consumer to read from the same topic.
消费者组允许多个消费者从同一主题读取内容,有助于创建更具可扩展性的 Kafka 应用程序。
In this tutorial, we’ll understand consumer groups and how they rebalance partitions between their consumers.
在本教程中,我们将了解消费者组及其如何在消费者之间重新平衡分区。
2. What Are Consumer Groups?
2.什么是消费者团体?
A consumer group is a set of unique consumers associated with one or more topics. Each consumer can read from zero, one, or more than one partition. Furthermore, each partition can only be assigned to a single consumer at a given time. The partition assignment changes as the group members change. This is known as group rebalancing.
消费者组是一组与一个或多个主题相关联的唯一消费者。每个消费者可以从 0 个、1 个或多个分区中读取数据。此外,每个分区在给定时间内只能分配给一个消费者。分区分配会随着群成员的变化而变化。这就是所谓的组重新平衡。
The consumer group is a crucial part of Kafka applications. This allows the grouping of similar consumers and makes it possible for them to read in parallel from a partitioned topic. Hence, it improves the performance and scalability of Kafka applications.
消费者组是 Kafka 应用程序的重要组成部分。它允许对类似的消费者进行分组,使他们能够并行地从分区主题中读取数据。因此,它提高了 Kafka 应用程序的性能和可扩展性。
2.1. The Group Coordinator and the Group Leader
2.1.小组协调员和组长
When we instantiate a consumer group, Kafka also creates the group coordinator. The group coordinator regularly receives requests from the consumers, known as heartbeats. If a consumer stops sending heartbeats, the coordinator assumes that the consumer has either left the group or crashed. That’s one possible trigger for a partition rebalance.
当我们实例化一个消费者组时,Kafka 也会创建组协调器。组协调器会定期接收来自消费者的请求,这些请求被称为 “心跳”。如果某个消费者停止发送心跳,协调器就会认为该消费者要么已经离开了组,要么已经崩溃。这就是分区重新平衡的一个可能触发因素。
The first consumer who requests the group coordinator to join the group becomes the group leader. When a rebalance occurs for any reason, the group leader receives a list of the group members from the group coordinator. Then, the group leader reassigns the partitions among the consumers in that list using a customizable strategy set in the partition.assignment.strategy configuration.
第一个向小组协调人提出加入小组请求的消费者成为小组组长。当因任何原因发生重新平衡时,组长会从组协调者处收到一份组员列表。然后,组长将使用 partition.assignment.strategy 配置中设置的自定义策略,在该列表中的用户之间重新分配分区。
2.2. Committed Offsets
2.2.承诺抵消
Kafka uses the committed offset to keep track of the last position read from a topic. The committed offset is the position in the topic to which a consumer acknowledges having successfully processed. In other words, it’s the starting point for itself and other consumers to read events in subsequent rounds.
Kafka 使用已提交偏移量来跟踪从主题读取的最后位置。已提交偏移量是消费者确认已成功处理的主题中的位置。换句话说,它是自己和其他消费者在后续轮次中读取事件的起点。
Kafka stores the committed offsets from all partitions inside an internal topic named __consumer_offsets. We can safely trust its information since topics are durable and fault-tolerant for replicated brokers.
Kafka 将所有分区的已提交偏移量存储在名为 __consumer_offsets 的内部主题中。我们可以放心地信任它的信息,因为对于复制的代理来说,主题是持久和容错的。
2.3. Partition Rebalancing
2.3.分区重新平衡
A partition rebalance changes the partition ownership from one consumer to another. Kafka executes a rebalance automatically when a new consumer joins the group or when a consumer member of the group crashes or unsubscribes.
分区再平衡将分区所有权从一个消费者转移到另一个消费者。当有新的消费者加入群组,或者群组中的消费者成员崩溃或取消订阅时,Kafka 会自动执行重新平衡。
To improve scalability, when a new consumer joins the group, Kafka fairly shares the partitions from the other consumers with the newly added consumer. Additionally, when a consumer crashes, its partitions must be assigned to the remaining consumers in the group to avoid the loss of any unprocessed messages.
为了提高可扩展性,当有新的消费者加入组时,Kafka 会公平地与新加入的消费者共享其他消费者的分区。此外,当一个消费者崩溃时,其分区必须分配给组中的其余消费者,以避免丢失任何未处理的消息。
The partition rebalance uses the __consumer_offsets topic to make a consumer start reading a reassigned partition from the correct position.
分区重新平衡使用 __consumer_offsets 主题,使消费者从正确的位置开始读取重新分配的分区。
During a rebalance, consumers can’t consume messages. In other words, the broker becomes unavailable until the rebalance is done. Additionally, consumers lose their state and need to recalculate their cached values. The unavailability and cache recalculation during partition rebalance make the event consumption slower.
在重新平衡期间,消费者无法消费消息。换句话说,在重新平衡完成之前,代理不可用。此外,消费者会丢失他们的状态,需要重新计算他们的缓存值。分区再平衡期间的不可用性和缓存重新计算会降低事件消耗的速度。
3. Setting up the Application
3.设置应用程序
In this section, we’ll configure the basics to get a Spring Kafka application up and running.
在本节中,我们将配置基础知识,以启动并运行 Spring Kafka 应用程序。
3.1. Creating the Basic Configurations
3.1.创建基本配置
First, let’s configure the topic and its partitions:
首先,让我们配置主题及其分区:
@Configuration
public class KafkaTopicConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
public NewTopic celciusTopic() {
return TopicBuilder.name("topic-1")
.partitions(2)
.build();
}
}
The above configuration is straightforward. We’re simply configuring a new topic named topic-1 with two partitions.
上述配置非常简单。我们只需配置一个名为 topic-1 的新主题和两个分区。
Now, let’s configure the producer:
现在,让我们来配置制片人:
@Configuration
public class KafkaProducerConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Double> kafkaProducer() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Double> kafkaProducerTemplate() {
return new KafkaTemplate<>(kafkaProducer());
}
}
In the Kafka producer configuration above, we’re setting the broker address and the serializers that they use to write messages.
在上面的 Kafka 生产者配置中,我们设置了代理地址和用于写入消息的序列化器。
Finally, let’s configure the consumer:
最后,我们来配置消费者:
@Configuration
public class KafkaConsumerConfiguration {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, Double> kafkaConsumer() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Double> kafkaConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Double> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumer());
return factory;
}
}
3.2. Setting up the Consumers
3.2.设置消费者
In our demo application, we’ll start with two consumers that belong to the same group named group-1 from topic-1:
在我们的演示应用程序中,我们将从 topic-1 中名为group-1的同一个组中的两个消费者开始:
@Service
public class MessageConsumerService {
@KafkaListener(topics = "topic-1", groupId = "group-1")
public void consumer0(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-0", consumerRecord);
}
@KafkaListener(topics = "topic-1", groupId = "group-1")
public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-1", consumerRecord);
}
}
The MessageConsumerService class registers two consumers to listen to topic-1 inside group-1 using the @KafkaListener annotation.
MessageConsumerService 类使用 @KafkaListener 注解注册了两个消费者,以监听 group-1 内的 topic-1 。
Now, let’s also define a field and a method in the MessageConsumerService class to keep track of the consumed partition:
现在,让我们在 MessageConsumerService 类中定义一个字段和一个方法,以跟踪已消耗的分区:
Map<String, Set<Integer>> consumedPartitions = new ConcurrentHashMap<>();
private void trackConsumedPartitions(String key, ConsumerRecord<?, ?> record) {
consumedPartitions.computeIfAbsent(key, k -> new HashSet<>());
consumedPartitions.computeIfPresent(key, (k, v) -> {
v.add(record.partition());
return v;
});
}
In the code above, we used ConcurrentHashMap to map each consumer name to a HashSet of all partitions consumed by that consumer.
在上面的代码中,我们使用 ConcurrentHashMap 将每个消费者名称映射到该消费者使用的所有分区的 HashSet 中。
4. Visualizing Partition Rebalance When a Consumer Leaves
4.可视化消费者离开时的分区再平衡
Now that we have all configurations set up and the consumers registered, we can visualize what Kafka does when one of the consumers leaves group-1. To do that, let’s define the skeleton for the Kafka integration test that uses an embedded broker:
现在,我们已经设置了所有配置并注册了消费者,我们可以直观地了解当其中一个消费者离开 group-1 时 Kafka 会做什么。为此,让我们定义使用嵌入式代理的 Kafka 集成测试的骨架:
@SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class ManagingConsumerGroupsIntegrationTest {
private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
private static final int TOTAL_PRODUCED_MESSAGES = 50000;
private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10000;
@Autowired
KafkaTemplate<String, Double> kafkaTemplate;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
MessageConsumerService consumerService;
}
In the above code, we inject the necessary beans to produce and consume messages: kafkaTemplate and consumerService. We’ve also injected the bean kafkaListenerEndpointRegistry to manipulate registered consumers.
在上述代码中,我们注入了生产和消费消息所需的 Bean:kafkaTemplate 和 consumerService 。我们还注入了 Bean kafkaListenerEndpointRegistry,以操作已注册的消费者。
Finally, we defined three constants that will be used in our test case.
最后,我们定义了将在测试用例中使用的三个常量。
Now, let’s define the test case method:
现在,让我们定义测试用例方法:
@Test
public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException {
int currentMessage = 0;
do {
kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0));
currentMessage++;
if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) {
String containerId = kafkaListenerEndpointRegistry.getListenerContainerIds()
.stream()
.filter(a -> a.equals(CONSUMER_1_IDENTIFIER))
.findFirst()
.orElse("");
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId);
Thread.sleep(2000);
Objects.requireNonNull(container).stop();
kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId);
}
} while (currentMessage != TOTAL_PRODUCED_MESSAGES);
Thread.sleep(2000);
assertEquals(1, consumerService.consumedPartitions.get("consumer-1").size());
assertEquals(2, consumerService.consumedPartitions.get("consumer-0").size());
}
In the test above, we’re creating a flow of messages, and at a certain point, we remove one of the consumers so Kafka will reassign its partitions to the remaining consumer. Let’s break down the logic to make it more transparent:
在上面的测试中,我们创建了一个消息流,在某一点上,我们移除了其中一个消费者,这样 Kafka 就会将其分区重新分配给剩余的消费者。让我们分解一下逻辑,使其更加透明:
- The main loop uses kafkaTemplate to produce 50,000 events of random numbers using Apache Commons’ RandomUtils. When an arbitrary number of messages is produced —10,000 in our case — we stop and unregister one consumer from the broker.
- To unregister a consumer, we first use a stream to search for the matching consumer in the container and retrieve it using the getListenerContainer() method. Then, we call stop() to stop the container Spring component’s execution. Finally, we call unregisterListenerContainer() to programmatically unregister the listener associated with the container variable from the Kafka Broker.
Before discussing the test assertions, let’s glance at a few log lines that Kafka generated during the test execution.
在讨论测试断言之前,我们先来看看 Kafka 在测试执行过程中生成的几行日志。
The first vital line to see is the one that shows the LeaveGroup request made by consumer-1 to the group coordinator:
首先要看的重要一行是显示 Consumer-1 向群组协调员发出的 LeaveGroup 请求的一行:
INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-group-1-1, groupId=group-1] Member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 sending LeaveGroup request to coordinator localhost:9092
Then, the group coordinator automatically triggers a rebalance and shows the reason behind that:
然后,小组协调员会自动触发再平衡,并显示背后的原因:
INFO k.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group group-1 in state PreparingRebalance with old generation 2 (__consumer_offsets-4) (reason: Removing member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 on LeaveGroup)
Returning to our test, we’ll assert that the partition rebalance occurred correctly. Since we unregistered the consumer ending in 1, its partitions should be reassigned to the remaining consumer, which is consumer-0. Hence, we’ve used the map of tracked consumed records to check that consumer-1 only consumed from one partition, whereas consumer-0 consumed from two partitions.
回到我们的测试,我们将断言分区重新平衡正确进行。由于我们取消了以 1 结尾的消费者的注册,因此其分区应重新分配给剩余的消费者,即 consumer-0 。因此,我们使用已跟踪消费记录的映射来检查 consumer-1 是否只从一个分区消费,而 consumer-0 是否从两个分区消费。
4. Useful Consumer Configurations
4.有用的消费者配置
Now, let’s talk about a few consumer configurations that impact partition rebalance and the trade-offs of setting specific values for them.
现在,让我们来谈谈影响分区再平衡的几种消费者配置,以及为它们设置特定值的权衡。
4.1. Session Timeouts and Heartbeats Frequency
4.1.会话超时和心跳频率
The session.timeout.ms parameter indicates the maximum time in milliseconds that the group coordinator can wait for a consumer to send a heartbeat before triggering a partition rebalance. Alongside session.timeout.ms, the heartbeat.interval.ms indicates the frequency in milliseconds that a consumer sends heartbeats to the group coordinator.
session.timeout.ms参数表示组协调器在触发分区重新平衡前等待用户发送心跳的最长时间(以毫秒为单位)。除了 session.timeout.ms 参数,heartbeat.interval.ms 参数还指示用户向组协调器发送心跳的频率(以毫秒为单位)。
We should modify the consumer timeout and heartbeat frequency together so that heartbeat.interval.ms is always lower than session.timeout.ms. This is because we don’t want to let a consumer die by timeout before sending their heartbeats. Typically, we set the heartbeat interval to 33% of the session timeout to guarantee that more than one heartbeat is sent before the consumer dies.
我们应该同时修改消费者超时和心跳频率,以便 heartbeat.interval.ms 始终低于session.timeout.ms 。这是因为我们不想让消费者在发送心跳之前就因超时而死亡。通常,我们将心跳间隔设置为会话超时的 33%,以确保在消费者死亡之前发送一次以上的心跳。
The default consumer session timeout is set to 45 seconds. We can modify that value as long as we understand the trade-offs of modifying it.
消费者会话超时默认设置为 45 秒。我们可以修改该值,只要我们了解修改该值的利弊。
When we set the session timeout lower than the default, we increase the speed at which the consumer group recovers from a failure, improving the group availability. However, in Kafka versions before 0.10.1.0, if the main thread of a consumer is blocked when consuming a message that takes longer than the session timeout, the consumer can’t send heartbeats. Therefore, the consumer is considered dead, and the group coordinator triggers an unwanted partition rebalance. This was fixed in KIP-62, introducing a background thread that only sends heartbeats.
当我们将会话超时设置为低于默认值时,我们将提高消费者组从故障中恢复的速度,从而提高组的可用性。但是,在 0.10.1.0 之前的 Kafka 版本中,如果消费者的主线程在消费超过会话超时的消息时被阻塞,消费者就无法发送心跳。因此,消费者被视为死亡,组协调器会触发不必要的分区再平衡。KIP-62 中修正了这一问题,引入了一个只发送心跳的后台线程。
If we set higher values for the session timeout, we lose at detecting failures faster. However, this might fix the unwanted partition rebalance problem mentioned above for Kafka versions older than o.10.1.0.
如果我们为会话超时值设置更高的值,我们就无法更快地检测到故障。不过,这可能会解决上文提到的 Kafka 旧版本(o.10.1.0)不需要的分区再平衡问题。
4.2. Max Poll Interval Time
4.2.最大轮询间隔时间
Another configuration is the max.poll.interval.ms, indicating the maximum time the broker can wait for idle consumers. After that time passes, the consumer stops sending heartbeats until it reaches the session timeout configured and leaves the group. The default wait time for max.poll.interval.ms is five minutes.
另一项配置是 max.poll.interval.ms,表示代理等待空闲消费者的最长时间。超过该时间后,消费者将停止发送心跳,直到达到配置的会话超时并离开组。max.poll.interval.ms的默认等待时间是五分钟。
If we set higher values for max.poll.interval.ms, we’re giving more room for consumers to remain idle, which might be helpful to avoid rebalances. However, increasing that time might also increase the number of idle consumers if there are no messages to consume. This can be a problem in a low-throughput environment because consumers can remain idle longer, increasing infrastructure costs.
如果我们为max.poll.interval.ms设置更高的值,我们将为消费者提供更多的空闲时间,这可能有助于避免重新平衡。但是,如果没有信息要消耗,增加该时间也可能会增加空闲消费者的数量。这在低吞吐量环境中可能会造成问题,因为消费者闲置的时间会更长,从而增加基础设施成本。
5. Conclusion
5.结论
In this article, we’ve looked at the fundamentals of the roles of the group leader and the group coordinator. We’ve also looked into how Kafka manages consumer groups and partitions.
在本文中,我们了解了组长和组协调者角色的基本原理。我们还了解了 Kafka 如何管理消费者组和分区。
We’ve seen in practice how Kafka automatically rebalances the partitions within the group when one of its consumers leaves the group.
在实践中,我们已经看到 Kafka 是如何在一个消费者离开群组时自动重新平衡群组内的分区的。
It’s essential to understand when Kafka triggers partition rebalance and tune the consumer configurations accordingly.
了解 Kafka 何时触发分区再平衡并相应调整消费者配置至关重要。
As always, the source code for the article is available over on GitHub.
一如既往,本文的源代码可在 GitHub 上获取。