Testing Kafka and Spring Boot

Last modified: November 16, 2020

1. Overview

Apache Kafka is a powerful, distributed, fault-tolerant stream processing system. In a previous tutorial, we learned how to work with Spring and Kafka.

In this tutorial, we’ll build on the previous one and learn how to write reliable, self-contained integration tests that don’t rely on an external Kafka server running.

First, we’ll start by looking at how to use and configure an embedded instance of Kafka.

Then we’ll see how we can make use of the popular framework Testcontainers from our tests.

2. Dependencies

Of course, we’ll need to add the standard spring-kafka dependency to our pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

Then we’ll need two more dependencies specifically for our tests.

First, we’ll add the spring-kafka-test artifact:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.7.2</version>
    <scope>test</scope>
</dependency>

And finally we’ll add the Testcontainers Kafka dependency, which is also available over on Maven Central:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

Now that we have all the necessary dependencies configured, we can write a simple Spring Boot application using Kafka.

3. A Simple Kafka Producer-Consumer Application

Throughout this tutorial, the focus of our tests will be a simple producer-consumer Spring Boot Kafka application.

Let’s start by defining our application entry point:

@SpringBootApplication
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}

As we can see, this is a standard Spring Boot application.

3.1. Producer Setup

Next, let’s consider a producer bean that we’ll use to send messages to a given Kafka topic:

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

Our KafkaProducer bean defined above is merely a wrapper around the KafkaTemplate class. This class provides high-level thread-safe operations, such as sending data to the provided topic, which is exactly what we do in our send method.
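
As a side note, KafkaTemplate.send() returns a ListenableFuture<SendResult<K, V>> in spring-kafka 2.x. If we ever needed delivery confirmation, a hypothetical variant of our send method (not used anywhere in this tutorial) could block on that future with a timeout:

public void sendAndWait(String topic, String payload) throws Exception {
    // Hypothetical helper: wait up to 10 seconds for the broker to
    // acknowledge the record before returning.
    SendResult<String, String> result = kafkaTemplate.send(topic, payload)
      .get(10, TimeUnit.SECONDS);
    LOGGER.info("sent to partition={} at offset={}",
      result.getRecordMetadata().partition(),
      result.getRecordMetadata().offset());
}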

3.2. Consumer Setup

Likewise, we’ll now define a simple consumer bean that will listen to a Kafka topic and receive messages:

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
        latch.countDown();
    }

    public void resetLatch() {
        latch = new CountDownLatch(1);
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }
}

Our simple consumer uses the @KafkaListener annotation on the receive method to listen to messages on a given topic. We’ll see later how we configure the test.topic from our tests.

Furthermore, the receive method stores the message content in our bean and counts down the latch. The latch is a simple thread-safe synchronizer that we'll use later from our tests to wait until a message has been successfully received.
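
For example, a hypothetical test step that sends a second message would re-arm the latch before awaiting it again:

// Hypothetical follow-up inside a test; getLatch() and getPayload() are
// the simple getters on our KafkaConsumer bean shown above.
consumer.resetLatch();
producer.send(topic, "a second payload");
assertTrue(consumer.getLatch().await(10, TimeUnit.SECONDS));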

Now that we have our simple Kafka application using Spring Boot implemented, let’s see how we can write integration tests.

4. A Word on Testing

In general, when writing clean integration tests, we shouldn’t depend on external services that we might not be able to control or might suddenly stop working. This could have adverse effects on our test results.

Similarly, if we’re dependent on an external service, in this case, a running Kafka broker, we likely won’t be able to set it up, control it and tear it down in the way we want from our tests.

4.1. Application Properties

We’re going to use a very light set of application configuration properties from our tests.

We’ll define these properties in our src/test/resources/application.yml file:

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic

This is the minimum set of properties that we need when working with an embedded instance of Kafka or a local broker.

Most of these are self-explanatory, but the one we should highlight is the consumer property auto-offset-reset: earliest. This property ensures that our consumer group gets the messages we send, because the listener container might only start after the sends have completed.

Additionally, we configure a topic property with the value embedded-test-topic, which is the topic we’ll use from our tests.

5. Testing Using Embedded Kafka

In this section, we’ll take a look at how to use an in-memory Kafka instance to run our tests against. This is also known as Embedded Kafka.

The dependency spring-kafka-test we added previously contains some useful utilities to assist with testing our application. Most notably, it contains the EmbeddedKafkaBroker class.
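
The artifact also bundles helpers such as KafkaTestUtils. As a quick, hypothetical illustration (assuming an autowired EmbeddedKafkaBroker field named embeddedKafkaBroker, which the annotation below makes available as a bean), it can build consumer properties that already point at the embedded broker:

// Hypothetical usage, not required for the test below: consumer properties
// pre-wired with the embedded broker's address.
Map<String, Object> configs = KafkaTestUtils.consumerProps("baeldung", "true", embeddedKafkaBroker);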

With that in mind, let’s go ahead and write our first integration test:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() 
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";
        
        producer.send(topic, data);
        
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}

Let’s walk through the key parts of our test.

First, we start by decorating our test class with two pretty standard Spring annotations:

  • The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context.
  • We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests.

Here comes the crucial part — we use the @EmbeddedKafka annotation to inject an instance of an EmbeddedKafkaBroker into our tests.

Moreover, there are several properties available we can use to configure the embedded Kafka node:

  • partitions – This is the number of partitions used per topic. To keep things nice and simple, we only want one to be used from our tests.
  • brokerProperties – additional properties for the Kafka broker. Again, we keep things simple and specify a plain text listener and a port number.
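
As a side note, hard-coding port 9092 keeps this example simple, but the embedded broker also publishes its actual address through the spring.embedded.kafka.brokers property. A test that needs to avoid fixed ports could point Spring Boot at that property instead, a sketch of an alternative we don't use here:

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}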

Next, we auto-wire our consumer and producer classes and configure a topic to use the value from our application.properties.

For the final piece of the puzzle, we simply send a message to our test topic and verify that the message has been received and that its payload contains the data we sent.

When we run our test, here’s what we’ll see among the verbose Spring output:

...
12:45:35.099 [main] INFO  c.b.kafka.embedded.KafkaProducer -
  sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
  INFO  c.b.kafka.embedded.KafkaConsumer - received payload=
  'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
  CreateTime = 1605267935099, serialized key size = -1, 
  serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
  key = null, value = Sending with our own simple KafkaProducer)'

This confirms that our test is working properly. Awesome! We now have a way to write self-contained, independent integration tests using an in-memory Kafka broker.

6. Testing Kafka With Testcontainers

Sometimes we might see small differences between a real external service and an embedded in-memory instance of a service provided specifically for testing purposes. Although unlikely, it could also be that the port used by our test is already occupied, causing a failure.

With that in mind, in this section we’ll see a variation on our previous approach to testing using the Testcontainers framework. We’ll see how to instantiate and manage an external Apache Kafka broker hosted inside a Docker container from our integration test.

Let’s define another integration test that will be quite similar to the one we saw in the previous section:

@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka = 
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived() 
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";
        
        producer.send(topic, data);
     
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}

Let’s take a look at the differences. We’re declaring the kafka field, which is a standard JUnit @ClassRule. This field is an instance of the KafkaContainer class that will prepare and manage the life cycle of our container running Kafka.
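
As an aside, if we were on JUnit 5 instead of JUnit 4, the org.testcontainers:junit-jupiter artifact would provide the equivalent lifecycle management. A rough, hypothetical sketch (not the approach used in this article):

@Testcontainers
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
class KafkaTestContainersJUnit5LiveTest {

    // A static @Container field is started once before all tests and
    // stopped afterwards, mirroring the JUnit 4 @ClassRule semantics.
    @Container
    static KafkaContainer kafka =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    // ... same test body as above
}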

To avoid port clashes, Testcontainers allocates a port number dynamically when our Docker container starts.

For this reason, we provide a custom consumer and producer factory configuration using the class KafkaTestContainersConfiguration:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // more standard configuration
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // more standard configuration
    return new DefaultKafkaProducerFactory<>(configProps);
}
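
The "more standard configuration" comments above elide the usual serializer and deserializer entries (for example, StringSerializer and StringDeserializer for our String keys and values). To complete the picture, here is a minimal sketch of the companion beans that would sit alongside those two methods inside KafkaTestContainersConfiguration:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    // Assumes consumerConfigs() also carries the deserializer entries.
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}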

We then reference this configuration via the @Import annotation at the beginning of our test.

The reason for this is that we need a way to inject the server address into our application, and as previously mentioned, this address is generated dynamically.

We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:

bootstrap.servers = [PLAINTEXT://localhost:32789]
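
Alternatively, as a sketch of a newer option this article doesn't use, Spring Framework 5.2.5+ offers @DynamicPropertySource, which can feed the container's dynamic address straight into the standard Spring Boot properties and remove the need for the custom factory configuration:

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
    // Resolved lazily, once the Kafka container has started and its
    // dynamically mapped port is known.
    registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}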

Now when we run our test, we should see that Testcontainers does several things:

  • Checks our local Docker setup
  • Pulls the confluentinc/cp-kafka:5.4.3 Docker image if necessary
  • Starts a new container and waits for it to be ready
  • Finally, shuts down and deletes the container after our test finishes

Again, this is confirmed by inspecting the test output:

13:33:10.396 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! A working integration test using a Kafka Docker container.

7. Conclusion

In this article, we learned about a couple of approaches for testing Kafka applications with Spring Boot.

In the first approach, we saw how to configure and use a local in-memory Kafka broker.

Then we saw how to use Testcontainers to set up an external Kafka broker running inside a Docker container from our tests.

As always, the full source code of the article is available over on GitHub.
