Send Large Messages With Kafka

Last modified: July 4, 2021

1. Overview

Apache Kafka is a powerful, open-source, distributed, fault-tolerant event streaming platform. However, when we try to send a message larger than the configured size limit, Kafka rejects it with an error.

We showed how to work with Spring and Kafka in a previous tutorial. In this tutorial, we’ll look at how to send large messages with Kafka.

2. Problem Statement

Kafka’s configuration limits the size of the messages it’s allowed to send. By default, this limit is 1MB. If we need to send larger messages, we have to adjust these configurations accordingly.

For this tutorial, we’re using Kafka v2.5. Let’s first look at our Kafka setup before jumping to the configuration.

3. Setup

Here, we’re going to use a basic Kafka setup with a single broker. The producer application sends messages over a defined topic to the Kafka broker using the Kafka client. Additionally, we’re using a single-partition topic.

This setup has multiple interaction points: the Kafka producer, the Kafka broker, the topic, and the Kafka consumer. Therefore, all of these need configuration updates before a large message can travel from one end to the other.

Let’s look into these configs in detail to send a large message of 20MB.

3. Kafka Producer Configuration

This is where our message originates. We’re using Spring Kafka to send messages from our application to the Kafka server.

Hence, the “max.request.size” property needs to be updated first. Additional details about this producer config are available in the Kafka documentation. It’s exposed as the constant ProducerConfig.MAX_REQUEST_SIZE_CONFIG in the Kafka Client library, which comes with the Spring Kafka dependency.

Let’s set this value to 20971520 bytes (20MB):

// Producer factory that raises the maximum request size to 20MB
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // max.request.size: 20 * 1024 * 1024 = 20971520 bytes
    configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

    return new DefaultKafkaProducerFactory<>(configProps);
}
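
To make the number 20971520 less opaque, here is a minimal sketch in plain Java (11 or later, for String.repeat) that derives it from 20 * 1024 * 1024 and checks whether a String payload would fit under a given limit. The class and method names are hypothetical and not part of the tutorial's code. The check is only approximate, because a real produce request also carries the key, headers, and batch overhead on top of the payload bytes.

```java
import java.nio.charset.StandardCharsets;

class MessageSizeCheck {

    // 20MB expressed in bytes: 20 * 1024 * 1024 = 20971520
    static final int MAX_REQUEST_SIZE_BYTES = 20 * 1024 * 1024;

    // Size of the payload once encoded as UTF-8, the default encoding of Kafka's StringSerializer
    static int sizeInBytes(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length;
    }

    // Approximate check: ignores key, headers, and record batch overhead
    static boolean fitsWithin(String payload, int limitBytes) {
        return sizeInBytes(payload) <= limitBytes;
    }

    public static void main(String[] args) {
        // 5MB of ASCII characters, one byte per character in UTF-8
        String payload = "a".repeat(5 * 1024 * 1024);

        System.out.println(MAX_REQUEST_SIZE_BYTES);
        System.out.println(fitsWithin(payload, MAX_REQUEST_SIZE_BYTES)); // fits under 20MB
        System.out.println(fitsWithin(payload, 1024 * 1024));            // exceeds the 1MB default
    }
}
```

A check like this could run before handing a message to the producer, so that oversized payloads are caught early instead of failing at send time.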

4. Kafka Topic Configuration

Our message-producing application sends messages to the Kafka broker on a defined topic. Hence, the next requirement is to configure the Kafka topic in use. This means we need to update the “max.message.bytes” property, which has a default value of 1MB.

This property holds the largest record batch size Kafka allows for the topic, measured after compression (if compression is enabled). Additional details are available in the Kafka documentation.

Let’s configure this property manually at the time of topic creation using the CLI command:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520 

Alternatively, we can configure this property through Kafka Client:

// Declares the topic with one partition, a replication factor of one, and a 20MB message limit
public NewTopic topic() {
    NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);
    Map<String, String> configs = new HashMap<>();
    configs.put("max.message.bytes", "20971520");
    newTopic.configs(configs);
    return newTopic;
}

At a minimum, we need to configure these two properties: the producer’s “max.request.size” and the topic’s “max.message.bytes”.

5. Kafka Broker Configuration

An optional configuration property, “message.max.bytes”, can be used to allow all topics on a broker to accept messages larger than 1MB.

This property holds the largest record batch size allowed by Kafka after compression (if compression is enabled). Additional details are available in the Kafka documentation.

Let’s add this property to the Kafka broker’s “server.properties” config file:

message.max.bytes=20971520

Moreover, when both are set, the topic-level “max.message.bytes” overrides the broker-level “message.max.bytes” for that topic. The broker value acts as the default for topics that don’t define their own limit.

6. Consumer Configuration

Let’s look into the configuration settings available for a Kafka consumer. Although these changes aren’t mandatory for consuming large messages, omitting them can have a performance impact on the consumer application. Hence, it’s good to have these configs in place, too:

  • max.partition.fetch.bytes: This property limits the number of bytes a consumer can fetch from a single partition of a topic. Additional details are available in the Kafka documentation. It’s exposed as the constant ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG in the Kafka Client library.
  • fetch.max.bytes: This property limits the number of bytes a consumer can fetch from the Kafka server in a single request. Since a Kafka consumer can listen on multiple partitions, this limit applies across all of them. Additional details are available in the Kafka documentation. It’s exposed as the constant ConsumerConfig.FETCH_MAX_BYTES_CONFIG in the Kafka Client library.

Therefore, to configure our consumers, we need to create a ConsumerFactory. Remember that these values should be at least as high as the topic/broker message size limit:

// Consumer factory sized to fetch messages of up to 20MB
public ConsumerFactory<String, String> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Per-partition fetch limit: 20MB
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
    // Overall fetch limit per request: 20MB, enough for our single partition
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
    return new DefaultKafkaConsumerFactory<>(props);
}

Here, we used the same value of 20971520 bytes for both properties because we’re using a single-partition topic. However, when a consumer listens on multiple partitions, the value of FETCH_MAX_BYTES_CONFIG should be higher than MAX_PARTITION_FETCH_BYTES_CONFIG. FETCH_MAX_BYTES_CONFIG caps the amount of data returned by a single fetch request across all partitions, whereas MAX_PARTITION_FETCH_BYTES_CONFIG caps the amount of data fetched from a single partition.
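
As a rough illustration of how the two consumer limits relate, the sketch below builds the properties for a consumer reading from several partitions. The sizing rule (one full-size message per partition in a single fetch) is an assumption made for this example rather than a rule from the Kafka documentation, and the class and helper names are hypothetical. Only the property keys “max.partition.fetch.bytes” and “fetch.max.bytes” are real Kafka consumer settings.

```java
import java.util.Properties;

class ConsumerFetchSizing {

    // Largest message we expect on the topic: 20MB
    static final int MAX_MESSAGE_BYTES = 20 * 1024 * 1024;

    // Hypothetical helper: derives both fetch limits from the partition count
    static Properties fetchProps(int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be at least 1");
        }
        Properties props = new Properties();

        // Each partition may return up to one full-size message
        props.setProperty("max.partition.fetch.bytes", Integer.toString(MAX_MESSAGE_BYTES));

        // Assumed rule: leave room for one full-size message from every partition.
        // fetch.max.bytes is an int setting, so cap the product at Integer.MAX_VALUE.
        long total = (long) MAX_MESSAGE_BYTES * partitions;
        int fetchMax = (int) Math.min(total, Integer.MAX_VALUE);
        props.setProperty("fetch.max.bytes", Integer.toString(fetchMax));

        return props;
    }

    public static void main(String[] args) {
        // Single partition: both limits are equal, as in the tutorial's setup
        System.out.println(fetchProps(1));
        // Three partitions: the overall limit is three times the per-partition limit
        System.out.println(fetchProps(3));
    }
}
```

These string keys could be copied into the consumer factory's config map. Larger fetch limits also mean more memory per poll, so the multiplier is worth tuning rather than adopting as-is.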

7. Alternatives

We saw how different configs in the Kafka producer, topic, broker, and consumer can be updated to send large messages. However, we should generally avoid sending large messages with Kafka. Processing large messages consumes more CPU and memory on both the producer and the consumer, which ultimately limits their capacity for other tasks. It can also cause visibly high latency for the end user.

Let’s look into other possible options:

  1. The Kafka producer can compress messages. It supports different compression types, which we can configure using the compression.type property.
  2. We can store the large message in a file at a shared storage location and send only the location through a Kafka message. This can be a faster option with minimal processing overhead.
  3. Another option is to split the large message into small messages of 1KB each at the producer end. We can then send all these messages to a single partition using a partition key to preserve their order. Later, at the consumer end, we can reconstruct the large message from the smaller ones.
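
The splitting approach from the third option can be sketched in plain Java as follows. This is a minimal illustration and not the tutorial's implementation: the class and method names are hypothetical, and it only covers cutting a byte array into 1KB chunks and joining them back in order. In a real application, each chunk would also need metadata (for example, a message ID, a chunk index, and the total chunk count) so the consumer knows when a message is complete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MessageChunker {

    // Chunk size of 1KB
    static final int CHUNK_SIZE_BYTES = 1024;

    // Splits the payload into consecutive chunks; the last chunk may be shorter
    static List<byte[]> split(byte[] payload, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int start = 0; start < payload.length; start += chunkSize) {
            int end = Math.min(payload.length, start + chunkSize);
            chunks.add(Arrays.copyOfRange(payload, start, end));
        }
        return chunks;
    }

    // Concatenates the chunks in list order, which must match the order they were sent in
    static byte[] reassemble(List<byte[]> chunks) {
        int total = 0;
        for (byte[] chunk : chunks) {
            total += chunk.length;
        }
        byte[] result = new byte[total];
        int offset = 0;
        for (byte[] chunk : chunks) {
            System.arraycopy(chunk, 0, result, offset, chunk.length);
            offset += chunk.length;
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[2500];
        Arrays.fill(payload, (byte) 7);

        // 2500 bytes become chunks of 1024, 1024, and 452 bytes
        List<byte[]> chunks = split(payload, CHUNK_SIZE_BYTES);
        System.out.println(chunks.size());
        System.out.println(Arrays.equals(payload, reassemble(chunks)));
    }
}
```

Sending every chunk with the same key keeps them on one partition, which is what lets the consumer rely on arrival order when reassembling.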

If none of the above options suits our requirements, we can go for the earlier discussed configurations.

8. Conclusion

In this article, we covered different Kafka configurations required to send large messages greater than 1MB in size.

We covered the configs needed at the producer, topic, broker, and consumer ends. Some of these are mandatory, while others are optional. The consumer configs are optional but good to have to avoid negative performance impacts.

Finally, we also covered alternative options for sending large messages.

As always, the code example is available over on GitHub.
