1. Overview
In this article, we’ll learn about Spring-Kafka’s RecordDeserializationException. After that, we’ll create a custom error handler that catches this exception and skips the invalid message, allowing the consumer to continue processing subsequent events.
This article relies on Spring Boot’s Kafka modules, which offer convenient tools for interacting with the broker. For a deeper grasp of Kafka internals, we can revisit the fundamental concepts of the platform.
2. Creating a Kafka Listener
For the code examples in this article, we’ll use a small application that listens to the topic “baeldung.articles.published” and processes the incoming messages. To showcase the custom error handling, our application should continue consuming messages after encountering deserialization exceptions.
Spring-Kafka’s version will be resolved automatically by the parent Spring Boot pom. Therefore, we simply need to add the module dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
This module enables us to use the @KafkaListener annotation, an abstraction over Kafka’s Consumer API. Let’s leverage this annotation to create the ArticlesPublishedListener component. Additionally, we’ll introduce another component, EmailService, that will perform an action for each of the incoming messages:
@Component
class ArticlesPublishedListener {

    private final EmailService emailService;

    // constructor

    @KafkaListener(topics = "baeldung.articles.published")
    public void onArticlePublished(ArticlePublishedEvent event) {
        emailService.sendNewsletter(event.article());
    }
}

record ArticlePublishedEvent(String article) {
}
For the consumer configuration, we’ll focus on defining only the properties crucial to our example. When we’re developing a production application, we can adjust these properties to suit our particular needs or externalize them to a separate configuration file:
@Bean
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory(
    @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers
) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung-app-1");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(
        props,
        new StringDeserializer(),
        new JsonDeserializer<>(ArticlePublishedEvent.class)
    );
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
    ConsumerFactory<String, ArticlePublishedEvent> consumerFactory
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}
3. Setting up the Test Environment
To set up our testing environment, we can leverage a Kafka Testcontainer that will seamlessly spin up a Kafka Docker container for testing:
@Testcontainers
@SpringBootTest(classes = Application.class)
class DeserializationExceptionLiveTest {

    @Container
    private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @DynamicPropertySource
    static void setProps(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
    }

    // ...
}
Alongside this, we’ll need a KafkaProducer and an EmailService to validate our listener’s functionality. These components will send messages to our listener and verify that they’re processed accurately. To simplify the tests and avoid mocking, let’s keep all incoming articles in an in-memory list and expose them through a getter:
@Service
class EmailService {

    private final List<String> articles = new ArrayList<>();

    // logger, getter

    public void sendNewsletter(String article) {
        log.info("Sending newsletter for article: " + article);
        articles.add(article);
    }
}
As a result, we simply need to inject the EmailService into our test class. Let’s continue by creating testKafkaProducer:
@Autowired
EmailService emailService;

static KafkaProducer<String, String> testKafkaProducer;

@BeforeAll
static void beforeAll() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    testKafkaProducer = new KafkaProducer<>(props);
}
With this setup, we can already test the happy path. Let’s publish two articles with valid JSON bodies and verify that our application successfully called the emailService for each of them:
@Test
void whenPublishingValidArticleEvent_thenProcessWithoutErrors() {
    publishArticle("{ \"article\": \"Kotlin for Java Developers\" }");
    publishArticle("{ \"article\": \"The S.O.L.I.D. Principles\" }");

    await().untilAsserted(() ->
        assertThat(emailService.getArticles())
            .containsExactlyInAnyOrder(
                "Kotlin for Java Developers",
                "The S.O.L.I.D. Principles"
            ));
}
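Note that the tests rely on a small publishArticle() helper that isn’t shown here. A minimal sketch, assuming the same topic name that the listener subscribes to, could look like this:

// Hypothetical helper assumed by the tests: sends a raw String payload
// to the topic our listener subscribes to.
private void publishArticle(String payload) {
    testKafkaProducer.send(new ProducerRecord<>("baeldung.articles.published", payload));
}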
4. Causing a RecordDeserializationException
Kafka throws RecordDeserializationException if the configured deserializer cannot properly parse the key or value of the message. To reproduce this error, we simply need to publish a message containing an invalid JSON body:
@Test
void whenPublishingInvalidArticleEvent_thenCatchExceptionAndContinueProcessing() {
    publishArticle("{ \"article\": \"Introduction to Kafka\" }");
    publishArticle(" !! Invalid JSON !! ");
    publishArticle("{ \"article\": \"Kafka Streams Tutorial\" }");

    await().untilAsserted(() ->
        assertThat(emailService.getArticles())
            .containsExactlyInAnyOrder(
                "Introduction to Kafka",
                "Kafka Streams Tutorial"
            ));
}
If we run this test and check the console, we’ll observe a recurring error being logged:
ERROR 7716 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
    ...
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition baeldung.articles.published-0 at offset 1. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
    ...
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[32, 33, 33, 32, 73, 110, 118, 97, 108, 105, 100, 32, 74, 83, 79, 78, 32, 33, 33, 32]] from topic [baeldung.articles.published]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.11.jar:2.8.11]
    ...
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
    at [Source: (byte[])" !! Invalid JSON !! "; line: 1, column: 3]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.5.jar:2.13.5]
    ...
Then, the test will eventually time out and fail. If we check the assertion error, we’ll notice that only the first message was successfully processed:
org.awaitility.core.ConditionTimeoutException: Assertion condition
Expecting actual:
["Introduction to Kafka"]
to contain exactly in any order:
["Introduction to Kafka", "Kafka Streams Tutorial"]
but could not find the following elements:
["Kafka Streams Tutorial"]
within 5 seconds.
As expected, the deserialization failed for the second message. Consequently, the listener continued attempting to consume the same message, leading to the repeated occurrence of the error.
5. Creating an Error Handler
If we carefully analyze the failure logs, we’ll notice two suggestions:
- consider configuring an ‘ErrorHandlingDeserializer’;
- if needed, seek past the record to continue consumption.
In other words, we can create a custom error handler that will handle the deserialization exception and increase the consumer offset. This will allow us to skip the invalid message and proceed with the consumption.
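As a side note, the first suggestion points at Spring Kafka’s ErrorHandlingDeserializer, which wraps a delegate deserializer and catches its failures instead of letting them bubble up to the container. We won’t take that route here, but a minimal sketch of that alternative, reusing the consumer properties from earlier, might look like this:

// Alternative sketch (not the approach we follow below): delegate JSON
// deserialization through ErrorHandlingDeserializer so that failures no
// longer surface as RecordDeserializationException.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, ArticlePublishedEvent.class);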
5.1. Implementing CommonErrorHandler
To implement the CommonErrorHandler interface, we’ll have to override two public methods without a default implementation:
- handleOne() – called to handle a single failed record;
- handleOtherException() – called when an exception is thrown, but not for a particular record;
We can handle both cases using a similar approach. Let’s start by catching the exception and logging an error message:
class KafkaErrorHandler implements CommonErrorHandler {

    // logger

    @Override
    public void handleOne(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
        handle(exception, consumer);
    }

    @Override
    public void handleOtherException(Exception exception, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        handle(exception, consumer);
    }

    private void handle(Exception exception, Consumer<?, ?> consumer) {
        log.error("Exception thrown", exception);
        // ...
    }
}
5.2. Kafka Consumer’s seek() and commitSync()
We can use the seek() method from the Consumer interface to manually change the current offset position for a particular partition within a topic. Simply put, we can use it to reprocess or skip messages as needed based on their offsets.
In our case, if the exception is an instance of RecordDeserializationException, we’ll call the seek() method with the topic partition and the next offset:
private void handle(Exception exception, Consumer<?, ?> consumer) {
    log.error("Exception thrown", exception);
    if (exception instanceof RecordDeserializationException ex) {
        consumer.seek(ex.topicPartition(), ex.offset() + 1L);
        consumer.commitSync();
    } else {
        log.error("Exception not handled", exception);
    }
}
As we can see, we also need to call commitSync() from the Consumer interface. This commits the offset and ensures that the new position is acknowledged and persisted by the Kafka broker. This step is crucial, as it updates the offset committed by the consumer group, indicating that messages up to the adjusted position have been successfully processed.
5.3. Updating the Configuration
Finally, we need to add the custom error handler to our consumer configuration. Let’s start by declaring it as a @Bean:
@Bean
CommonErrorHandler commonErrorHandler() {
    return new KafkaErrorHandler();
}
After that, we’ll add the new bean to the ConcurrentKafkaListenerContainerFactory using its dedicated setter:
@Bean
ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
    ConsumerFactory<String, ArticlePublishedEvent> consumerFactory,
    CommonErrorHandler commonErrorHandler
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setCommonErrorHandler(commonErrorHandler);
    return factory;
}
That’s it! We can re-run the tests now and expect the listener to skip the invalid message and continue consuming messages.
6. Conclusion
In this article, we discussed Spring Kafka’s RecordDeserializationException and we discovered that, if not handled correctly, it can block the consumer group for the given partition.
Following that, we delved into Spring Kafka’s CommonErrorHandler interface and implemented it to enable our listener to handle deserialization failures while continuing to process messages. We leveraged the Consumer API’s seek() and commitSync() methods to bypass invalid messages by adjusting the consumer offset accordingly.
As always, the source code for this article is available over on GitHub.