Kafka Streams With Spring Boot

Last modified: December 11, 2021

1. Introduction

In this article, we’ll see how to set up Kafka Streams using Spring Boot. Kafka Streams is a client-side library built on top of Apache Kafka. It enables the processing of an unbounded stream of events in a declarative manner.

Some real-life examples of streaming data could be sensor data, stock market event streams, and system logs. For this tutorial, we’ll build a simple word-count streaming application. Let’s start with an overview of Kafka Streams and then set up the example along with its tests in Spring Boot.

2. Overview

Kafka Streams provides a duality between Kafka topics and relational database tables. It enables us to do operations like joins, grouping, aggregation, and filtering of one or more streaming events.

An important concept of Kafka Streams is that of processor topology. Processor topology is the blueprint of Kafka Stream operations on one or more event streams. Essentially, the processor topology can be considered as a directed acyclic graph. In this graph, nodes are categorized into source, processor, and sink nodes, whereas the edges represent the flow of the stream events.

The source at the top of the topology receives streaming data from Kafka, passes it down to the processor nodes where custom operations are performed, and flows out through the sink nodes to a new Kafka topic. Alongside the core processing, the state of the stream is saved periodically using checkpoints for fault tolerance and resilience.
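
To make the graph structure concrete, here is a minimal plain-Java sketch of a source, processor, and sink chain. It deliberately does not use the Kafka Streams API: the Node class, its to() method, and the uppercase step are hypothetical stand-ins chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class TopologySketch {

    // Hypothetical node type: applies its logic to an event and forwards
    // each result to its children (the edges of the graph).
    static class Node {
        final String name;
        final Function<String, List<String>> logic;
        final List<Node> children = new ArrayList<>();
        final List<String> received = new ArrayList<>();

        Node(String name, Function<String, List<String>> logic) {
            this.name = name;
            this.logic = logic;
        }

        // Adds an edge to a downstream node and returns it so calls can be chained.
        Node to(Node child) {
            children.add(child);
            return child;
        }

        void process(String event) {
            received.add(event);
            for (String result : logic.apply(event)) {
                for (Node child : children) {
                    child.process(result);
                }
            }
        }
    }

    // Wires source -> processor -> sink and returns what the sink received.
    static List<String> run(List<String> input) {
        Node source = new Node("source", event -> List.of(event));
        Node processor = new Node("processor", event -> List.of(event.toUpperCase()));
        Node sink = new Node("sink", event -> List.of());

        source.to(processor).to(sink);
        input.forEach(source::process);
        return sink.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello", "world"))); // prints [HELLO, WORLD]
    }
}
```

In Kafka Streams itself, the source and sink would be backed by Kafka topics rather than in-memory lists.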

3. Dependencies

We’ll start by adding the spring-kafka and kafka-streams dependencies to our POM:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.7.1</version>
</dependency> 

4. Example

Our sample application reads streaming events from an input Kafka topic. Once the records are read, it splits the text and counts the individual words. Subsequently, it sends the updated word counts to an output Kafka topic. In addition to the output topic, we’ll also create a simple REST service to expose these counts over an HTTP endpoint.

Overall, the output topic will be continuously updated with the words extracted from the input events and their updated counts.

4.1. Configuration

First, let’s define the Kafka stream configuration in a Java config class:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }

    // other config
}

Here, we’ve used the @EnableKafkaStreams annotation to autoconfigure the required components. This autoconfiguration requires a KafkaStreamsConfiguration bean with the name specified by DEFAULT_STREAMS_CONFIG_BEAN_NAME. As a result, Spring Boot uses this configuration and creates a KafkaStreams client to manage our application lifecycle.

In our example, we’ve provided the application id, bootstrap server connection details, and SerDes (Serializer/Deserializer) for our configuration.
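
The @Value placeholder in the config class resolves spring.kafka.bootstrap-servers from the application's configuration. A minimal sketch of the matching entry follows, assuming a standard application.properties file and a broker listening on localhost:9092; both are assumptions rather than details given in this article.

```properties
# Assumed local broker address; replace with the actual bootstrap servers
spring.kafka.bootstrap-servers=localhost:9092
```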

4.2. Topology

Now that we’ve set up the configuration, let’s build the topology for our application to keep a count of the words from input messages:

@Component
public class WordCountProcessor {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        KTable<String, Long> wordCounts = messageStream
          .mapValues((ValueMapper<String, String>) String::toLowerCase)
          .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
          .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
          .count();

        wordCounts.toStream().to("output-topic");
    }
}

Here, we’ve defined a configuration method and annotated it with @Autowired. Spring processes this annotation and wires a matching bean from the container into the StreamsBuilder argument. Alternatively, we can create a bean in the configuration class to generate the topology.

StreamsBuilder gives us access to all of the Kafka Streams APIs, and it becomes like a regular Kafka Streams application. In our example, we’ve used this high-level DSL to define the transformations for our application:

  • Create a KStream from the input topic using the specified key and value SerDes.
  • Create a KTable by transforming, splitting, grouping, and then counting the data.
  • Materialize the result to an output stream.
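
To see what these steps compute, here is a plain-Java sketch that mirrors the lowercase, split, group, and count logic on an in-memory list. It is not Kafka Streams code: the WordCountSketch class and the "word:count" string format are hypothetical, and the sketch assumes every count update is emitted, which is how the TopologyTestDriver behaves. On a real broker, record caching may collapse some intermediate updates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    // Mirrors mapValues -> flatMapValues -> groupBy -> count on an in-memory list.
    // Returns every count update as "word:count", in the order it is produced.
    static List<String> countUpdates(List<String> messages) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String message : messages) {
            // Same transformation as the topology: lowercase, then split on non-word characters
            for (String word : message.toLowerCase().split("\\W+")) {
                long updated = counts.merge(word, 1L, Long::sum);
                updates.add(word + ":" + updated);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        // prints [hello:1, world:1, hello:2]
        System.out.println(countUpdates(List.of("hello world", "hello")));
    }
}
```

Each update carries the latest count for its word, so a downstream reader of the output topic always sees the most recent value per key.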

In essence, Spring Boot provides a very thin wrapper around the Streams API while managing the lifecycle of our KafkaStreams instance. It creates and configures the required components for the topology and executes our Streams application. Importantly, this lets us focus on our core business logic.

4.3. REST Service

After defining our pipeline with the declarative steps, let’s create the REST controller. It provides endpoints to POST messages to the input topic and to GET the count for a specified word. Importantly, the application retrieves the counts from the Kafka Streams state store rather than from the output topic.

First, let’s modify the KTable from earlier and materialize the aggregated counts as a local state store. This can then be queried from the REST controller:

KTable<String, Long> wordCounts = messageStream
  .mapValues((ValueMapper<String, String>) String::toLowerCase)
  .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
  .groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
  .count(Materialized.as("counts"));

After this, we can update our controller to retrieve the count value from this counts state store:

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
      StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
    );
    return counts.get(word);
}

Here, the factoryBean is an instance of StreamsBuilderFactoryBean that is wired into the controller. This provides the KafkaStreams instance managed by this factory bean. Therefore, we can obtain the counts key/value state store that we created earlier, represented by KTable. At this point, we can use this to get the current count for the requested word from the local state store.
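
One detail worth keeping in mind is that a key/value store lookup returns null for a word that has never been counted, and that the topology lowercases words before counting them. The sketch below shows one possible way to handle both cases; it uses a plain Map as a stand-in for ReadOnlyKeyValueStore, and the CountLookupSketch class and countOrZero helper are hypothetical names, not part of the article's code.

```java
import java.util.Map;

public class CountLookupSketch {

    // The Map stands in for the "counts" state store; like the store,
    // it returns null when the key is absent.
    static long countOrZero(Map<String, Long> store, String word) {
        // The topology lowercases words, so normalize the lookup key the same way
        Long count = store.get(word.toLowerCase());
        return count == null ? 0L : count;
    }

    public static void main(String[] args) {
        Map<String, Long> store = Map.of("hello", 2L, "world", 1L);
        System.out.println(countOrZero(store, "Hello")); // prints 2
        System.out.println(countOrZero(store, "kafka")); // prints 0
    }
}
```

Whether a missing word should map to 0 or to an HTTP 404 is a design choice for the endpoint; this sketch simply picks 0.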

5. Testing

Testing is a crucial part of developing and verifying our application topology. The Spring Kafka test library and Testcontainers both provide excellent support for testing our application at various levels.

5.1. Unit Testing

First, let’s set up a unit test for our topology using the TopologyTestDriver. This is the main test tool for testing a Kafka Streams application:

@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    wordCountProcessor.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
        TestInputTopic<String, String> inputTopic = topologyTestDriver
          .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        
        TestOutputTopic<String, Long> outputTopic = topologyTestDriver
          .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

        inputTopic.pipeInput("key", "hello world");
        inputTopic.pipeInput("key2", "hello");

        assertThat(outputTopic.readKeyValuesToList())
          .containsExactly(
            KeyValue.pair("hello", 1L),
            KeyValue.pair("world", 1L),
            KeyValue.pair("hello", 2L)
          );
    }
}

Here, the first thing we need is the Topology that encapsulates the business logic under test from the WordCountProcessor. We can then pass it to the TopologyTestDriver and create the input and output topics for our test. Crucially, this lets us verify the pipeline behavior quickly and easily without a running Kafka broker.

5.2. Integration Testing

Finally, let’s use the Testcontainers framework to test our application end-to-end. This uses a running Kafka broker and starts up our application for a complete test:

@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    // other test setup

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");

        startOutputTopicConsumer();

        // assert correct counts on output topic
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // assert correct count from REST service
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
    }
}

Here, we’ve sent a POST to our REST controller, which, in turn, sends the message to the Kafka input topic. As part of the setup, we’ve also started a Kafka consumer. This listens asynchronously to the output Kafka topic and updates the BlockingQueue with the received word counts.
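
The handoff between the asynchronous consumer and the test thread can be sketched in plain Java. In this sketch, a background thread stands in for the Kafka consumer; the QueueHandoffSketch class, the startFakeConsumer and take helpers, and the 50 ms delay are all hypothetical and only illustrate how a timed poll on a BlockingQueue lets the test wait for results without busy-looping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandoffSketch {

    // Hypothetical stand-in for the output-topic consumer: a background thread
    // that publishes each item to the queue after a short delay.
    static BlockingQueue<String> startFakeConsumer(String... items) {
        BlockingQueue<String> output = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                for (String item : items) {
                    Thread.sleep(50); // simulate processing latency
                    output.put(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        return output;
    }

    // Waits up to two seconds per element, the way the test thread polls the queue.
    static List<String> take(BlockingQueue<String> queue, int n) {
        List<String> result = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                String next = queue.poll(2, TimeUnit.SECONDS);
                if (next == null) {
                    break; // timed out: nothing arrived in time
                }
                result.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        BlockingQueue<String> output = startFakeConsumer("test:1", "message:1");
        System.out.println(take(output, 2)); // prints [test:1, message:1]
    }
}
```

A timed poll returns null on timeout instead of blocking forever, so a missing record fails the assertion rather than hanging the test.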

During the test execution, the application should process the input messages. We can then verify the expected output from both the output topic and the state store, the latter through the REST service.

6. Conclusion

In this tutorial, we’ve seen how to create a simple event-driven application to process messages with Kafka Streams and Spring Boot.

After a brief overview of core streaming concepts, we looked at the configuration and creation of a Streams topology. Then, we saw how to integrate this with the REST functionality provided by Spring Boot. Finally, we covered some approaches for effectively testing and verifying our topology and application behavior.

As always, the full source code is available over on GitHub.
