Using Kafka MockConsumer

Last modified: May 25, 2020

1. Overview

In this tutorial, we’ll explore the MockConsumer, one of Kafka’s Consumer implementations.

First, we’ll discuss the main things to consider when testing a Kafka Consumer. Then, we’ll see how we can use MockConsumer to implement tests.

2. Testing a Kafka Consumer

Consuming data from Kafka consists of two main steps. Firstly, we have to subscribe to topics or assign topic partitions manually. Secondly, we poll batches of records using the poll method.

The polling is usually done in an infinite loop. That’s because we typically want to consume data continuously.

For example, let’s consider some simple consuming logic consisting of just the subscription and the polling loop:

void consume() {
    try {
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> processRecord(record));
        }
    } catch (WakeupException ex) {
        // ignore for shutdown
    } catch (RuntimeException ex) {
        // exception handling
    } finally {
        consumer.close();
    }
}

Looking at the code above, we can see that there are a few things we can test:

  • the subscription
  • the polling loop
  • the exception handling
  • whether the Consumer is closed correctly

We have multiple options to test the consuming logic.

We can use an in-memory Kafka instance. However, this approach has some disadvantages. In general, an in-memory Kafka instance makes tests heavy and slow. Moreover, setting it up is not a simple task and can lead to unstable tests.

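Alternatively, we can use a mocking framework to mock the Consumer. Although this approach keeps tests lightweight, setting it up can be somewhat tricky, since we have to stub every Consumer call our logic makes (subscribe, poll, commitSync, close, and so on) ourselves.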

The final option, and perhaps the best, is to use the MockConsumer, which is a Consumer implementation meant for testing. Not only does it help us build lightweight tests, but it’s also easy to set up.

Let’s have a look at the features it provides.

3. Using MockConsumer

MockConsumer implements the Consumer interface that the kafka-clients library provides. Therefore, we can use it anywhere a real Consumer is expected, and it simulates a real Consumer’s behavior without requiring us to write much code.
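
Because MockConsumer is just another Consumer, code written against the interface accepts it in place of a KafkaConsumer. The sketch below illustrates this, assuming kafka-clients 3.x (and Java 11+) on the classpath; InterfaceSketch and its subscribeTo method are hypothetical names standing in for production code, and the topics mirror the first example.

```java
import java.util.Arrays;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

class InterfaceSketch {

    // Hypothetical production code that depends only on the Consumer interface
    static Set<String> subscribeTo(Consumer<String, String> consumer, String... topics) {
        consumer.subscribe(Arrays.asList(topics));
        return Set.copyOf(consumer.subscription());
    }

    static Set<String> demo() {
        // A MockConsumer is accepted wherever a Consumer is expected; no broker is involved
        Consumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        try {
            return subscribeTo(consumer, "foo", "bar");
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Calling demo() should return the two subscribed topics, foo and bar.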

Let’s look at some usage examples of the MockConsumer. In particular, we’ll take a few common scenarios that we may come across while testing a consumer application, and implement them using the MockConsumer.

For our example, let’s consider an application that consumes country population updates from a Kafka topic. The updates contain only the name of the country and its current population:

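class CountryPopulation {

    private String country;
    private Integer population;

    public CountryPopulation(String country, Integer population) {
        this.country = country;
        this.population = population;
    }

    // standard getters and setters
}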

Our consumer just polls for updates using a Kafka Consumer instance, processes them, and, after each batch, commits the offsets using the commitSync method:

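import java.time.Duration;
import java.util.Collections;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class CountryPopulationConsumer {

    private Consumer<String, Integer> consumer;
    private java.util.function.Consumer<Throwable> exceptionConsumer;
    private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;

    public CountryPopulationConsumer(Consumer<String, Integer> consumer,
      java.util.function.Consumer<Throwable> exceptionConsumer,
      java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
        this.consumer = consumer;
        this.exceptionConsumer = exceptionConsumer;
        this.countryPopulationConsumer = countryPopulationConsumer;
    }

    void startBySubscribing(String topic) {
        consume(() -> consumer.subscribe(Collections.singleton(topic)));
    }

    void startByAssigning(String topic, int partition) {
        consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
    }

    // Subscribes or assigns, then polls until stop() causes a WakeupException
    private void consume(Runnable beforePollingTask) {
        try {
            beforePollingTask.run();
            while (true) {
                ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
                // Each record's key is the country and its value is the population
                StreamSupport.stream(records.spliterator(), false)
                    .map(record -> new CountryPopulation(record.key(), record.value()))
                    .forEach(countryPopulationConsumer);
                // Commit the offsets of the batch we just processed
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println("Shutting down...");
        } catch (RuntimeException ex) {
            exceptionConsumer.accept(ex);
        } finally {
            consumer.close();
        }
    }

    // Makes the current or next poll call throw a WakeupException
    public void stop() {
        consumer.wakeup();
    }
}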

3.1. Creating a MockConsumer Instance

Next, let’s see how we can create an instance of MockConsumer:

@BeforeEach
void setUp() {
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    updates = new ArrayList<>();
    countryPopulationConsumer = new CountryPopulationConsumer(consumer, 
      ex -> this.pollException = ex, updates::add);
}

Basically, all we need to provide is the offset reset strategy, which determines where the consumer starts reading a partition that has no committed offset.

Note that we use updates to collect the records countryPopulationConsumer will receive. This will help us to assert the expected results.

In the same way, we use pollException to collect and assert the exceptions.
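
The tests in this section also rely on TOPIC, PARTITION, and a record helper that the snippets don’t show, alongside the fields assigned in setUp. One possible shape for the constants and the helper is sketched below; the class name TestFixtures, the constant values, and the helper’s (topic, partition, key, value) parameter order are our assumptions, and kafka-clients must be on the classpath. Each record is created at offset 0, matching the beginning offset of 0 that the tests register.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

class TestFixtures {

    // Assumed values; any topic name and partition number work if the tests agree on them
    static final String TOPIC = "topic";
    static final int PARTITION = 0;

    // Hypothetical helper: country as key, population as value, placed at offset 0
    static ConsumerRecord<String, Integer> record(String topic, int partition, String country, int population) {
        return new ConsumerRecord<>(topic, partition, 0L, country, population);
    }

    public static void main(String[] args) {
        ConsumerRecord<String, Integer> r = record(TOPIC, PARTITION, "Romania", 19_410_000);
        System.out.println(r.key() + " -> " + r.value() + " at offset " + r.offset());
    }
}
```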

We’ll use the setUp method above for all the test cases. Now, let’s look at a few test cases for the consumer application.

3.2. Assigning Topic Partitions

To begin, let’s create a test for the startByAssigning method:

@Test
void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)));
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startByAssigning(TOPIC, PARTITION);

    // THEN
    assertThat(updates).hasSize(1);
    assertThat(consumer.closed()).isTrue();
}

First, we set up the MockConsumer, starting by adding a record to the consumer with the addRecord method.

The first thing to remember is that we cannot add records before assigning or subscribing to a topic. That is why we schedule a poll task using the schedulePollTask method. The task we schedule will run on the first poll before the records are fetched. Thus, the addition of the record will happen after the assignment takes place.

Equally important, we cannot add records to the MockConsumer that don’t belong to a topic partition assigned to it.
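
We can check both restrictions directly. This sketch assumes kafka-clients 3.x, where addRecord throws an IllegalStateException for a partition that isn’t assigned; AddRecordSketch and isRejected are hypothetical names, and the topic and values are illustrative.

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class AddRecordSketch {

    // Returns true if addRecord rejects the record
    static boolean isRejected(MockConsumer<String, Integer> consumer, ConsumerRecord<String, Integer> record) {
        try {
            consumer.addRecord(record);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    static boolean demo() {
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        ConsumerRecord<String, Integer> partition0 = new ConsumerRecord<>("topic", 0, 0L, "Romania", 1000);
        ConsumerRecord<String, Integer> partition1 = new ConsumerRecord<>("topic", 1, 0L, "Romania", 1000);

        // Nothing is assigned yet, so the record is rejected
        boolean rejectedBeforeAssign = isRejected(consumer, partition0);

        consumer.assign(Collections.singleton(new TopicPartition("topic", 0)));
        // Partition 0 is now assigned, so its record is accepted
        boolean acceptedAfterAssign = !isRejected(consumer, partition0);
        // Partition 1 was never assigned, so its record is still rejected
        boolean rejectedOtherPartition = isRejected(consumer, partition1);

        consumer.close();
        return rejectedBeforeAssign && acceptedAfterAssign && rejectedOtherPartition;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```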

Then, to make sure the consumer does not run indefinitely, we configure it to shut down at the second poll.

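Additionally, we must set the beginning offsets, which we do using the updateBeginningOffsets method. Since we created the MockConsumer with the EARLIEST reset strategy and no offsets have been committed, the first poll resets the position to the beginning of the partition, so the MockConsumer needs to know where that beginning is.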
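
To see why this matters, the following sketch polls once with and without a beginning offset. It assumes kafka-clients 3.x, where the missing offset surfaces as an IllegalStateException thrown by poll; BeginningOffsetsSketch is a hypothetical name.

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class BeginningOffsetsSketch {

    static final TopicPartition TP = new TopicPartition("topic", 0);

    // Polls once after assigning TP, optionally registering a beginning offset first
    static int pollOnce(boolean setBeginningOffset) {
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(TP));
        if (setBeginningOffset) {
            consumer.updateBeginningOffsets(Collections.singletonMap(TP, 0L));
        }
        consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, "Romania", 19_410_000));
        try {
            // With no committed offset, EARLIEST makes poll reset the position to the beginning offset
            return consumer.poll(Duration.ofMillis(100)).count();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("with beginning offset: " + pollOnce(true) + " record(s)");
        try {
            pollOnce(false);
        } catch (IllegalStateException e) {
            System.out.println("without beginning offset: " + e.getMessage());
        }
    }
}
```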

Finally, we check that the update was consumed correctly and that the consumer was closed.
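
The test doesn’t check the commit, but since consume() calls commitSync after each batch, we could also assert on the committed offset. The sketch below, assuming kafka-clients 2.4 or later (for committed(Set)), consumes a single record at offset 0 and reads back the committed offset. Kafka commits the offset of the next record to read, so we expect 1 rather than 0. CommitSketch is a hypothetical name.

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class CommitSketch {

    static long committedOffsetAfterOnePoll() {
        TopicPartition tp = new TopicPartition("topic", 0);
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, "Romania", 19_410_000));

        consumer.poll(Duration.ofMillis(100)); // returns offset 0 and moves the position to 1
        consumer.commitSync();                  // commits the current position
        OffsetAndMetadata committed = consumer.committed(Collections.singleton(tp)).get(tp);
        consumer.close();
        return committed.offset();
    }

    public static void main(String[] args) {
        System.out.println("committed offset: " + committedOffsetAfterOnePoll());
    }
}
```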

3.3. Subscribing to Topics

Now, let’s create a test for our startBySubscribing method:

@Test
void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> {
        consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
        consumer.addRecord(record(TOPIC, 0, "Romania", 1000));
    });
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, 0);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startBySubscribing(TOPIC);

    // THEN
    assertThat(updates).hasSize(1);
    assertThat(consumer.closed()).isTrue();
}

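In this case, we first have to trigger a rebalance before adding a record. Subscribing alone doesn’t assign any partitions; in a real consumer group, partitions are assigned during a rebalance. We simulate one by calling the rebalance method, which assigns the given partitions to the MockConsumer.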
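
The effect of rebalance is visible through assignment(): after subscribe, the assignment stays empty until a rebalance happens. This sketch assumes kafka-clients 3.x and Java 11+; RebalanceSketch and the topic name are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class RebalanceSketch {

    // Returns the assignment before and after a simulated rebalance
    static List<Set<TopicPartition>> demo() {
        TopicPartition tp = new TopicPartition("topic", 0);
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.subscribe(Collections.singleton("topic"));

        Set<TopicPartition> before = Set.copyOf(consumer.assignment()); // empty: no rebalance yet
        consumer.rebalance(Collections.singletonList(tp));
        Set<TopicPartition> after = Set.copyOf(consumer.assignment());  // now contains tp

        consumer.close();
        return List.of(before, after);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```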

The rest is the same as the startByAssigning test case.

3.4. Controlling the Polling Loop

We can control the polling loop in multiple ways.

The first option is to schedule poll tasks, as we did in the tests above. We do this via schedulePollTask, which takes a Runnable as a parameter. Each call to poll runs at most one scheduled task, in the order the tasks were scheduled, before any records are returned.
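
The sketch below makes the one-task-per-poll behavior visible by logging from two scheduled tasks and from the polling loop itself. It assumes kafka-clients 3.x; PollTaskSketch is a hypothetical name, and the partition is assigned and given a beginning offset only so that each poll completes normally.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class PollTaskSketch {

    // Records which scheduled task ran around each poll call
    static List<String> demo() {
        TopicPartition tp = new TopicPartition("topic", 0);
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        List<String> log = new ArrayList<>();
        consumer.schedulePollTask(() -> log.add("task 1"));
        consumer.schedulePollTask(() -> log.add("task 2"));

        for (int i = 1; i <= 3; i++) {
            consumer.poll(Duration.ofMillis(100));
            log.add("after poll " + i);
        }
        consumer.close();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

demo() should return [task 1, after poll 1, task 2, after poll 2, after poll 3]: the third poll finds no task left and simply returns.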

The second option is to call the wakeup method. This is the usual way to interrupt a long poll call, and it’s also how we implemented the stop method in CountryPopulationConsumer.
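
In MockConsumer, wakeup only sets a flag: the next poll throws a WakeupException and clears the flag, so a later poll works normally. This sketch checks that behavior on a single thread, assuming kafka-clients 3.x; WakeupSketch is a hypothetical name.

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

class WakeupSketch {

    // Returns true if the poll right after wakeup() threw and the following poll did not
    static boolean demo() {
        TopicPartition tp = new TopicPartition("topic", 0);
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        consumer.wakeup(); // sets the flag; nothing is interrupted yet
        boolean firstPollWokeUp = false;
        try {
            consumer.poll(Duration.ofMillis(100));
        } catch (WakeupException expected) {
            firstPollWokeUp = true;
        }
        // The flag was cleared when the exception was thrown, so this poll returns normally
        consumer.poll(Duration.ofMillis(100));
        consumer.close();
        return firstPollWokeUp;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```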

Lastly, we can set an exception to be thrown using the setPollException method:

@Test
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, 0);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startBySubscribing(TOPIC);

    // THEN
    assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
    assertThat(consumer.closed()).isTrue();
}
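
The exception set with setPollException is thrown by one poll call only and then cleared. The sketch below uses that to simulate a transient failure followed by a successful retry, assuming kafka-clients 3.x; PollExceptionSketch and its bounded retry loop are hypothetical, not part of the article’s consumer.

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

class PollExceptionSketch {

    // Returns how many poll attempts it took to receive the record
    static int attemptsUntilDelivered() {
        TopicPartition tp = new TopicPartition("topic", 0);
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, "Romania", 19_410_000));
        consumer.setPollException(new KafkaException("transient failure"));

        int attempts = 0;
        int delivered = 0;
        while (delivered == 0 && attempts < 3) {
            attempts++;
            try {
                delivered = consumer.poll(Duration.ofMillis(100)).count();
            } catch (KafkaException e) {
                // The first attempt lands here; the exception is not thrown again
            }
        }
        consumer.close();
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + attemptsUntilDelivered());
    }
}
```

We expect two attempts: the first poll throws, and the second delivers the record.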

3.5. Mocking End Offsets and Partitions Info

If our consuming logic is based on end offsets or partition information, we can also mock these using MockConsumer.

To mock end offsets, we can use the addEndOffsets and updateEndOffsets methods.
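
For instance, suppose our consuming logic had a helper that computes how many records remain to be read on a partition. The remaining method below is hypothetical, and the sketch assumes kafka-clients 3.x; it mocks the end offset with updateEndOffsets and moves the position with seek.

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class EndOffsetsSketch {

    // Hypothetical helper under test: records left between the position and the end offset
    static long remaining(Consumer<?, ?> consumer, TopicPartition tp) {
        long end = consumer.endOffsets(Collections.singleton(tp)).get(tp);
        return end - consumer.position(tp);
    }

    static long demo() {
        TopicPartition tp = new TopicPartition("topic", 0);
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singleton(tp));
        consumer.updateEndOffsets(Collections.singletonMap(tp, 42L)); // mocked end offset
        consumer.seek(tp, 10L);                                        // pretend we've read up to 10
        long result = remaining(consumer, tp);
        consumer.close();
        return result;
    }

    public static void main(String[] args) {
        System.out.println("remaining: " + demo());
    }
}
```

With an end offset of 42 and a position of 10, demo() should return 32.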

Likewise, to mock partition information, we can use the updatePartitions method.
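
Similarly, a hypothetical partitionCount helper that relies on partitionsFor can be tested by registering PartitionInfo objects with updatePartitions. The sketch assumes kafka-clients 3.x; leader and replica details are left empty because the helper doesn’t use them.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

class PartitionsSketch {

    // Hypothetical helper under test: counts the partitions of a topic
    static int partitionCount(Consumer<?, ?> consumer, String topic) {
        return consumer.partitionsFor(topic).size();
    }

    static int demo() {
        MockConsumer<String, Integer> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        List<PartitionInfo> partitions = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            // No leader and no replicas: this helper only looks at the number of partitions
            partitions.add(new PartitionInfo("topic", p, null, new Node[0], new Node[0]));
        }
        consumer.updatePartitions("topic", partitions);
        int count = partitionCount(consumer, "topic");
        consumer.close();
        return count;
    }

    public static void main(String[] args) {
        System.out.println("partitions: " + demo());
    }
}
```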

4. Conclusion

In this article, we’ve explored how to use MockConsumer to test a Kafka consumer application.

First, we looked at an example of consumer logic and identified the essential parts to test. Then, we tested a simple Kafka consumer application using the MockConsumer.

Along the way, we looked at the features of the MockConsumer and how to use it.

As always, all these code samples are available over on GitHub.
