Configuring Kafka SSL Using Spring Boot – 使用Spring Boot配置Kafka SSL

最后修改: 2021年 8月 29日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

In this tutorial, we’ll cover the basic setup for connecting a Spring Boot client to an Apache Kafka broker using SSL authentication.

在本教程中,我们将介绍使用 SSL 身份验证将 Spring Boot 客户端连接到 Apache Kafka 代理的基本设置。

Secure Sockets Layer (SSL) has actually been deprecated and replaced with Transport Layer Security (TLS) since 2015. However, for historic reasons, Kafka (and Java) still refer to “SSL” and we’ll be following this convention in this article as well.

安全套接字层(SSL)实际上从2015年开始已经被废弃,并被传输层安全(TLS)所取代。然而,由于历史原因,Kafka(和Java)仍然提到了 “SSL”,我们在本文中也将遵循这一惯例。

2. SSL Overview

2.SSL概述

By default, Apache Kafka sends all data as clear text and without any authentication.

默认情况下,Apache Kafka以明文形式发送所有数据,并且没有任何认证。

First of all, we can configure SSL for encryption between the broker and the client. This, by default, requires one-way authentication using public key encryption where the client authenticates the server certificate.

首先,我们可以配置SSL,用于经纪人和客户之间的加密。默认情况下,这需要使用公共密钥加密的单向认证,其中客户端认证服务器的证书

In addition, the server can also authenticate the client using a separate mechanism (such as SSL or SASL), thus enabling two-way authentication or mutual TLS (mTLS). Basically, two-way SSL authentication ensures that the client and the server both use SSL certificates to verify each other’s identities and trust each other in both directions.

此外,服务器也可以使用一个单独的机制(如SSL或SASL)来验证客户端,从而实现双向认证或相互TLS(mTLS)。基本上,双向SSL认证确保客户端和服务器都使用SSL证书来验证对方的身份,并在两个方向上相互信任

In this article, the broker will be using SSL to authenticate the client, and keystore and truststore will be used for holding the certificates and keys.

在本文中,经纪商将使用SSL来验证客户,而keystoretruststore将被用来保存证书和密钥。

Each broker requires its own keystore which contains the private key and the public certificate. The client uses its truststore to authenticate this certificate and trust the server. Similarly, each client also requires its own keystore which contains its private key and the public certificate. The server uses its truststore to authenticate and trust the client’s certificate and establish a secure connection.

每个经纪人都需要自己的密钥库,其中包含私钥和公共证书。客户端使用其信任库来验证该证书并信任服务器。同样地,每个客户也需要自己的密钥库,其中包含其私钥和公共证书。服务器使用它的信任库来验证和信任客户的证书,并建立一个安全连接。

The truststore can contain a Certificate Authority (CA) which can sign certificates. In this case, the broker or the client trusts any certificate signed by the CA that is present in the truststore. This simplifies the certificate authentication as adding new clients or brokers does not require a change to the truststore.

信任库可以包含一个证书颁发机构(CA),它可以签署证书。在这种情况下,经纪商或客户信任由存在于信任库中的CA签署的任何证书。这简化了证书认证,因为添加新的客户或经纪商不需要改变信任库。

3. Dependencies and Setup

3.依赖性和设置

Our example application will be a simple Spring Boot application.

我们的示例应用程序将是一个简单的Spring Boot应用程序。

In order to connect to Kafka, let’s add the spring-kafka dependency in our POM file:

为了连接到Kafka,让我们在POM文件中添加spring-kafka依赖项。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

We’ll also be using a Docker Compose file to configure and test the Kafka server setup. Initially, let’s do this without any SSL configuration:

我们还将使用Docker Compose文件来配置和测试Kafka服务器设置。最初,让我们在没有任何SSL配置的情况下进行。

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Now, let’s start the container:

现在,让我们启动容器。

docker-compose up

This should bring up the broker with the default configuration.

这应该会出现具有默认配置的经纪人。

4. Broker Configuration

4.经纪人配置

Let’s start by looking at the minimum configuration required for the broker in order to establish secure connections.

让我们先看一下经纪人为建立安全连接所需的最低配置。

4.1. Standalone Broker

4.1.独立的经纪人

Although we’re not using a standalone instance of the broker in this example, it’s useful to know the configuration changes required in order to enable SSL authentication.

