1. Overview
Kafka is a message processing system built around a distributed messaging queue. It provides a Java library so that applications can write data to, or read data from, a Kafka topic.
Now, since most of the business domain logic is validated through unit tests, applications generally mock all I/O operations in JUnit. Kafka also provides a MockProducer to mock a producer application.
In this tutorial, we’ll first implement a Kafka producer application. Later, we’ll implement a unit test to verify common producer operations with MockProducer.
2. Maven Dependencies
Before we implement a producer application, we’ll add a Maven dependency for kafka-clients:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
3. MockProducer
The kafka-clients library provides the Java API for publishing and consuming messages in Kafka. Producer applications can use these APIs to send key-value records to a Kafka topic:
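import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaProducer {
    private final Producer<String, String> producer;
    public KafkaProducer(Producer<String, String> producer) {
        this.producer = producer;
    }
    public Future<RecordMetadata> send(String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_sports_news", key, value);
        return producer.send(record);
    }
    // Thin wrappers used by the transactional test in section 6.
    public void initTransaction() { producer.initTransactions(); }
    public void beginTransaction() { producer.beginTransaction(); }
    public void commitTransaction() { producer.commitTransaction(); }
}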
Any Kafka producer must implement the Producer interface of the client library. Kafka also provides a KafkaProducer class, which is a concrete implementation that performs the I/O operations towards a Kafka broker.
Furthermore, Kafka provides a MockProducer that implements the same Producer interface and mocks all I/O operations implemented in the KafkaProducer:
@Test
void givenKeyValue_whenSend_thenVerifyHistory() {
    // autoComplete = true: every send() completes immediately and successfully
    MockProducer<String, String> mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("soccer",
      "{\"site\" : \"baeldung\"}");
    assertEquals(1, mockProducer.history().size());
}
Although such I/O operations can also be mocked with Mockito, MockProducer gives us access to a lot of features that we would need to implement on top of our mock. One such feature is the history() method. MockProducer caches the records for which send() is called, thereby allowing us to validate the publish behavior of the producer.
Moreover, we can also validate the metadata like topic name, partition, record key, or value:
assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("soccer"));
assertTrue(recordMetadataFuture.get().partition() == 0);
4. Mocking a Kafka Cluster
In our mocked tests so far, we’ve assumed a topic with just one partition. However, for achieving maximum concurrency between producer and consumer threads, Kafka topics are usually split into multiple partitions.
This allows producers to write data into multiple partitions. This is usually achieved by partitioning the records based on key and mapping specific keys to a particular partition:
public class EvenOddPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
if (((String)key).length() % 2 == 0) {
return 0;
}
return 1;
}
}
Because of this, all even-length keys will be published to partition “0” and, likewise, odd-length keys to partition “1”.
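To check which partition a given key would land on without touching Kafka at all, the even/odd rule can be reproduced in plain Java. This is only a sketch of the rule itself: the class EvenOddRule and the method partitionFor are hypothetical helpers introduced for illustration and are not part of kafka-clients.

```java
public class EvenOddRule {
    // Hypothetical helper mirroring the key-length check in EvenOddPartitioner.
    public static int partitionFor(String key) {
        return key.length() % 2 == 0 ? 0 : 1;
    }

    public static void main(String[] args) {
        // "soccer" has 6 characters, "partition" has 9.
        System.out.println(partitionFor("soccer"));    // prints 0
        System.out.println(partitionFor("partition")); // prints 1
    }
}
```

Working out the expected partition by hand like this makes it easier to pick test keys that exercise both branches of the partitioner.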
MockProducer enables us to validate such partition assignment algorithms by mocking the Kafka cluster with multiple partitions:
@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber()
throws ExecutionException, InterruptedException {
PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
List<PartitionInfo> list = new ArrayList<>();
list.add(partitionInfo0);
list.add(partitionInfo1);
Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(),
new StringSerializer(), new StringSerializer());
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition",
"{\"site\" : \"baeldung\"}");
assertEquals(1, recordMetadataFuture.get().partition());
}
We mocked a Cluster with two partitions, 0 and 1. Since the key "partition" has nine characters, an odd length, we can then verify that EvenOddPartitioner publishes the record to partition 1.
5. Mocking Errors with MockProducer
So far, we’ve only mocked the producer to send a record to a Kafka topic successfully. But what happens if there’s an exception when writing a record?
Applications usually handle such exceptions by retrying or throwing the exception to the client.
MockProducer allows us to mock exceptions during send() so that we can validate the exception-handling code:
@Test
void givenKeyValue_whenSend_thenReturnException() {
    // autoComplete = false: send() stays pending until we complete it ourselves
    MockProducer<String, String> mockProducer = new MockProducer<>(false,
      new StringSerializer(), new StringSerializer());
    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
    RuntimeException e = new RuntimeException();
    // fail the pending send() with our exception
    mockProducer.errorNext(e);
    try {
        record.get();
    } catch (ExecutionException | InterruptedException ex) {
        assertEquals(e, ex.getCause());
    }
    assertTrue(record.isDone());
}
There are two notable things in this code.
First, we called the MockProducer constructor with autoComplete as false. With autoComplete disabled, send() returns a Future that stays pending until we explicitly complete it, for example with completeNext() or errorNext().
Second, we called mockProducer.errorNext(e), so that MockProducer fails the earliest pending send() call, here our only one, with that exception.
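The assertion on ex.getCause() relies on standard Future behavior: a call that fails asynchronously surfaces from get() as an ExecutionException wrapping the original error. The sketch below shows that contract with plain java.util.concurrent, assuming the Future returned by MockProducer follows the same convention. The class FailedFutureDemo and the method causeOf are hypothetical names, and completeExceptionally() merely stands in for errorNext().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FailedFutureDemo {
    // Returns the cause that get() reports for a future failed with the given error.
    public static Throwable causeOf(RuntimeException error) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        // Stand-in for errorNext(e): fail the call after the future was handed out.
        pending.completeExceptionally(error);
        Future<String> future = pending;
        try {
            future.get();
            return null; // not reached for a failed future
        } catch (ExecutionException ex) {
            return ex.getCause();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        RuntimeException e = new RuntimeException("broker unavailable");
        System.out.println(causeOf(e) == e); // prints true
    }
}
```

This is why the test compares the cause of the caught exception with e rather than the caught exception itself.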
6. Mocking Transactional Writes with MockProducer
Kafka 0.11 introduced transactions between Kafka brokers, producers, and consumers. This enabled end-to-end exactly-once message delivery semantics in Kafka. In short, this means that transactional producers publish records to a broker only through a two-phase commit protocol.
MockProducer also supports transactional writes and allows us to verify this behavior:
@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
    MockProducer<String, String> mockProducer = new MockProducer<>(true,
      new StringSerializer(), new StringSerializer());
    kafkaProducer = new KafkaProducer(mockProducer);
    kafkaProducer.initTransaction();
    kafkaProducer.beginTransaction();
    Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
    // nothing is visible in history() until the transaction commits
    assertTrue(mockProducer.history().isEmpty());
    kafkaProducer.commitTransaction();
    assertEquals(1, mockProducer.history().size());
}
Since MockProducer also supports the same APIs as the concrete KafkaProducer, it only updates the history once we commit the transaction. Such mocking behavior can help applications validate that commitTransaction() is invoked for every transaction.
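The commit-gated history can be pictured with a small toy model. This is not MockProducer's implementation, only a sketch of the behavior the test observes: records sent inside a transaction are held back and become visible in the history on commit. The class TxnHistoryModel and all of its members are hypothetical, and the abort branch reflects an assumption about the intended semantics rather than anything verified in this article.

```java
import java.util.ArrayList;
import java.util.List;

public class TxnHistoryModel {
    private final List<String> uncommitted = new ArrayList<>();
    private final List<String> history = new ArrayList<>();
    private boolean inTransaction;

    public void beginTransaction() {
        inTransaction = true;
    }

    // Inside a transaction, records are buffered instead of published.
    public void send(String record) {
        if (inTransaction) {
            uncommitted.add(record);
        } else {
            history.add(record);
        }
    }

    // Commit moves every buffered record into the visible history.
    public void commitTransaction() {
        history.addAll(uncommitted);
        uncommitted.clear();
        inTransaction = false;
    }

    // Assumed semantics: abort discards the buffered records.
    public void abortTransaction() {
        uncommitted.clear();
        inTransaction = false;
    }

    // Returns a copy so callers cannot modify the model's state.
    public List<String> history() {
        return new ArrayList<>(history);
    }

    public static void main(String[] args) {
        TxnHistoryModel model = new TxnHistoryModel();
        model.beginTransaction();
        model.send("{\"site\" : \"baeldung\"}");
        System.out.println(model.history().size()); // prints 0
        model.commitTransaction();
        System.out.println(model.history().size()); // prints 1
    }
}
```

A test that forgets to commit would leave the history empty in this model, which is the same signal the MockProducer test uses to detect a missing commitTransaction() call.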
7. Conclusion
In this article, we looked at the MockProducer class of the kafka-clients library. Because MockProducer implements the same Producer interface as the concrete KafkaProducer, we can use it to mock all I/O operations with a Kafka broker.
We also discussed some complex mocking scenarios and were able to test exceptions, partitioning, and transactions with the MockProducer.
As always, all code examples are available over on GitHub.