Custom Serializers in Apache Kafka

Last modified: August 26, 2021

1. Introduction

When messages are transmitted in Apache Kafka, the client and the server agree on a common syntactic format. Kafka ships with default converters (for types such as String and Long) and also supports custom serializers for specific use cases. In this tutorial, we'll see how to implement them.

2. Serializers in Apache Kafka

Serialization is the process of converting objects into bytes. Deserialization is the inverse process: converting a stream of bytes back into an object. In a nutshell, deserialization turns the raw content into readable and interpretable information.
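
To make the idea concrete, here is a minimal, JDK-only sketch of a round trip for a String and a long. The class and method names are hypothetical, and the code only illustrates the concept; it is not a copy of Kafka's built-in serializers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PrimitiveRoundTrip {
    // Serialize a String as its UTF-8 bytes.
    static byte[] stringToBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Deserialize UTF-8 bytes back into a String.
    static String bytesToString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Serialize a long as 8 bytes in big-endian order (ByteBuffer's default).
    static byte[] longToBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Deserialize 8 big-endian bytes back into a long.
    static long bytesToLong(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] text = stringToBytes("test");
        byte[] number = longToBytes(42L);
        System.out.println(text.length + " bytes -> " + bytesToString(text));
        System.out.println(number.length + " bytes -> " + bytesToLong(number));
    }
}
```

Both sides have to agree on the encoding (here UTF-8 and big-endian), which is exactly the "common format" that a serializer and deserializer pair encodes.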

As we mentioned, Apache Kafka provides default serializers for several basic types, and it allows us to implement custom serializers:

[Figure: a producer serializes an object into bytes and sends it to a Kafka topic; a consumer reads the bytes and deserializes them back into the object]

The figure above shows the process of sending messages to a Kafka topic through the network. In this process, the custom serializer converts the object into bytes before the producer sends the message to the topic. Similarly, it shows how the deserializer transforms the bytes back into the object so that the consumer can process it properly.

2.1. Custom Serializers

Apache Kafka provides pre-built serializers and deserializers for several basic types, such as String and Long.

But it also offers the capability to implement custom (de)serializers. In order to serialize our own objects, we’ll implement the Serializer interface. Similarly, to create a custom deserializer, we’ll implement the Deserializer interface.

There are three methods available to override in both interfaces:

  • configure: applies configuration settings and tells the instance whether it is used for keys or for values
  • serialize/deserialize: contain the actual implementation of our custom serialization and deserialization
  • close: releases any resources held by the (de)serializer
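
The role of each method can be sketched without any Kafka dependency. The snippet below is only an illustration: SketchSerializer is a hypothetical stand-in that mirrors the three methods listed above (it is not Kafka's Serializer interface), and the property names "key.encoding" and "value.encoding" are made up for this example.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in with the same three methods; NOT Kafka's interface.
interface SketchSerializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    void close();
}

class ConfigurableStringSerializer implements SketchSerializer<String> {
    private Charset charset = StandardCharsets.UTF_8;
    private boolean closed = false;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Made-up property names: one setting for keys, another for values.
        String property = isKey ? "key.encoding" : "value.encoding";
        Object encoding = configs.get(property);
        if (encoding instanceof String) {
            charset = Charset.forName((String) encoding);
        }
    }

    @Override
    public byte[] serialize(String topic, String data) {
        if (closed) {
            throw new IllegalStateException("serializer already closed");
        }
        // A null payload stays null instead of becoming an empty array.
        return data == null ? null : data.getBytes(charset);
    }

    @Override
    public void close() {
        // Nothing to release here; we only record that close() was called.
        closed = true;
    }
}
```

The isKey flag is what lets a single class behave differently depending on whether it was registered as the key or the value serializer.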

3. Implementing Custom Serializers in Apache Kafka

Apache Kafka provides the capability of customizing the serializers. It’s possible to implement specific converters not only for the message value but also for the key.

3.1. Dependencies

To implement the examples, we'll simply add the kafka-clients dependency, which contains both the producer and the consumer APIs, to our pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

3.2. Custom Serializer

First, we’ll use Lombok to specify the custom object to send through Kafka:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
    private String message;
    private String version;
}

Next, we’ll implement the Serializer interface provided by Kafka for the producer to send the messages:

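import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

// The type parameter is required so that serialize(String, MessageDto) overrides the interface method
public class CustomSerializer implements Serializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, MessageDto data) {
        try {
            if (data == null) {
                System.out.println("Null received at serializing");
                return null;
            }
            System.out.println("Serializing...");
            // Jackson writes the DTO as UTF-8 encoded JSON
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            // keep the original exception as the cause
            throw new SerializationException("Error when serializing MessageDto to byte[]", e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}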

We override the serialize method of the interface. In our implementation, we convert the custom object to JSON bytes using a Jackson ObjectMapper and return the byte array, which the producer then sends over the network.

3.3. Custom Deserializer

In the same way, we’ll implement the Deserializer interface for the consumer:

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomDeserializer implements Deserializer<MessageDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                System.out.println("Null received at deserializing");
                return null;
            }
            System.out.println("Deserializing...");
            // Jackson can parse the JSON bytes directly
            return objectMapper.readValue(data, MessageDto.class);
        } catch (Exception e) {
            // keep the original exception as the cause
            throw new SerializationException("Error when deserializing byte[] to MessageDto", e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}

As in the previous section, we override the deserialize method of the interface. Here, we convert the bytes back into the custom object, again using a Jackson ObjectMapper.
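
A serializer and its deserializer must be exact inverses of each other. The following JDK-only sketch checks that property for a two-field message. Note the assumptions: Message is a hypothetical plain stand-in for MessageDto, and the encoding is a simple length-prefixed format written with DataOutputStream, swapped in for the Jackson JSON used above so that the example runs without extra dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RoundTripSketch {
    // Hypothetical plain stand-in for the Lombok-based MessageDto.
    static final class Message {
        final String message;
        final String version;

        Message(String message, String version) {
            this.message = message;
            this.version = version;
        }
    }

    // Writes both fields as length-prefixed strings (not JSON).
    static byte[] serialize(Message data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(data.message);
            out.writeUTF(data.version);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order they were written.
    static Message deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String message = in.readUTF();
            String version = in.readUTF();
            return new Message(message, version);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Message copy = deserialize(serialize(new Message("test", "1.0")));
        System.out.println(copy.message + " / " + copy.version);
    }
}
```

A round-trip check like this is a cheap unit test for any custom (de)serializer pair, whatever wire format it uses.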

3.4. Consuming an Example Message

Let's look at a working example that sends and receives a message with the custom serializer and deserializer.

First, we'll create and configure the Kafka Producer:

private static KafkaProducer<String, MessageDto> createKafkaProducer() {
    // kafka and CONSUMER_APP_ID are defined elsewhere in the example project
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");

    // the diamond operator avoids a raw-type KafkaProducer
    return new KafkaProducer<>(props);
}

We’ll configure the value serializer property with our custom class and the key serializer with the default StringSerializer.
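
The ProducerConfig constants used here resolve to plain string keys, so the same settings can be sketched with only the JDK. In the snippet below, the class name ProducerPropsSketch and the localhost:9092 address are assumptions made for illustration; the serializer class names are the ones used above.

```java
import java.util.Properties;

class ProducerPropsSketch {
    // Builds the producer settings with literal keys instead of ProducerConfig constants.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Keys are plain strings, so the default StringSerializer is enough.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are MessageDto objects, so we point to the custom class.
        props.put("value.serializer", "com.baeldung.kafka.serdes.CustomSerializer");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed broker address.
        producerProps("localhost:9092").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Because the serializers are referenced by fully qualified class name, the custom class has to be on the producer's classpath and needs a public no-argument constructor so that Kafka can instantiate it.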

Second, we'll create the Kafka Consumer:

private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");

    return new KafkaConsumer<>(props);
}

Besides setting the key deserializer to StringDeserializer and the value deserializer to our custom class, we must also set the group id, which is required when a consumer subscribes to a topic. In addition, we set the auto offset reset config to earliest, so that a consumer group with no committed offsets starts from the beginning of the topic and still reads the messages the producer sent before the consumer started.

Once we’ve created the producer and consumer clients, it’s time to send an example message:

MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();

KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();

We can then receive the message by subscribing the consumer to the topic and polling for records:

AtomicReference<MessageDto> msgCons = new AtomicReference<>();

KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));

ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
    msgCons.set(record.value());
    System.out.println("Message received " + record.value());
});

consumer.close();

The result in the console is:

Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)
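
The consumer snippet above polls only once, and a single poll can return no records, for example while the consumer is still joining its group. A common way to make such a demo more robust is to retry with an overall deadline. The helper below is a hypothetical, JDK-only sketch of that pattern: pollUntilNonEmpty is not part of the Kafka API, and the Supplier stands in for a call such as consumer.poll(Duration.ofSeconds(1)) whose records have been collected into a list.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

class PollUntilSketch {
    // Calls pollOnce until it yields at least one record or the timeout elapses.
    static <T> List<T> pollUntilNonEmpty(Supplier<List<T>> pollOnce, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        // Always poll at least once, even with a zero timeout.
        List<T> records = pollOnce.get();
        while (records.isEmpty() && System.nanoTime() - deadline < 0) {
            records = pollOnce.get();
        }
        // May still be empty if nothing arrived before the deadline.
        return records;
    }
}
```

The loop relies on each call to pollOnce blocking for a short time, as a Kafka poll with a timeout does; with a supplier that returns immediately, it would spin until the deadline.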

4. Conclusion

In this tutorial, we showed how producers use serializers in Apache Kafka to send messages through the network. In the same way, we also showed how consumers use deserializers to interpret the message received.

Furthermore, we learned about the default serializers that Kafka provides and, most importantly, how to implement custom serializers and deserializers.

As always, the code is available over on GitHub.
