Guide to Spring Cloud Stream with Kafka, Apache Avro and Confluent Schema Registry

Last modified: July 13, 2019

1. Introduction

Apache Kafka is a messaging platform. With it, we can exchange data between different applications at scale.

Spring Cloud Stream is a framework for building message-driven applications. It can simplify the integration of Kafka into our services.

Conventionally, Kafka is used with the Avro message format, supported by a schema registry. In this tutorial, we’ll use the Confluent Schema Registry. We’ll try both Spring’s implementation of integration with the Confluent Schema Registry and also the Confluent native libraries.

2. Confluent Schema Registry

Kafka represents all data as bytes, so it’s common to use an external schema and serialize and deserialize into bytes according to that schema. Rather than supply a copy of that schema with each message, which would be an expensive overhead, it’s also common to keep the schema in a registry and supply just an id with each message.

Confluent Schema Registry provides an easy way to store, retrieve and manage schemas. It exposes several useful RESTful APIs.

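For example, assuming a registry running locally on its default port 8081, we can list the registered subjects or fetch a schema by its id with plain HTTP calls:

curl localhost:8081/subjects
curl localhost:8081/schemas/ids/1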

Schemata are stored by subject, and by default, the registry does a compatibility check before allowing a new schema to be uploaded against a subject.

Each producer will know the schema it’s producing with, and each consumer should be able to either consume data in ANY format or should have a specific schema it prefers to read in. The producer consults the registry to establish the correct ID to use when sending a message. The consumer uses the registry to fetch the sender’s schema. 

When the consumer knows both the sender’s schema and its own desired message format, the Avro library can convert the data into the consumer’s desired format.

3. Apache Avro

Apache Avro is a data serialization system.

It uses a JSON structure to define the schema, providing for serialization between bytes and structured data.

One strength of Avro is its support for evolving messages written in one version of a schema into the format defined by a compatible alternative schema.

The Avro toolset is also able to generate classes to represent the data structures of these schemata, making it easy to serialize in and out of POJOs.

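As a quick illustration of that round trip, here's a minimal sketch using Avro's specific-record API directly, assuming a generated class such as the Employee record we'll define later in this tutorial:

// serialize a generated POJO to Avro binary
// (classes from org.apache.avro.io and org.apache.avro.specific)
Employee employee = new Employee();
employee.setId(1001);
employee.setFirstName("Harry");
employee.setLastName("Potter");

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<Employee> writer = new SpecificDatumWriter<>(Employee.class);
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(employee, encoder);
encoder.flush();

// and deserialize the bytes back into a POJO
DatumReader<Employee> reader = new SpecificDatumReader<>(Employee.class);
Employee copy = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));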

4. Setting up the Project

To use a schema registry with Spring Cloud Stream, we need the Spring Cloud Kafka Binder and schema registry Maven dependencies:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
</dependency>

For Confluent’s serializer, we need:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>

And Confluent’s serializer is in their repo:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Also, let’s use a Maven plugin to generate the Avro classes:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>                        
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

For testing, we can use either an existing Kafka and Schema Registry setup or a dockerized Confluent and Kafka.

5. Spring Cloud Stream

Now that we’ve got our project set up, let’s next write a producer using Spring Cloud Stream. It’ll publish employee details on a topic.

Then, we’ll create a consumer that will read events from the topic and write them out in a log statement.

5.1. Schema

First, let’s define a schema for employee details. We can name it employee-schema.avsc.

We can keep the schema file in src/main/resources:

{
    "type": "record",
    "name": "Employee",
    "namespace": "com.baeldung.schema",
    "fields": [
    {
        "name": "id",
        "type": "int"
    },
    {
        "name": "firstName",
        "type": "string"
    },
    {
        "name": "lastName",
        "type": "string"
    }]
}

After creating the above schema, we need to build the project. Then, the Apache Avro code generator will create a POJO named Employee under the package com.baeldung.schema.

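For example, running the build up to the compile phase is enough to trigger the plugin execution bound to generate-sources and to compile the generated class:

mvn clean compile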

5.2. Producer

Spring Cloud Stream provides the Processor interface, which gives us an output channel and an input channel.

Let’s use this to make a producer that sends Employee objects to the employee-details Kafka topic:

@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    employee.setFirstName(firstName);
    employee.setLastName(lastName);

    Message<Employee> message = MessageBuilder.withPayload(employee)
                .build();

    processor.output()
        .send(message);
}
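
The snippet above lives inside a Spring-managed bean; here's a minimal sketch of the wrapper, assuming it's the AvroProducer component injected by the REST controller we'll write shortly:

@Service
public class AvroProducer {

    @Autowired
    private Processor processor;

    // produceEmployeeDetails() as shown above
}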

5.3. Consumer

Now, let’s write our consumer:

@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
    logger.info("Let's process employee details: {}", employeeDetails);
}

This consumer will read events published on the employee-details topic. Let’s direct its output to the log to see what it does.

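The logger here is an ordinary SLF4J logger; a minimal sketch of the enclosing component, assuming the AvroConsumer name that appears in the log output later on:

@Service
public class AvroConsumer {

    private static final Logger logger = LoggerFactory.getLogger(AvroConsumer.class);

    // consumeEmployeeDetails() as shown above
}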

5.4. Kafka Bindings

So far we’ve only been working against the input and output channels of our Processor object. These channels need configuring with the correct destinations.

Let’s use application.yml to provide the Kafka bindings:

spring:
  cloud:
    stream: 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro

We should note that, in this case, destination means the Kafka topic. It may be slightly confusing that it is called destination since it is the input source in this case, but it’s a consistent term across consumers and producers.

5.5. Entry Point

Now that we have our producer and consumer, let’s expose an API to take input from a user and pass it to the producer:

@Autowired
private AvroProducer avroProducer;

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, 
  @PathVariable String lastName) {
    avroProducer.produceEmployeeDetails(id, firstName, lastName);
    return "Sent employee details to consumer";
}

5.6. Enable the Confluent Schema Registry and Bindings

Finally, to make our application apply both the Kafka and schema registry bindings, we’ll need to add @EnableBinding and @EnableSchemaRegistryClient on one of our configuration classes:

@SpringBootApplication
@EnableBinding(Processor.class)
// The @EnableSchemaRegistryClient annotation needs to be uncommented to use the Spring native method.
// @EnableSchemaRegistryClient
public class AvroKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvroKafkaApplication.class, args);
    }

}

And we should provide a ConfluentSchemaRegistryClient bean:

@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}

The endPoint is the URL for the Confluent Schema Registry.

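For that @Value expression to resolve, the registry URL has to be present in application.yml; a minimal sketch, assuming the registry runs locally on its default port (the full binder configuration appears in section 6):

spring:
  cloud:
    stream:
      kafka:
        binder:
          producer-properties:
            schema.registry.url: http://localhost:8081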

5.7. Testing Our Service

Let’s test the service with a POST request:

curl -X POST localhost:8080/employees/1001/Harry/Potter

The logs tell us that this has worked:

2019-06-11 18:45:45.343  INFO 17036 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer       : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}

5.8. What Happened During Processing?

Let’s try to understand what exactly happened with our example application:

  1. The producer built the Kafka message using the Employee object
  2. The producer registered the employee schema with the schema registry to get a schema version ID; this either created a new ID or reused the existing one for that exact schema
  3. Avro serialized the Employee object using the schema
  4. Spring Cloud put the schema-id in the message headers
  5. The message was published on the topic
  6. When the message came to the consumer, it read the schema-id from the header
  7. The consumer used schema-id to get the Employee schema from the registry
  8. The consumer found a local class that could represent that object and deserialized the message into it

6. Serialization/Deserialization Using Native Kafka Libraries

Spring Boot provides a few out-of-the-box message converters. By default, Spring Boot uses the Content-Type header to select an appropriate message converter.

In our example, the Content-Type is application/*+avro, so it used AvroSchemaMessageConverter to read and write Avro formats. Confluent, however, recommends using KafkaAvroSerializer and KafkaAvroDeserializer for message conversion.

While Spring’s own format works well, it has some drawbacks in terms of partitioning, and it is not interoperable with the Confluent standards that some non-Spring services on our Kafka instance may need.

Let’s update our application.yml to use the Confluent converters:

spring:
  cloud:
    stream:
      default: 
        producer: 
          useNativeEncoding: true
        consumer:  
          useNativeEncoding: true     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
         binder:        
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081 
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true

We have enabled useNativeEncoding. It forces Spring Cloud Stream to delegate serialization to the provided classes.

We should also know how we can provide native settings properties for Kafka within Spring Cloud using kafka.binder.producer-properties and kafka.binder.consumer-properties.

7. Consumer Groups and Partitions

A consumer group is a set of consumers belonging to the same application. Consumers from the same consumer group share the same group name.

Let’s update application.yml to add a consumer group name:

spring:
  cloud:
    stream:
      # ...
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
      # ...

All the consumers in a group distribute the topic partitions among themselves evenly. Messages in different partitions will be processed in parallel.

In a consumer group, the max number of consumers reading messages at a time is equal to the number of partitions. So we can configure the number of partitions and consumers to get the desired parallelism. In general, we should have more partitions than the total number of consumers across all replicas of our service.

7.1. Partition Key

The order in which our messages are processed may be important, and when they are processed in parallel, the sequence of processing is hard to control.

Kafka provides the rule that in a given partition, the messages are always processed in the sequence they arrived. So, where it matters that certain messages are processed in the right order, we ensure that they land in the same partition as each other.

We can provide a partition key while sending a message to a topic. The messages with the same partition key will always go to the same partition. If the partition key is not present, messages will be partitioned in round-robin fashion.

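Under the hood, Kafka's default partitioner derives the partition from a hash of the serialized key, which is what guarantees that equal keys land on the same partition; a simplified sketch of that rule:

// simplified form of Kafka's DefaultPartitioner logic for keyed messages
// (Utils is org.apache.kafka.common.utils.Utils)
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;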

Let’s try to understand this with an example. Imagine we are receiving multiple messages for an employee, and we want to process all of that employee’s messages in sequence. The department name and employee id can identify an employee uniquely.

So let’s define the partition key with the employee’s id and department name:

{
    "type": "record",
    "name": "EmployeeKey",
    "namespace": "com.baeldung.schema",
    "fields": [
     {
        "name": "id",
        "type": "int"
    },
    {
        "name": "departmentName",
        "type": "string"
    }]
}

After building the project, the EmployeeKey POJO will get generated under the package com.baeldung.schema.

Let’s update our producer to use the EmployeeKey as a partition key:

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    // ...

    // creating partition key for kafka topic
    EmployeeKey employeeKey = new EmployeeKey();
    employeeKey.setId(empId);
    employeeKey.setDepartmentName("IT");

    Message<Employee> message = MessageBuilder.withPayload(employee)
        .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
        .build();

    processor.output()
        .send(message);
}

Here, we’re putting the partition key in the message header.

Now, the same partition will receive the messages with the same employee id and department name.

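For instance, repeating the earlier request for the same employee id produces messages that share a key (the id plus the hard-coded "IT" department), so they land on the same partition and are consumed in order:

curl -X POST localhost:8080/employees/1001/Harry/Potter
curl -X POST localhost:8080/employees/1001/Harry/Granger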

7.2. Consumer Concurrency

Spring Cloud Stream allows us to set the concurrency for a consumer in application.yml:

spring:
  cloud:
    stream:
      # ...
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3

Now our consumers will read three messages from the topic concurrently. In other words, Spring will spawn three different threads to consume independently.

8. Conclusion

In this article, we integrated a producer and consumer against Apache Kafka with Avro schemas and the Confluent Schema Registry.

We did this in a single application, but the producer and consumer could have been deployed in different applications and would have been able to have their own versions of the schemas, kept in sync via the registry.

We looked at how to use Spring’s implementation of Avro and Schema Registry client, and then we saw how to switch over to the Confluent standard implementation of serialization and deserialization for the purposes of interoperability.

Finally, we looked at how to partition our topic and ensure we have the correct message keys to enable safe parallel processing of our messages.

The complete code used for this article can be found over on GitHub.