虽然我们在这个例子中没有使用独立的代理实例,但了解启用SSL认证所需的配置变化是很有用的。

First, we need to configure the broker to listen for SSL connections on port 9093, in the server.properties:

首先,我们需要在server.properties配置经纪商以监听9093端口的SSL连接

listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

Next, the keystore and truststore related properties need to be configured with the certificate locations and credentials:

接下来,钥匙库和信任库的相关属性需要配置证书位置和凭证。

ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password

Finally, the broker must be configured to authenticate clients in order to achieve two-way authentication:

最后,经纪商必须被配置为认证客户,以实现双向认证。

ssl.client.auth=required

4.2. Docker Compose

4.2.Docker Compose

As we’re using Compose to manage our broker environment, let’s add all of the above properties to our docker-compose.yml file:

由于我们使用Compose来管理我们的代理环境,让我们把上述所有属性添加到我们的docker-compose.yml文件。

kafka:
  image: confluentinc/cp-kafka:6.2.0
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
    - 9093:9093
  environment:
    ...
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
    KAFKA_SSL_CLIENT_AUTH: 'required'
    KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
    KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
    KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
    KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
    KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
  volumes:
    - ./certs/:/etc/kafka/secrets/certs

Here, we’ve exposed the SSL port (9093) in the ports section of the configuration. Additionally, we’ve mounted the certs project folder in the volumes section of the config. This contains the required certs and the associated credentials.

在这里,我们在配置的ports部分暴露了SSL端口(9093)。此外,我们在配置的volumes部分挂载了certs项目文件夹。这包含了所需的证书和相关凭证。

Now, restarting the stack using Compose shows the relevant SSL details in the broker log:

现在,使用Compose重启堆栈,在经纪人日志中显示了相关的SSL细节。

...
kafka_1      | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1      | ===> Configuring ...
<strong>kafka_1      | SSL is enabled.>
....
kafka_1      | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
<strong>kafka_1      |  advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1      |  ssl.client.auth = required>
<strong>kafka_1      |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]>
kafka_1      |  ssl.endpoint.identification.algorithm = https
kafka_1      |  ssl.key.password = [hidden]
kafka_1      |  ssl.keymanager.algorithm = SunX509
<strong>kafka_1      |  ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks>
kafka_1      |  ssl.keystore.password = [hidden]
kafka_1      |  ssl.keystore.type = JKS
kafka_1      |  ssl.principal.mapping.rules = DEFAULT
<strong>kafka_1      |  ssl.protocol = TLSv1.3>
kafka_1      |  ssl.trustmanager.algorithm = PKIX
kafka_1      |  ssl.truststore.certificates = null
<strong>kafka_1      |  ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks>
kafka_1      |  ssl.truststore.password = [hidden]
kafka_1      |  ssl.truststore.type = JKS
....

5. Spring Boot Client

5.Spring Boot客户端

Now that the server setup is complete, we’ll create the required Spring Boot components. These will interact with our broker which now requires SSL for two-way authentication.

现在,服务器设置已经完成,我们将创建所需的Spring Boot组件。这些组件将与我们的代理进行交互,现在需要SSL进行双向认证。

5.1. Producer

5.1. 制片人

First, let’s send a message to the specified topic using KafkaTemplate:

首先,让我们使用KafkaTemplate向指定的主题发送一个消息。

public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) {
        log.info("Producing message: {}", message);
        kafkaTemplate.send(topic, "key", message)
          .addCallback(
            result -> log.info("Message sent to topic: {}", message),
            ex -> log.error("Failed to send message", ex)
          );
    }
}

The send method is an async operation. Therefore, we’ve attached a simple callback that just logs some information once the broker receives the message.

send方法是一个异步操作。因此,我们附加了一个简单的回调,它只是在经纪人收到消息后记录了一些信息。

5.2. Consumer

5.2.消费者

Next, let’s create a simple consumer using @KafkaListener.  This connects to the broker and consumes messages from the same topic as that used by the producer:

接下来,让我们使用@KafkaListener创建一个简单的消费者。 它连接到代理,并从生产者使用的相同主题中消费消息。

public class KafkaConsumer {

    public static final String TOPIC = "test-topic";

    public final List<String> messages = new ArrayList<>();

    @KafkaListener(topics = TOPIC)
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received payload: '{}'", consumerRecord.toString());
        messages.add(consumerRecord.value());
    }
}

