Introduction to ksqlDB

Last modified: August 20, 2021

1. Introduction

ksqlDB can be described as a real-time event-streaming database built on top of Apache Kafka and Kafka Streams. It combines powerful stream processing with a relational database model using SQL syntax.

In this tutorial, we’ll cover the fundamental concepts of ksqlDB and build a sample application to demonstrate a practical use case.

2. Overview

Since ksqlDB is an event streaming database, streams and tables are its core abstractions. Essentially, these are collections of data that can be transformed and processed in real-time.

Stream processing enables continuous computations over these unbounded streams of events. We can transform, filter, aggregate, and join the collections to derive new collections or materialized views using SQL. Furthermore, new events continuously update these collections and views to provide real-time data.

Finally, queries publish the results of the various stream processing operations. ksqlDB queries support both asynchronous real-time application flows and synchronous request/response flows, similar to a traditional database.

3. Setup

To see ksqlDB in action, we’ll build an event-driven Java application. This will aggregate and query an unbounded stream of readings from various sensor sources.

The main use case is to detect situations where the average value of the readings exceeds a specified threshold within a specific time period. Furthermore, a key requirement is that the application must provide real-time information that could, for example, be used when building a dashboard or warning system.

We’ll be using the ksqlDB Java client to interact with the server in order to create streams and tables, define aggregations, and execute various queries.

3.1. Docker

As ksqlDB runs on top of Kafka, we’ll use Docker Compose to run the Kafka components, ksqlDB server, and the ksqlDB CLI client:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    ...

  broker:
    image: confluentinc/cp-kafka:6.2.0
    hostname: broker
    ...

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.19.0
    hostname: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    healthcheck:
      test: curl -f http://ksqldb-server:8088/ || exit 1
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.19.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

Additionally, we’ll also use this docker-compose.yml file in our Java application to spin up an environment for our integration tests using the Testcontainers framework.

First, let’s bring up the stack by running:

docker-compose up

Next, once all the services have started, let’s connect to the interactive CLI. This is useful for testing and interacting with the server:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

We’ll also tell ksqlDB to start all queries from the earliest point in each topic:

ksql> SET 'auto.offset.reset' = 'earliest';

3.2. Dependencies

In this project, we’ll primarily be using the Java client to interact with ksqlDB. More specifically, we’ll be using ksqlDB for Confluent Platform (CP), so we’ll need to add the CP Maven repository to our POM file:

<repository>
    <id>confluent</id>
    <name>confluent-repo</name>
    <url>http://packages.confluent.io/maven/</url>
</repository>

Now, let’s add the dependency for the client:

<dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>6.2.0</version>
</dependency>

4. Real-Time Data Aggregation

In this section, we’ll see how to create a materialized view that represents the real-time aggregation required by our application.

4.1. Creating the Stream

In Kafka, a topic stores a collection of events. Similarly, in ksqlDB, a stream represents the events, backed by a Kafka topic.

Let’s begin by creating our stream to store the incoming sensor data:

CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)
  WITH (KAFKA_TOPIC = 'readings',
        VALUE_FORMAT = 'JSON',
        TIMESTAMP = 'timestamp',
        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',
        PARTITIONS = 1);

Here, ksqlDB creates the readings topic to store the stream data in JSON format. Since the events represent temporal data, it’s important that each reading contains a timestamp indicating the event time. The timestamp field stores this data in the specified format. This ensures that ksqlDB applies event-time semantics for time-related operations and out-of-order events.

Next, we’ll create an instance of the Client with ksqlDB server connection details and use this to execute our SQL statement:

ClientOptions options = ClientOptions.create()
  .setHost(KSQLDB_SERVER_HOST)
  .setPort(KSQLDB_SERVER_PORT);

Client client = Client.create(options);

Map<String, Object> properties = Collections.singletonMap(
  "auto.offset.reset", "earliest"
);

CompletableFuture<ExecuteStatementResult> result = 
  client.executeStatement(CREATE_READINGS_STREAM, properties);

As previously with the CLI, we set the value of the auto.offset.reset property to “earliest“. This ensures that in the absence of a Kafka offset, the query reads the relevant topic from the earliest offset.

The executeStatement method is part of the async API provided by the client. It immediately returns a CompletableFuture, before sending any requests to the server. The calling code may then decide to block and wait for completion (by invoking the get or join method) or to perform other non-blocking operations.

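For instance, if we simply want to wait until the stream exists before proceeding, a minimal sketch of blocking on the returned future might look like this (the logging calls are illustrative):

try {
    result.join();
    log.info("Readings stream created");
} catch (CompletionException e) {
    // join() wraps any server-side failure in an unchecked CompletionException
    log.error("Failed to create the readings stream", e.getCause());
}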

4.2. Creating the Materialized View

Now that we have the underlying event stream, we can derive a new alerts table from the readings stream. This persistent query (or materialized view) runs on the server indefinitely and processes events from the source stream or table.

In our case, it should raise an alert when the average reading, per sensor, exceeds a value of 25 over a 30-minute period:

CREATE TABLE alerts AS
  SELECT
    sensor_id,
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') 
      AS start_period,
    TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') 
      AS end_period,
    AVG(reading) AS average_reading
  FROM readings
  WINDOW TUMBLING (SIZE 30 MINUTES)
  GROUP BY sensor_id
  HAVING AVG(reading) > 25
  EMIT CHANGES;

In this query, we’re aggregating new incoming events in a tumbling window of 30 minutes, per sensor. We’ve also used the TIMESTAMPTOSTRING function to convert the UNIX timestamp into something more readable.

Importantly, the materialized view only updates with data when the new event successfully integrates with the aggregation function.

As previously, let’s use the client to execute this statement asynchronously and create our materialized view:

CompletableFuture<ExecuteStatementResult> result = 
  client.executeStatement(CREATE_ALERTS_TABLE, properties);

Once created, such views update in an incremental manner. This is the key to efficient and highly performant queries for real-time updates.

4.3. Inserting Sample Data

Before we can run queries, let’s produce some sample events that represent various readings at 10-minute intervals.

Let’s provide key/value mappings for the stream columns using KsqlObject:

List<KsqlObject> rows = Arrays.asList(
  new KsqlObject().put("sensor_id", "sensor-1")
    .put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
  new KsqlObject().put("sensor_id", "sensor-1")
    .put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
  new KsqlObject().put("sensor_id", "sensor-2")
    .put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
  
  // additional rows
);

CompletableFuture<Void> result = CompletableFuture.allOf(
  rows.stream()
    .map(row -> client.insertInto(READINGS_TABLE, row))
    .toArray(CompletableFuture[]::new)
);

Here, we combine all the individual insert operations into a single Future for convenience. This completes upon the successful completion of all underlying CompletableFuture instances.

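Alternatively, instead of blocking, we might attach a completion callback to the combined future. A small sketch:

result.whenComplete((none, ex) -> {
    if (ex != null) {
        log.error("Inserting sample data failed", ex);
    } else {
        log.info("All sample readings inserted");
    }
});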

5. Querying the Data

Queries allow the callers to bring the materialized view data into the application. These can be classified into two types.

5.1. Push Query

This type of query pushes a continuous stream of updates to the client. These queries are particularly suitable for asynchronous application flows as they enable the clients to react to new information in real-time.

However, unlike persistent queries, the server does not store the results of such queries in a Kafka topic. Therefore, we should keep these queries as simple as possible while moving all the heavy lifting into persistent queries.

Let’s create a simple push query to subscribe to the results from our alerts materialized view, created earlier:

SELECT * FROM alerts EMIT CHANGES;

Here, it’s important to note the EMIT clause, which emits all changes to the client. As the query contains no limit, it will continue to stream all results until terminated.

Next, we subscribe to the results of the query in order to receive streaming data:

public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
    return client.streamQuery(ALERTS_QUERY, PROPERTIES)
      .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
      .whenComplete((result, ex) -> {
          if (ex != null) {
              log.error("Alerts push query failed", ex);
          }
      });
}

Here, we’ve invoked the streamQuery method, which returns a StreamedQueryResult for obtaining streaming data. This extends the Publisher interface from Reactive Streams. Therefore, we’re able to asynchronously consume the results by using a reactive Subscriber. In fact, the subscriber is a simple Reactive Streams implementation that receives the ksqlDB rows as JSON and converts them to Alert POJOs.

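The full RowSubscriber isn’t shown here, but a minimal version might look like the sketch below. The Jackson-based JSON conversion and the synchronized list are our assumptions; a production implementation would need more careful error and backpressure handling:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.api.client.Row;

public class RowSubscriber<T> implements Subscriber<Row> {

    public final List<T> consumedItems = Collections.synchronizedList(new ArrayList<>());

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> clazz;
    private Subscription subscription;

    public RowSubscriber(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // request the first row
    }

    @Override
    public void onNext(Row row) {
        try {
            // map the row's JSON representation onto the target POJO
            consumedItems.add(mapper.readValue(row.asObject().toJsonString(), clazz));
        } catch (Exception e) {
            // for this sketch, we simply skip rows that cannot be mapped
        }
        subscription.request(1); // ask for the next row
    }

    @Override
    public void onError(Throwable t) {
        // surface or log the failure in a real application
    }

    @Override
    public void onComplete() {
        // push queries run until terminated, so this is rarely reached
    }
}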

We can now test this using our Compose file and the DockerComposeContainer from Testcontainers:

@Testcontainers
class KsqlDBApplicationLiveTest {

    @Container
    public static DockerComposeContainer dockerComposeContainer =
      new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
        .withServices("zookeeper", "broker", "ksqldb-server")
        .withExposedService("ksqldb-server", 8088,
          Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
        .withLocalCompose(true);

    // setup and teardown

    @Test
    void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
        createAlertsMaterializedView();
        
        // Reactive Streams Subscriber impl for receiving streaming data
        RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);

        ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
        insertSampleData();

        await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
          assertThat(alertSubscriber.consumedItems)
            .containsOnly(
              expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
              expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
            )
        );
    }
}

Here, we’ve spun up a complete ksqlDB environment for the integration tests. The test inserts sample rows into the stream and ksqlDB performs the windowed aggregation. Finally, we assert that our subscriber consumes the latest alerts, as expected.

5.2. Pull Query

In contrast to push queries, pull queries retrieve data that does not update dynamically, much like a traditional RDBMS. Such queries return immediately with a finite result set. Hence, pull queries are well suited to synchronous request/response application flows.

As a simple example, let’s create a query to retrieve all the alerts triggered for a particular sensor id:

String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";

List<Row> rows = client.executeQuery(pullQuery, PROPERTIES).get();

In contrast to the push query, this query returns all the available data from the materialized view at execution time. This is useful for querying the current state of the materialized view.

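We can then read individual columns from the returned rows. Here’s a small sketch (the uppercase column names reflect ksqlDB’s default identifier casing, which is an assumption on our part):

for (Row row : rows) {
    System.out.printf("Sensor %s averaged %.1f between %s and %s%n",
      row.getString("SENSOR_ID"),
      row.getDouble("AVERAGE_READING"),
      row.getString("START_PERIOD"),
      row.getString("END_PERIOD"));
}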

5.3. Miscellaneous Operations

The API docs for the client provide further information on other operations such as describing sources; listing streams, tables, topics; terminating queries, and more.

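For instance, a quick sketch of asynchronously listing the streams known to the server:

client.listStreams().thenAccept(streams ->
  streams.forEach(stream -> System.out.println(stream.getName()))
);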

6. Conclusion

In this article, we covered the core concepts of streams, tables, and queries that support ksqlDB as an efficient event-streaming database.

Along the way, we built a simple, reactive application using concise and composable SQL constructs. We also saw how to use the Java client to create streams and tables, issue queries against materialized views, and retrieve real-time data.

As always, the full source code is available over on GitHub.
