Sending Data to a Specific Partition in Kafka

Last modified: January 4, 2024

1. Introduction

Apache Kafka is a distributed streaming platform that excels in handling massive real-time data streams. Kafka organizes data into topics and further divides topics into partitions. Each partition acts as an independent channel, enabling parallel processing and fault tolerance.

In this tutorial, we delve into the techniques for sending data to specific partitions in Kafka. We’ll explore the benefits, implementation methods, and potential challenges associated with this approach.

2. Understanding Kafka Partitions

Now, let’s explore the fundamental concept of Kafka partitions.

2.1. What Are Kafka Partitions

When a producer sends messages to a Kafka topic, Kafka organizes these messages into partitions using a specified partitioning strategy. A partition is a fundamental unit that represents a linear, ordered sequence of messages. Once a message is produced, it is assigned to a particular partition based on the chosen partitioning strategy. Subsequently, the message is appended to the end of the log within that partition.
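
To make the append-only nature concrete, here is a minimal, illustrative sketch (plain Java, not Kafka's actual implementation) of a partition as an ordered log in which each appended message receives the next sequential offset:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only, not Kafka's implementation: a partition modeled
// as an append-only log where each message is assigned the next offset.
public class PartitionLog {

    private final List<String> log = new ArrayList<>();

    // Appends a message to the end of the log and returns its offset
    public long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Reads the message stored at a given offset
    public String read(long offset) {
        return log.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLog partition = new PartitionLog();
        System.out.println(partition.append("message1")); // offset 0
        System.out.println(partition.append("message2")); // offset 1
    }
}
```

Offsets increase monotonically within the partition, which is what gives Kafka its per-partition ordering guarantee.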

2.2. Parallelism and Consumer Groups

A Kafka topic may be divided into multiple partitions, and a consumer group can be assigned a subset of these partitions. Each consumer within the group processes messages independently from its assigned partitions. This parallel processing mechanism enhances overall throughput and scalability, allowing Kafka to handle large volumes of data efficiently.

2.3. Ordering and Processing Guarantees

Within a single partition, Kafka ensures that messages are processed in the same order they were received. This guarantees sequential processing for applications that rely on message order, like financial transactions or event logs. However, note that the order in which messages are received may differ from the order in which they were originally sent, due to network delays and other operational considerations.

Across different partitions, Kafka does not impose a guaranteed order. Messages from different partitions may be processed concurrently, introducing the possibility of variations in the order of events. This characteristic is essential to consider when designing applications that rely on the strict ordering of messages.

2.4. Fault Tolerance and High Availability

Partitions also contribute to Kafka’s exceptional fault tolerance. Each partition can be replicated across multiple brokers. In the event of a broker failure, the replicas on the surviving brokers remain accessible, ensuring continuous access to the data.

The Kafka cluster can seamlessly redirect consumers to healthy brokers, maintaining data availability and high system reliability.

3. Why Send Data to Specific Partitions

In this section, let’s explore the reasons for sending data to specific partitions.

3.1. Data Affinity

Data affinity refers to the intentional grouping of related data within the same partition. By sending related data to specific partitions, we ensure that it is processed together, leading to increased processing efficiency.

For instance, consider a scenario where we might want to ensure a customer’s orders reside in the same partition for order tracking and analytics. Guaranteeing that all orders from a specific customer end up in the same partition simplifies tracking and analysis processes.

3.2. Load Balancing

Additionally, distributing data evenly across partitions helps optimize resource utilization within a Kafka cluster. By sending data to partitions based on load considerations, we can prevent resource bottlenecks and ensure that each partition receives a manageable and balanced workload.
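
As an illustration of the idea in plain Java (note that modern Kafka producers default to a sticky/uniform strategy for keyless messages rather than strict per-message round-robin), a simple round-robin assigner spreads messages evenly across partitions:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: round-robin assignment spreads keyless messages
// evenly across partitions, one partition per message, wrapping at the end.
public class RoundRobinAssigner {

    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    public RoundRobinAssigner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Each call advances to the next partition, wrapping around at the end
    public int nextPartition() {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinAssigner assigner = new RoundRobinAssigner(3);
        for (int i = 0; i < 6; i++) {
            System.out.println("message" + i + " -> partition " + assigner.nextPartition());
        }
    }
}
```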

3.3. Prioritization

In certain scenarios, not all data has equal priority or urgency. Kafka’s partitioning capabilities enable the prioritization of critical data by directing it to dedicated partitions for expedited handling. This prioritization ensures that high-priority messages receive prompt attention and faster processing compared to less critical ones.

4. Methods for Sending to Specific Partitions

Kafka provides various strategies for assigning messages to partitions, offering data distribution and processing flexibility. Below are some common methods that can be used to send messages to a specific partition.

4.1. Sticky Partitioner

In Kafka versions 2.4 and above, the sticky partitioner aims to keep messages without keys together in the same partition. However, this behavior isn’t absolute and interacts with batching settings such as batch.size and linger.ms.

To optimize message delivery, Kafka groups messages into batches before sending them to brokers. The batch.size setting (default: 16,384 bytes) controls the maximum batch size, affecting how long messages stay in the same partition under the sticky partitioner.

The linger.ms configuration (default: 0 milliseconds) introduces a delay before sending batches, potentially prolonging sticky behavior for messages without keys.
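
As a sketch, both settings can be adjusted in the producer configuration. The property names below are Kafka's real config keys, but the values are arbitrary examples; in production code, the equivalent ProducerConfig constants (BATCH_SIZE_CONFIG, LINGER_MS_CONFIG) from kafka-clients are typically used instead of string literals:

```java
import java.util.Properties;

// Sketch of producer batching configuration: a larger batch.size and a
// non-zero linger.ms give the producer more time to fill batches, which
// also prolongs the sticky partitioner's stay on a single partition.
public class BatchingConfig {

    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("batch.size", "32768"); // double the 16,384-byte default
        props.put("linger.ms", "5");      // wait up to 5 ms before sending a batch
        return props;
    }
}
```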

In the following test case, assuming the default batching configuration remains in place, we’ll send three messages without explicitly assigning a key. We expect them to be initially assigned to the same partition:

kafkaProducer.send("default-topic", "message1");
kafkaProducer.send("default-topic", "message2");
kafkaProducer.send("default-topic", "message3");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

Set<Integer> uniquePartitions = records.stream()
  .map(ReceivedMessage::getPartition)
  .collect(Collectors.toSet());

Assert.assertEquals(1, uniquePartitions.size());

4.2. Key-based Approach

In the key-based approach, Kafka directs messages with identical keys to the same partition, optimizing the processing of related data. This is achieved through a hash function, ensuring deterministic mapping of message keys to partitions.
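
Conceptually, the default partitioner computes something like hash(key) mod numPartitions. The sketch below uses String.hashCode() as a simplified stand-in (Kafka's default partitioner actually applies murmur2 to the serialized key bytes) to demonstrate that equal keys deterministically map to the same partition:

```java
// Simplified stand-in for Kafka's default partitioner: equal keys always
// map to the same partition. Kafka really uses murmur2 over the serialized
// key bytes, but the determinism property shown here is the same.
public class KeyBasedPartitioning {

    public static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // The same key yields the same partition on every call
        System.out.println(partitionFor("partitionA", 3) == partitionFor("partitionA", 3)); // true
    }
}
```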

In this test case, messages with the same key partitionA should always land in the same partition. Let’s illustrate key-based partitioning with the following code snippet:

kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionB", "another critical message");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 4);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
Map<String, List<ReceivedMessage>> messagesByKey = groupMessagesByKey(records);

messagesByKey.forEach((key, messages) -> {
    int expectedPartition = messages.get(0)
      .getPartition();
    for (ReceivedMessage message : messages) {
        assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition);
    }
});

In addition, with the key-based approach, messages sharing the same key are consistently received in the order they were produced within a specific partition. This guarantees the preservation of message order within a partition, especially for related messages.

In this test case, we produce messages with the key partitionA in a specific order, and the test actively verifies that these messages are received in the same order within the partition:

kafkaProducer.send("order-topic", "partitionA", "message1");
kafkaProducer.send("order-topic", "partitionA", "message3");
kafkaProducer.send("order-topic", "partitionA", "message4");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

StringBuilder resultMessage = new StringBuilder();
records.forEach(record -> resultMessage.append(record.getMessage()));
String expectedMessage = "message1message3message4";

assertEquals("Messages with the same key should be received in the order they were produced within a partition", 
  expectedMessage, resultMessage.toString());

4.3. Custom Partitioning

For fine-grained control, Kafka allows defining custom partitioners. These classes implement the Partitioner interface, enabling us to write logic based on message content, metadata, or other factors to determine the target partition.

In this section, we’ll create a custom partitioning logic based on the customer type when dispatching orders to a Kafka topic. Specifically, premium customer orders will be directed to one partition, while normal customer orders will find their way to another.

To begin, we create a class named CustomPartitioner, implementing the Kafka Partitioner interface. Within this class, we override the partition() method with custom logic to determine the destination partition for each message:

public class CustomPartitioner implements Partitioner {
    private static final int PREMIUM_PARTITION = 0;
    private static final int NORMAL_PARTITION = 1;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Guard against null keys so keyless messages fall back to the normal partition
        String customerType = extractCustomerType(key == null ? "" : key.toString());
        return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;
    }

    private String extractCustomerType(String key) {
        // Keys follow the pattern "<orderId>_<customerType>", e.g. "123_premium"
        String[] parts = key.split("_");
        return parts.length > 1 ? parts[1] : "normal";
    }

    // more methods
}

Next, to apply this custom partitioner in Kafka, we need to set the PARTITIONER_CLASS_CONFIG property in the producer configuration. Kafka will use this partitioner to determine the partition for each message based on the logic defined in the CustomPartitioner class.

The method setProducerToUseCustomPartitioner() is used to set up the Kafka producer to use the CustomPartitioner:

private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
    DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);

    return new KafkaTemplate<>(producerFactory);
}

We then construct a test case to ensure that the custom partitioning logic correctly routes premium and normal customer orders to their respective partitions:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

4.4. Direct Partition Assignment

When manually migrating data between topics or adjusting data distribution across partitions, direct partition assignment can help control message placement. Kafka also offers the ability to send messages directly to specific partitions using the ProducerRecord constructor that accepts a partition number. By specifying the partition number, we can explicitly dictate the destination partition for each message.

In this test case, we specify the second argument in the send() method to take in the partition number:

kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

for (ReceivedMessage record : records) {
    if ("123_premium".equals(record.getKey())) {
        assertEquals("Premium order message should be in partition 0", 0, record.getPartition());
    } else if ("456_normal".equals(record.getKey())) {
        assertEquals("Normal order message should be in partition 1", 1, record.getPartition());
    }
}

5. Consume from Specific Partitions

To consume data from specific partitions in Kafka on the consumer side, we can specify the partitions we want to subscribe to using the KafkaConsumer.assign() method. This grants fine-grained control over consumption but requires managing partition offsets manually.

Here’s an example of consuming messages from specific partitions using the assign() method:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

6. Potential Challenges and Considerations

When sending messages to specific partitions, there is a risk of uneven load distribution among partitions. This can occur if the logic used for partitioning doesn’t distribute messages uniformly across all partitions. Moreover, scaling the Kafka cluster, which involves adding or removing brokers, can trigger partition reassignment. During reassignment, brokers may move partitions, potentially disrupting the order of messages or causing temporary unavailability.

Therefore, we should regularly monitor the load on each partition using Kafka tools or metrics. For example, the Kafka Admin Client and Micrometer can provide insights into partition health and performance. We can use the Admin Client to retrieve information about topics, partitions, and their current state, and use Micrometer for metrics monitoring.

Additionally, anticipate the need to proactively adjust the partitioning strategy or horizontally scale the Kafka cluster to manage the increased load on specific partitions effectively. We may also consider increasing the number of partitions or adjusting key ranges for a more even spread.

7. Conclusion

In summary, the ability to send messages to specific partitions in Apache Kafka opens up powerful possibilities for optimizing data processing and enhancing overall system efficiency.

Throughout this tutorial, we explored various methods for directing messages to specific partitions, including the key-based approach, custom partitioning, and direct partition assignment. Each method offers distinct advantages, allowing us to tailor based on the specific requirements of the applications.

As always, the source code for the examples is available over on GitHub.
