Exactly Once Processing in Kafka with Java

Last modified: September 29, 2018

1. Overview

In this tutorial, we’ll look at how Kafka ensures exactly-once delivery between producer and consumer applications through the newly introduced Transactional API.

Additionally, we’ll use this API to implement transactional producers and consumers to achieve end-to-end exactly-once delivery in a WordCount example.

2. Message Delivery in Kafka

Due to various failures, messaging systems can’t guarantee message delivery between producer and consumer applications. Depending on how the client applications interact with such systems, the following message semantics are possible:

  • If a messaging system will never duplicate a message but might miss the occasional message, we call that at-most-once
  • Or, if it will never miss a message but might duplicate the occasional message, we call it at-least-once
  • But, if it always delivers all messages without duplication, that is exactly-once
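
To make the three definitions concrete, here is a toy, in-memory model. It is not Kafka code: the class DeliverySemantics, its failure injection, and its retry rule are all hypothetical. Message b is lost on its first send, and message c is written but its acknowledgement is lost. The exactly-once mode deduplicates retries by sequence number, loosely in the spirit of the idempotent producer described later.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy in-memory model of the three delivery semantics; this is NOT how Kafka is implemented.
public class DeliverySemantics {
    public enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /**
     * Sends each message in order. On its first attempt, message i is lost before reaching the
     * log if i is in lostWrites, or is written but loses its acknowledgement if i is in lostAcks.
     * Retries always succeed. Returns the resulting log.
     */
    public static List<String> deliver(Mode mode, List<String> messages,
                                       Set<Integer> lostWrites, Set<Integer> lostAcks) {
        List<String> log = new ArrayList<>();
        Set<Integer> seenSequences = new HashSet<>(); // broker-side dedup state (exactly-once only)
        for (int seq = 0; seq < messages.size(); seq++) {
            String msg = messages.get(seq);
            boolean acked = attempt(mode, log, seenSequences, seq, msg,
                    !lostWrites.contains(seq), !lostAcks.contains(seq));
            if (!acked && mode != Mode.AT_MOST_ONCE) {
                // At-least-once and exactly-once retry until acknowledged
                attempt(mode, log, seenSequences, seq, msg, true, true);
            }
        }
        return log;
    }

    private static boolean attempt(Mode mode, List<String> log, Set<Integer> seenSequences,
                                   int seq, String msg, boolean writeArrives, boolean ackArrives) {
        // A repeated sequence number is discarded only in exactly-once mode
        if (writeArrives && (mode != Mode.EXACTLY_ONCE || seenSequences.add(seq))) {
            log.add(msg);
        }
        return writeArrives && ackArrives;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("a", "b", "c");
        for (Mode mode : Mode.values()) {
            // "b" is lost on its first attempt; "c" is written but its ack is lost
            System.out.println(mode + " -> " + deliver(mode, messages, Set.of(1), Set.of(2)));
        }
    }
}
```

Running main should print [a, c] for at-most-once (b is lost), [a, b, c, c] for at-least-once (the retried c is duplicated), and [a, b, c] for exactly-once.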

Initially, Kafka only supported at-most-once and at-least-once message delivery.

However, the introduction of transactions between Kafka brokers and client applications in Kafka 0.11 made exactly-once delivery possible. To understand it better, let's quickly review the transactional client API.

3. Maven Dependencies

To work with the transaction API, we’ll need Kafka’s Java client in our pom:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

4. A Transactional consume-transform-produce Loop

For our example, we’re going to consume messages from an input topic, sentences.

Then for each sentence, we’ll count every word and send the individual word counts to an output topic, counts.

In the example, we’ll assume that there is already transactional data available in the sentences topic.

4.1. A Transaction-Aware Producer

So let’s first add a typical Kafka producer.

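Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");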

In addition, we need to specify a transactional.id and enable idempotence:

producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

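Because we've enabled idempotence, Kafka assigns the producer an ID and tracks per-partition sequence numbers for the messages it sends, which lets the broker discard duplicates caused by retries. The transactional.id ties this identity to a name that survives producer restarts.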

Simply put, if the producer accidentally sends the same message to Kafka more than once, for example because it retried after a lost acknowledgement, these settings let the broker notice and keep only one copy.

All that we need to do is make sure the transaction id is distinct for each producer, though consistent across restarts.
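
As a sketch of one way to satisfy both requirements, the helper below derives the transactional.id from a fixed application name plus a stable instance number, such as a container ordinal, rather than from anything random like a UUID generated at startup. The class and method names are hypothetical; the property keys are the standard producer settings used above.

```java
import java.util.Properties;

// Sketch: builds transactional producer settings with a restart-stable transactional.id.
// transactionalId() and producerProps() are hypothetical helpers, not part of kafka-clients.
public class TransactionalProducerConfig {

    /** The same inputs always give the same id, so a restarted instance reuses its identity. */
    public static String transactionalId(String appName, int instanceNumber) {
        if (instanceNumber < 0) {
            throw new IllegalArgumentException("instanceNumber must be non-negative");
        }
        return appName + "-" + instanceNumber;
    }

    public static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("transactional.id", transactionalId);
        return props;
    }

    public static void main(String[] args) {
        // Instance 1 of a hypothetical "word-count" application
        Properties props = producerProps("localhost:9092", transactionalId("word-count", 1));
        System.out.println(props.getProperty("transactional.id")); // prints word-count-1
    }
}
```

Because the id depends only on its inputs, a restarted instance 1 comes back as word-count-1 and reclaims its identity, while instance 2 gets a different id.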

4.2. Enabling the Producer for Transactions

Once we are ready, we also need to call initTransactions to prepare the producer to use transactions:

producer.initTransactions();

This registers the producer with the broker as one that can use transactions, identifying it by its transactional.id and an epoch, a number that increases each time a producer with that transactional.id is initialized. In turn, the broker uses these to write ahead every action to a transaction log.

Consequently, the broker fences out any producer with the same transactional.id and an earlier epoch, presuming it to be a zombie left over from a defunct session, and completes or aborts any transaction that such an instance left unfinished.
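
The fencing idea can be sketched with a toy model, assuming a single coordinator that remembers the latest epoch per transactional.id. The class EpochFencing and its methods are hypothetical; in Kafka, the transaction coordinator performs this check, and the stale producer receives a ProducerFencedException.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based fencing; class and method names are hypothetical.
public class EpochFencing {
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    /** Plays the role of initTransactions(): each registration gets a higher epoch. */
    public int register(String transactionalId) {
        // First registration stores 0; each later one stores previous + 1
        return latestEpoch.merge(transactionalId, 0, (previous, ignored) -> previous + 1);
    }

    /** A request is accepted only if it carries the latest epoch for its transactional.id. */
    public boolean accepts(String transactionalId, int epoch) {
        Integer latest = latestEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }

    public static void main(String[] args) {
        EpochFencing coordinator = new EpochFencing();
        int zombieEpoch = coordinator.register("prod-1");    // original instance: epoch 0
        int restartedEpoch = coordinator.register("prod-1"); // restarted instance: epoch 1
        System.out.println(coordinator.accepts("prod-1", zombieEpoch));    // prints false
        System.out.println(coordinator.accepts("prod-1", restartedEpoch)); // prints true
    }
}
```

Here the original instance (epoch 0) is rejected as soon as its restarted replacement registers with epoch 1.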

4.3. A Transaction-Aware Consumer

When we consume, we read the messages on a topic partition in order. However, we can use isolation.level to indicate that we should wait to read transactional messages until the associated transaction has been committed:

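Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singleton("sentences"));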

Using a value of read_committed ensures that we don't read any transactional messages before their transaction is committed, and that we never read messages from aborted transactions.

The default value of isolation.level is read_uncommitted.
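
The difference between the two isolation levels can be illustrated with a toy model of a single partition. Entry, TxnStatus, and read() are hypothetical; Kafka itself uses transaction markers and the last stable offset (LSO) rather than a status map, but the visible outcome is similar: aborted records are filtered out, and a read_committed consumer doesn't read past the first still-open transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model of what a consumer sees in one partition under each isolation.level.
// Entry, TxnStatus and read() are hypothetical names, not Kafka APIs.
public class IsolationLevels {
    public enum TxnStatus { OPEN, COMMITTED, ABORTED }

    /** One record in the partition and the id of the transaction that wrote it. */
    public static final class Entry {
        final String txnId;
        final String value;

        public Entry(String txnId, String value) {
            this.txnId = txnId;
            this.value = value;
        }
    }

    public static List<String> read(List<Entry> partition, Map<String, TxnStatus> status,
                                    boolean readCommitted) {
        List<String> result = new ArrayList<>();
        for (Entry entry : partition) {
            if (!readCommitted) {
                result.add(entry.value); // read_uncommitted: everything, even aborted data
                continue;
            }
            TxnStatus txnStatus = status.get(entry.txnId);
            if (txnStatus == TxnStatus.OPEN) {
                break; // stop at the first undecided transaction, like the LSO
            }
            if (txnStatus == TxnStatus.COMMITTED) {
                result.add(entry.value); // records from aborted transactions are skipped
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> partition = List.of(
                new Entry("t1", "the"), new Entry("t2", "quick"), new Entry("t1", "brown"),
                new Entry("t3", "fox"), new Entry("t4", "jumps"));
        Map<String, TxnStatus> status = Map.of(
                "t1", TxnStatus.COMMITTED, "t2", TxnStatus.ABORTED,
                "t3", TxnStatus.OPEN, "t4", TxnStatus.COMMITTED);
        System.out.println(read(partition, status, false)); // prints [the, quick, brown, fox, jumps]
        System.out.println(read(partition, status, true));  // prints [the, brown]
    }
}
```

Note that jumps is hidden from the read_committed reader even though its transaction committed, because it sits behind the still-open transaction t3.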

4.4. Consuming and Transforming by Transaction

Now that we have the producer and consumer both configured to write and read transactionally, we can consume records from our input topic and count each word in each record:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60));
// Count every word across all records polled from the sentences topic
Map<String, Integer> wordCountMap =
  StreamSupport.stream(records.records("sentences").spliterator(), false)
    .flatMap(record -> Stream.of(record.value().split(" ")))
    .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum));
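
To test the transform step without a broker, the counting logic can be pulled into a plain function. This is a sketch: countWords is a hypothetical name, and unlike split(" ") above, it splits on any run of whitespace and ignores empty tokens. It also collects into a TreeMap so the output order is predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// The word-count transform with no Kafka dependency, so it can be unit-tested in isolation.
// countWords() is a hypothetical helper name.
public class WordCount {
    public static Map<String, Integer> countWords(List<String> sentences) {
        return sentences.stream()
                .flatMap(sentence -> Arrays.stream(sentence.trim().split("\\s+")))
                .filter(word -> !word.isEmpty()) // an empty sentence yields one empty token
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(countWords(List.of("to be or", "not  to be"))); // prints {be=2, not=1, or=1, to=2}
    }
}
```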

Note that there is nothing transactional about the code above. But because we used read_committed, this consumer won't read any of the messages written to the input topic in a given transaction until that transaction is committed.

Now, we can send the calculated word count to the output topic.

Let’s see how we can produce our results, also transactionally.

4.5. Send API

To send our counts as new messages in the same transaction as the offsets we'll commit later, we first call beginTransaction:

producer.beginTransaction();

Then, we can write each count to our counts topic, with the word as the key and the count as the value:

wordCountMap.forEach((key,value) -> 
    producer.send(new ProducerRecord<String,String>("counts",key,value.toString())));

Note that because the producer partitions the data by key, the messages in a transaction can span multiple partitions, each of which may be read by a separate consumer. Therefore, the Kafka broker (specifically, the transaction coordinator) stores a list of all the partitions a transaction has updated.
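
A toy illustration of why a single transaction can span partitions follows. ASSUMPTION: String.hashCode() stands in for the real partitioner; Kafka's default partitioner applies murmur2 to the serialized key, so real partition numbers will differ. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy illustration: the set of partitions one transaction writes to.
// ASSUMPTION: String.hashCode() replaces Kafka's murmur2-based default partitioner.
public class TransactionPartitions {
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Every partition touched by these keys, i.e. what the coordinator must track. */
    public static Set<Integer> partitionsTouched(Iterable<String> keys, int numPartitions) {
        Set<Integer> partitions = new TreeSet<>();
        for (String key : keys) {
            partitions.add(partitionFor(key, numPartitions));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Words sent to a counts topic assumed to have 3 partitions
        System.out.println(partitionsTouched(List.of("to", "be", "or", "not"), 3)); // prints [0, 1, 2]
    }
}
```

With three partitions, the four words land on partitions 0, 1, and 2, so this one transaction would have to be tracked across all three.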

Note also that, within a transaction, a producer can use multiple threads to send records in parallel.

4.6. Committing Offsets

And finally, we need to commit the offsets we just finished consuming. With transactions, instead of committing the offsets through the consumer as usual, we send them to the producer's transaction, so they are committed atomically with the messages we produced.

We can do all of this in a single call, but we first need to calculate the offsets for each topic partition:

Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
    long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
    offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}

Note that what we commit to the transaction is the offset of the next message we expect to read, which is why we add 1 to the offset of the last record processed.
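
The same calculation in plain Java, keyed by a topic-partition string instead of TopicPartition so it runs without kafka-clients (NextOffsets is a hypothetical name), makes the +1 concrete: after processing offsets 40 and 41, we commit 42, so a restarted consumer resumes at the first unprocessed record. Committing 41 would make it process record 41 a second time.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java version of the offset calculation; names are hypothetical.
public class NextOffsets {
    /** partition -> offsets consumed in this batch, in ascending order. */
    public static Map<String, Long> offsetsToCommit(Map<String, List<Long>> consumedOffsets) {
        Map<String, Long> toCommit = new TreeMap<>();
        consumedOffsets.forEach((partition, offsets) -> {
            if (!offsets.isEmpty()) {
                long lastProcessed = offsets.get(offsets.size() - 1);
                toCommit.put(partition, lastProcessed + 1); // position of the NEXT record to read
            }
        });
        return toCommit;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> consumed = Map.of(
                "sentences-0", List.of(40L, 41L),
                "sentences-1", List.of(7L));
        System.out.println(offsetsToCommit(consumed)); // prints {sentences-0=42, sentences-1=8}
    }
}
```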

Then we can send our calculated offsets to the transaction:

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");

4.7. Committing or Aborting the Transaction

And, finally, we can commit the transaction, which atomically writes the offsets to the __consumer_offsets topic together with the messages produced in the transaction:

producer.commitTransaction();

This flushes any buffered messages to their respective partitions. In addition, the Kafka broker makes all messages in that transaction available to read_committed consumers.

Of course, if anything goes wrong while we are processing, for example, if we catch an exception, we can call abortTransaction:

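try {
  producer.beginTransaction();
  // ... read from input topic
  // ... transform
  // ... write to output topic
  producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
  // Fatal: another instance took over or we lack permissions, so aborting won't help
  producer.close();
} catch (KafkaException e) {
  // Any other Kafka error: abort, discarding this transaction's writes, and retry the batch
  producer.abortTransaction();
}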

Aborting drops any buffered messages and marks the transaction as aborted on the broker, so read_committed consumers never see its messages.
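
The all-or-nothing behavior of commit and abort can be sketched as a toy, in-memory model. ToyTransaction is hypothetical, and it simplifies one detail: real Kafka keeps aborted messages in the log and hides them from read_committed consumers with abort markers, rather than deleting them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of commit/abort atomicity; ToyTransaction is hypothetical.
public class ToyTransaction {
    private final List<String> pendingMessages = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();
    private final List<String> visibleMessages = new ArrayList<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    public void send(String message) { pendingMessages.add(message); }

    public void sendOffsets(String partition, long nextOffset) { pendingOffsets.put(partition, nextOffset); }

    /** Output messages and input offsets become visible together. */
    public void commit() {
        visibleMessages.addAll(pendingMessages);
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    /** Neither the messages nor the offsets take effect, so the input will be re-read. */
    public void abort() { clearPending(); }

    private void clearPending() {
        pendingMessages.clear();
        pendingOffsets.clear();
    }

    public List<String> visibleMessages() { return List.copyOf(visibleMessages); }

    public Map<String, Long> committedOffsets() { return Map.copyOf(committedOffsets); }

    public static void main(String[] args) {
        ToyTransaction txn = new ToyTransaction();
        txn.send("kafka=2");
        txn.sendOffsets("sentences-0", 42L);
        txn.abort(); // e.g. the transform step threw an exception
        System.out.println(txn.visibleMessages() + " " + txn.committedOffsets()); // prints [] {}

        txn.send("kafka=2"); // retry the same batch
        txn.sendOffsets("sentences-0", 42L);
        txn.commit();
        System.out.println(txn.visibleMessages() + " " + txn.committedOffsets()); // prints [kafka=2] {sentences-0=42}
    }
}
```

After the abort, neither the count nor the offset is visible, so the same batch is read again; after the retry commits, both appear together.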

If we neither commit nor abort within the producer's transaction.timeout.ms, the transaction coordinator aborts the transaction itself. That setting defaults to 60,000 milliseconds, or one minute, and may not exceed the broker-configured max.transaction.timeout.ms, whose default is 900,000 milliseconds, or 15 minutes.
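
As a sketch, a producer can request a longer timeout through its transaction.timeout.ms setting, as long as the value stays within the broker's cap. The helper below is hypothetical and ASSUMES the broker keeps the default 900,000 ms max.transaction.timeout.ms; check your broker's actual value.

```java
import java.util.Properties;

// Sketch: sets transaction.timeout.ms after checking it against an ASSUMED broker cap.
// withTransactionTimeout() is a hypothetical helper, not a kafka-clients API.
public class TransactionTimeout {
    static final long ASSUMED_BROKER_MAX_TIMEOUT_MS = 900_000L; // broker default for max.transaction.timeout.ms

    public static Properties withTransactionTimeout(Properties producerProps, long timeoutMs) {
        // This helper rejects non-positive values and values above the assumed cap; the broker
        // itself rejects a value above its cap when the producer calls initTransactions()
        if (timeoutMs <= 0 || timeoutMs > ASSUMED_BROKER_MAX_TIMEOUT_MS) {
            throw new IllegalArgumentException("transaction.timeout.ms must be in (0, "
                    + ASSUMED_BROKER_MAX_TIMEOUT_MS + "], got " + timeoutMs);
        }
        Properties copy = new Properties();
        copy.putAll(producerProps);
        copy.put("transaction.timeout.ms", Long.toString(timeoutMs));
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("transactional.id", "prod-1");
        // Allow up to 2 minutes per transaction instead of the 60-second producer default
        Properties tuned = withTransactionTimeout(props, 120_000L);
        System.out.println(tuned.getProperty("transaction.timeout.ms")); // prints 120000
    }
}
```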

5. Other consume-transform-produce Loops

What we've just seen is a basic consume-transform-produce loop that reads from and writes to the same Kafka cluster.

In contrast, applications that read from one Kafka cluster and write to another can't use this mechanism, because a transaction can't span clusters. They must fall back to the older commitSync and commitAsync APIs, and they typically store consumer offsets in their own external state store, alongside their results, to maintain transactionality.
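
A minimal sketch of that pattern, assuming a store that can update results and offsets atomically: ExternalOffsetStore is hypothetical, and synchronized stands in for a real database transaction. On restart, the consumer would look up resumeOffset for each partition and call consumer.seek(TopicPartition, long) instead of relying on offsets committed to Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: results and input offsets kept together in one (hypothetical) external store.
// `synchronized` stands in for a real database transaction.
public class ExternalOffsetStore {
    private final Map<String, Integer> wordCounts = new HashMap<>();
    private final Map<String, Long> nextOffsets = new HashMap<>();

    /** Applies a batch's counts and its next input offset in one atomic step. */
    public synchronized boolean applyBatch(String partition, long nextOffset,
                                           Map<String, Integer> batchCounts) {
        Long stored = nextOffsets.get(partition);
        if (stored != null && nextOffset <= stored) {
            return false; // batch already applied (e.g. redelivered after a crash): skip it
        }
        batchCounts.forEach((word, count) -> wordCounts.merge(word, count, Integer::sum));
        nextOffsets.put(partition, nextOffset);
        return true;
    }

    /** Where a restarted consumer should seek to; 0 if nothing has been stored yet. */
    public synchronized long resumeOffset(String partition) {
        return nextOffsets.getOrDefault(partition, 0L);
    }

    public synchronized int count(String word) {
        return wordCounts.getOrDefault(word, 0);
    }

    public static void main(String[] args) {
        ExternalOffsetStore store = new ExternalOffsetStore();
        store.applyBatch("sentences-0", 42L, Map.of("kafka", 2));
        store.applyBatch("sentences-0", 42L, Map.of("kafka", 2)); // redelivered duplicate is ignored
        System.out.println(store.count("kafka") + " " + store.resumeOffset("sentences-0")); // prints 2 42
    }
}
```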

6. Conclusion

For data-critical applications, end-to-end exactly-once processing is often imperative.

In this tutorial, we saw how to use Kafka transactions to do exactly this, and we implemented a transaction-based word counting example to illustrate the principle.

Feel free to check out all the code samples on GitHub.