In our demo application, we’ve kept things simple and the consumer simply stores the messages in a List. In an actual real-world system, the consumer receives the messages and processes them according to the application’s business logic.

在我们的演示应用程序中,我们保持简单,消费者只是在List中存储消息。在实际的真实世界系统中,消费者会收到消息,并根据应用程序的业务逻辑来处理它们。

5.3. Configuration

5.3.配置

Finally, let’s add the necessary configuration to our application.yml:

最后,让我们把必要的配置添加到我们的application.yml

spring:
  kafka:
    security:
      protocol: "SSL"
    bootstrap-servers: localhost:9093
    ssl:
      trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
      trust-store-password: <password>
      key-store-location:  classpath:/client-certs/kafka.client.keystore.jks
      key-store-password: <password>
    
    # additional config for producer/consumer 

Here, we’ve set the required properties provided by Spring Boot to configure the producer and consumer. As both of these components are connecting to the same broker, we can declare all the essential properties under spring.kafka section. However, if the producer and consumer were connecting to different brokers, we would specify these under spring.kafka.producer and spring.kafka.consumer sections, respectively.

在这里,我们已经设置了Spring Boot提供的所需属性,以配置生产者和消费者。由于这两个组件都连接到同一个经纪人,我们可以在spring.kafka部分声明所有的基本属性。然而,如果生产者和消费者连接到不同的经纪商,我们将分别在spring.kafka.producerspring.kafka.consumer部分指定这些。

In the ssl section of the configuration, we point to the JKS truststore in order to authenticate the Kafka broker. This contains the certificate of the CA which has also signed the broker certificate. In addition, we’ve also provided the path for the Spring client keystore which contains the certificate signed by the CA that should be present in the truststore on the broker side.

在配置的ssl部分,我们指向JKS truststore,以验证Kafka broker。这包含了CA的证书,它也签署了经纪人的证书。此外,我们还提供了Spring客户端keystore的路径,其中包含由CA签署的证书,该证书应该存在于经纪人一方的信任仓库中。

5.4. Testing

5.4.测试

As we’re using a Compose file, let’s use the Testcontainers framework to create an end-to-end test with our Producer and Consumer:

由于我们使用的是 Compose 文件,让我们使用 Testcontainers 框架来与我们的 ProducerConsumer 创建一个端到端的测试。

@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {

    private static final String KAFKA_SERVICE = "kafka";
    private static final int SSL_PORT = 9093;  

    @Container
    public DockerComposeContainer<?> container =
      new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
        .withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());

    @Autowired
    private KafkaProducer kafkaProducer;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Test
    void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
        String message = generateSampleMessage();
        kafkaProducer.sendMessage(message, TOPIC);

        await().atMost(Duration.ofMinutes(2))
          .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
    }

    private static String generateSampleMessage() {
        return UUID.randomUUID().toString();
    }
}

When we run the test, Testcontainers starts the Kafka broker using our Compose file, including the SSL configuration. The application also starts with its SSL configuration and connects to the broker over an encrypted and authenticated connection. As this is an asynchronous sequence of events, we’ve used Awaitlity to poll for the expected message in the consumer message store. This verifies all the configuration and the successful two-way authentication between the broker and the client.

当我们运行测试时,Testcontainers使用我们的Compose文件启动Kafka代理,包括SSL配置。应用程序也以其SSL配置启动,并通过加密和认证的连接连接到代理。由于这是一个异步的事件序列,我们使用Awaitlity来轮询消费者消息存储中的预期消息。这验证了所有的配置以及经纪人和客户之间成功的双向认证。

6. Conclusion

6.结语

In this article, we’ve covered the basics of the SSL authentication setup required between the Kafka broker and a Spring Boot client.

在这篇文章中,我们已经介绍了Kafka代理和Spring Boot客户端之间所需的SSL认证设置的基本情况。

Initially, we looked at the broker setup required to enable two-way authentication. Then, we looked at the configuration required on the client-side in order to connect to the broker over an encrypted and authenticated connection. Finally, we used an integration test to verify the secure connection between the broker and the client.

最初,我们看了启用双向认证所需的代理设置。然后,我们研究了在客户端所需的配置,以便通过加密和认证的连接来连接到经纪人。最后,我们用一个集成测试来验证经纪人和客户之间的安全连接。

As always, the full source code is available over on GitHub.

一如既往,完整的源代码可在GitHub上获得