Introduction to Debezium

Last modified: April 26, 2021

1. Introduction

Today’s applications often need more than one data store: a replica database, a search index for search operations, a cache store to speed up data reads, and a data warehouse for complex analytics.

Supporting these different data models and data access patterns is a common problem that most web developers need to solve, and that’s where Change Data Capture (CDC) comes to the rescue!

In this article, we’ll start with a brief overview of CDC, and we’ll focus on Debezium, a platform commonly used for CDC.

2. What Is CDC?

In this section, we’ll see what CDC is, the key benefits of using it, and some common use cases.

2.1. Change Data Capture

Change Data Capture (CDC) is a technique and a design pattern. We often use it to replicate data between databases in real-time.

We can also track data changes written to a source database and automatically sync target databases. CDC enables incremental loading and eliminates the need for bulk-load updates.
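
As a rough illustration of incremental loading, the sketch below replays a short list of change events against an in-memory map that stands in for a target table. The IncrementalSync class, the ChangeEvent record, and the Op enum are hypothetical names used only for this example; they aren’t part of any CDC library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IncrementalSync {

    // Hypothetical operation types carried by a change event
    enum Op { CREATE, UPDATE, DELETE }

    // Hypothetical change event: operation, row key, and new value (unused for deletes)
    record ChangeEvent(Op op, int id, String email) {}

    // Applies a single event to the target; only the affected row is touched
    static void apply(Map<Integer, String> target, ChangeEvent event) {
        switch (event.op()) {
            case CREATE, UPDATE -> target.put(event.id(), event.email());
            case DELETE -> target.remove(event.id());
        }
    }

    // Replays the events in order against an empty target and returns the result
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> target = new LinkedHashMap<>();
        events.forEach(event -> apply(target, event));
        return target;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = List.of(
            new ChangeEvent(Op.CREATE, 1, "jd@example.com"),
            new ChangeEvent(Op.UPDATE, 1, "john.doe@example.com"),
            new ChangeEvent(Op.CREATE, 2, "jane@example.com"),
            new ChangeEvent(Op.DELETE, 2, null));
        System.out.println(replay(events)); // prints {1=john.doe@example.com}
    }
}
```

The target never re-reads the whole source table; it only processes the four events, which is the point of incremental loading.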

2.2. Advantages of CDC

Many companies still use batch processing to sync data between their systems. With batch processing:

  • Data is not synced immediately
  • More allocated resources are used for syncing databases
  • Data replication only happens during specified batch periods

However, change data capture offers some advantages. It:

  • Constantly tracks changes in the source database
  • Updates the target database in near real time
  • Uses stream processing to propagate changes as they occur

With CDC, the different databases are continuously synced, and bulk selects are no longer needed. Moreover, the cost of transferring data is reduced because CDC transfers only incremental changes.

2.3. Common CDC Use Cases

CDC can help us with various use cases, such as replicating data by keeping different data sources in sync, updating or invalidating a cache, updating search indexes, and synchronizing data between microservices.
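
To make the cache use case a little more concrete, here is a minimal sketch in which a change event for a row simply evicts that row from an in-memory cache. The CacheInvalidation class and its onRowChanged method are hypothetical; in a real setup, the change-event handler would call something like it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheInvalidation {

    // In-memory cache keyed by the row's primary key
    private final Map<Integer, String> cache = new ConcurrentHashMap<>();

    void put(int id, String value) {
        cache.put(id, value);
    }

    boolean contains(int id) {
        return cache.containsKey(id);
    }

    // Called for every change event on the source table: drop the stale entry
    void onRowChanged(int id) {
        cache.remove(id);
    }

    public static void main(String[] args) {
        CacheInvalidation cache = new CacheInvalidation();
        cache.put(1, "John Doe");
        cache.onRowChanged(1);                 // row 1 was updated in the source database
        System.out.println(cache.contains(1)); // prints false
    }
}
```

The next read for that key misses the cache and reloads fresh data from the database.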

Now that we know a little about what CDC can do, let’s see how it’s implemented in one of the well-known open-source tools.

3. Debezium Platform

In this section, we’ll introduce Debezium, discover its architecture in detail, and see the different ways of deploying it.

3.1. What Is Debezium?

Debezium is an open-source platform for CDC built on top of Apache Kafka. Its primary use is to capture all row-level changes committed to each source database table by reading the database’s transaction log. Each application listening to these events can perform the needed actions based on the incremental data changes.

Debezium provides a library of connectors, supporting multiple databases like MySQL, MongoDB, PostgreSQL, and others.

These connectors can monitor and record the database changes and publish them to a streaming service like Kafka.

Moreover, Debezium keeps capturing changes even while our applications are down. Upon restart, an application resumes consuming events from where it left off, so it misses nothing.

3.2. Debezium Architecture

How we deploy Debezium depends on the infrastructure we have, but most commonly, we use Apache Kafka Connect.

Kafka Connect is a framework that operates as a separate service alongside the Kafka broker. We use it for streaming data between Apache Kafka and other systems.

We can also define connectors to transfer data into and out of Kafka.

The diagram below shows the different parts of a change data capture pipeline based on Debezium:

[Figure: Debezium Platform Architecture]

First, on the left, we have a MySQL source database whose data we want to copy and use in a target database like PostgreSQL or any analytics database.

Second, the Kafka Connect connector parses and interprets the transaction log and writes the resulting change events to a Kafka topic.

Next, Kafka acts as a message broker to reliably transfer the changeset to the target systems.

Then, on the right, we have Kafka Connect sink connectors that poll Kafka and push the changes to the target databases.
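
The flow described above can be sketched with plain Java collections. This is only an analogy: the queue stands in for a Kafka topic, and publish and drain are hypothetical stand-ins for the source and sink connectors, not Kafka Connect APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class PipelineSketch {

    // Stand-in for the source connector: turns transaction log entries
    // into change events and writes them to the topic
    static void publish(List<String> transactionLog, Queue<String> topic) {
        for (String entry : transactionLog) {
            topic.add("change:" + entry);
        }
    }

    // Stand-in for a sink connector: polls the topic until it's empty
    // and pushes each event to the target
    static List<String> drain(Queue<String> topic) {
        List<String> target = new ArrayList<>();
        String event;
        while ((event = topic.poll()) != null) {
            target.add(event);
        }
        return target;
    }

    public static void main(String[] args) {
        Queue<String> topic = new ArrayDeque<>();
        publish(List.of("insert id=1", "update id=1"), topic);
        System.out.println(drain(topic)); // prints [change:insert id=1, change:update id=1]
    }
}
```

Because the source and the sink only share the queue, either side can be replaced or scaled without the other knowing, which is the role Kafka plays in the real pipeline.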

Debezium utilizes Kafka in its architecture, but it also offers other deployment methods to satisfy our infrastructure needs.

We can run it as a standalone service using Debezium Server, or we can embed it into our application code as a library.

We’ll see those methods in the following sections.

3.3. Debezium Server

Debezium provides a standalone server to capture the source database changes. It’s configured to use one of the Debezium source connectors.

The server then sends the captured change events to various messaging infrastructures like Amazon Kinesis or Google Cloud Pub/Sub.

3.4. Embedded Debezium

Kafka Connect offers fault tolerance and scalability when used to deploy Debezium. However, sometimes our applications don’t need that level of reliability, and we want to minimize the cost of our infrastructure.

Thankfully, we can do this by embedding the Debezium engine within our application. After doing this, we must configure the connectors.

4. Setup

In this section, we’ll start with the architecture of our application. Then, we’ll see how to set up our environment and follow some basic steps to integrate Debezium.

Let’s start by introducing our application.

4.1. Sample Application’s Architecture

To keep our application simple, we’ll create a Spring Boot application for customer management.

Our customer model has id, fullname, and email fields. For the data access layer, we’ll use Spring Data JPA.

Above all, our application will run the embedded version of Debezium. Let’s visualize this application architecture:

[Figure: Spring Boot Debezium Embedded Integration]

First, the Debezium Engine will read the transaction log of a source MySQL database (from another system or application) to track changes to a customer table.

Second, whenever we perform a database operation like Insert/Update/Delete on the customer table, the Debezium connector will call a service method.

Finally, based on these events, that method syncs the customer table’s data to a target MySQL database (our application’s primary database).
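
The article doesn’t list the service method itself, so the following is only a sketch of what such a sync method might do. It assumes the event payload arrives as a map of column names to values, and it uses an in-memory map where the real application would call a Spring Data JPA repository. CustomerReplicator, its Operation enum, and the Customer record are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class CustomerReplicator {

    // Hypothetical stand-in for the operation type of a change event
    enum Operation { CREATE, UPDATE, DELETE }

    record Customer(long id, String fullname, String email) {}

    // In-memory stand-in for the target database
    private final Map<Long, Customer> target = new HashMap<>();

    // Applies one change to the target: deletes remove the row,
    // creates and updates write the latest row image
    void replicateData(Map<String, Object> payload, Operation operation) {
        long id = ((Number) payload.get("id")).longValue();
        if (operation == Operation.DELETE) {
            target.remove(id);
        } else {
            target.put(id, new Customer(id,
                (String) payload.get("fullname"),
                (String) payload.get("email")));
        }
    }

    Customer find(long id) {
        return target.get(id);
    }

    public static void main(String[] args) {
        CustomerReplicator replicator = new CustomerReplicator();
        Map<String, Object> row = Map.of("id", 1, "fullname", "John Doe", "email", "jd@example.com");
        replicator.replicateData(row, Operation.CREATE);
        System.out.println(replicator.find(1));
    }
}
```

Treating creates and updates the same way (write the latest row image) keeps the method idempotent if an event is delivered more than once.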

4.2. Maven Dependencies

Let’s get started by first adding the required dependencies to our pom.xml:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.4.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.4.2.Final</version>
</dependency>

Likewise, we add dependencies for each of the Debezium connectors that our application will use.

In our case, we’ll use the MySQL connector:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.4.2.Final</version>
</dependency>

4.3. Installing Databases

We can install and configure our databases manually. However, to speed things up, we’ll use a docker-compose file:

version: "3.9"
services:
  # Install Source MySQL DB and setup the Customer database
  mysql-1:
    container_name: source-database
    image: mysql
    ports:
      - "3305:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

  # Install Target MySQL DB and setup the Customer database
  mysql-2:
    container_name: target-database
    image: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

This file will run two database instances on different ports.

We can run this file using the command docker-compose up -d.

Now, let’s create the customer table by running a SQL script:

CREATE TABLE customer
(
    id integer NOT NULL,
    fullname character varying(255),
    email character varying(255),
    CONSTRAINT customer_pkey PRIMARY KEY (id)
);

5. Configuration

In this section, we’ll configure the Debezium MySQL Connector and see how to run the Embedded Debezium Engine.

5.1. Configuring the Debezium Connector

To configure our Debezium MySQL Connector, we’ll create a Debezium configuration bean:

@Bean
public io.debezium.config.Configuration customerConnector() {
    return io.debezium.config.Configuration.create()
        .with("name", "customer-mysql-connector")
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", "/tmp/offsets.dat")
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", customerDbHost)
        .with("database.port", customerDbPort)
        .with("database.user", customerDbUsername)
        .with("database.password", customerDbPassword)
        .with("database.dbname", customerDbName)
        .with("database.include.list", customerDbName)
        .with("include.schema.changes", "false")
        .with("database.server.id", "10181")
        .with("database.server.name", "customer-mysql-db-server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", "/tmp/dbhistory.dat")
        .build();
}

Let’s examine this configuration in more detail.

The create method within this bean returns a builder that produces a Configuration object, which we’ll later convert to Properties when creating the engine.

This builder sets several properties required by the engine regardless of the connector we choose. To track the source MySQL database, we use the class MySqlConnector.
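
To separate the engine-level settings from the connector-specific ones, the sketch below collects the first five keys from the bean above into a plain java.util.Properties object and reports any that are missing. The EngineProperties class and the ENGINE_KEYS list are hypothetical helpers; the list reflects only the keys used in this article and isn’t an exhaustive or authoritative set.

```java
import java.util.List;
import java.util.Properties;

public class EngineProperties {

    // Engine-level keys used in this article's configuration (not an exhaustive list)
    static final List<String> ENGINE_KEYS = List.of(
        "name",
        "connector.class",
        "offset.storage",
        "offset.storage.file.filename",
        "offset.flush.interval.ms");

    // The same engine-level values the configuration bean sets
    static Properties engineDefaults() {
        Properties props = new Properties();
        props.setProperty("name", "customer-mysql-connector");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "60000");
        return props;
    }

    // Returns the engine-level keys that are absent from the given properties
    static List<String> missingEngineKeys(Properties props) {
        return ENGINE_KEYS.stream()
            .filter(key -> props.getProperty(key) == null)
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(missingEngineKeys(engineDefaults())); // prints []
    }
}
```

A check like this can run at startup so that a misconfigured environment fails fast instead of when the engine starts.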

When this connector runs, it starts tracking changes from the source and records “offsets” to determine how much data it has processed from the transaction log.

There are several ways to save these offsets, but in this example, we’ll use the class FileOffsetBackingStore to store them on our local filesystem.
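
To show why persisted offsets matter, here is a simplified sketch of file-backed offset tracking. It doesn’t reproduce the file format or behavior of FileOffsetBackingStore; the FileOffsetSketch class, the "position" key, and the list standing in for the transaction log are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

public class FileOffsetSketch {

    // Reads the last saved position, or 0 when no offset file exists yet
    static int loadOffset(Path file) {
        if (!Files.exists(file)) {
            return 0;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Integer.parseInt(props.getProperty("position", "0"));
    }

    // Persists the position so that a restarted process can pick it up
    static void saveOffset(Path file, int position) {
        Properties props = new Properties();
        props.setProperty("position", Integer.toString(position));
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Processes at most max log entries after the saved offset, then records the new offset
    static List<String> consume(List<String> log, Path file, int max) {
        int start = loadOffset(file);
        int end = Math.min(log.size(), start + max);
        List<String> batch = log.subList(start, end);
        saveOffset(file, end);
        return batch;
    }
}
```

Calling consume twice with the same file returns consecutive batches, even if the process restarts between the calls, because the position is read back from disk rather than kept in memory.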

The last few parameters of the connector are the MySQL database properties.

Now that we have a configuration, we can create our engine.

5.2. Running the Debezium Engine

The DebeziumEngine serves as a wrapper around our MySQL connector. Let’s create the engine using the connector configuration:

private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
private final CustomerService customerService;

public DebeziumListener(Configuration customerConnectorConfiguration, CustomerService customerService) {

    // Build the engine from the connector configuration and register our event handler
    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
      .using(customerConnectorConfiguration.asProperties())
      .notifying(this::handleChangeEvent)
      .build();

    this.customerService = customerService;
}

Moreover, the engine will call a method for every data change. In our example, that’s handleChangeEvent.

In this method, first, we’ll parse every event based on the format specified when calling create().

Then, we determine which operation occurred and invoke the CustomerService to perform Create/Update/Delete functions on our target database:

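// Assumed imports (not shown in the original listing): static members of
// io.debezium.data.Envelope.FieldName (OPERATION, BEFORE, AFTER), io.debezium.data.Envelope.Operation,
// a static import of java.util.stream.Collectors.toMap, and Pair from Apache Commons Lang
private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
    SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
    Struct sourceRecordChangeValue = (Struct) sourceRecord.value();

    if (sourceRecordChangeValue != null) {
        Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

        // Skip snapshot reads; only replicate creates, updates, and deletes
        if (operation != Operation.READ) {
            // A delete carries the row only in "before"; other operations carry it in "after"
            String record = operation == Operation.DELETE ? BEFORE : AFTER;
            Struct struct = (Struct) sourceRecordChangeValue.get(record);

            // Copy the non-null columns into a map of column name to value
            Map<String, Object> payload = struct.schema().fields().stream()
              .map(Field::name)
              .filter(fieldName -> struct.get(fieldName) != null)
              .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
              .collect(toMap(Pair::getKey, Pair::getValue));

            this.customerService.replicateData(payload, operation);
        }
    }
}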
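
The two decisions the handler makes (mapping the one-letter op code to an operation, then choosing the before or after row image) can be tried out without Debezium on the classpath. The sketch below uses a plain map in place of the Struct envelope, and its Operation enum is a simplified stand-in for Debezium’s own type; the codes c, u, and d match the log output later in this article, while r for snapshot reads is stated from memory of Debezium’s event format.

```java
import java.util.Map;

public class EnvelopeSketch {

    // Simplified stand-in for Debezium's operation type, keyed by the "op" code
    enum Operation {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        private final String code;

        Operation(String code) {
            this.code = code;
        }

        // Unlike a real library, this stand-in throws on unknown codes
        static Operation forCode(String code) {
            for (Operation op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown op code: " + code);
        }
    }

    // Picks the row image: "before" for a delete, "after" otherwise
    @SuppressWarnings("unchecked")
    static Map<String, Object> rowImage(Map<String, Object> envelope) {
        Operation op = Operation.forCode((String) envelope.get("op"));
        String field = op == Operation.DELETE ? "before" : "after";
        return (Map<String, Object>) envelope.get(field);
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 1, "fullname", "John Doe");
        Map<String, Object> deleteEvent = Map.of("op", "d", "before", row);
        System.out.println(rowImage(deleteEvent).get("id")); // prints 1
    }
}
```

Reading the before image for deletes matters because a delete event has no after state, so the primary key of the removed row is only available there.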

Now that we have configured a DebeziumEngine object, let’s start it asynchronously using an executor:

private final Executor executor = Executors.newSingleThreadExecutor();

@PostConstruct
private void start() {
    this.executor.execute(debeziumEngine);
}

@PreDestroy
private void stop() throws IOException {
    if (this.debeziumEngine != null) {
        this.debeziumEngine.close();
    }
}
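
The same start/stop pattern can be exercised in isolation. In the sketch below, FakeEngine is a hypothetical stand-in that, like the engine above, is both a Runnable and a Closeable. The example also shuts the executor down and waits for its thread to finish, which the listing above doesn’t do; that extra step is a suggestion rather than something the original code requires.

```java
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EngineLifecycle {

    // Hypothetical stand-in for an engine: run() blocks until close() is called
    static class FakeEngine implements Runnable, Closeable {
        private final CountDownLatch stopSignal = new CountDownLatch(1);
        private volatile boolean finished = false;

        @Override
        public void run() {
            try {
                stopSignal.await(); // pretend to stream events until asked to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished = true;
            }
        }

        @Override
        public void close() {
            stopSignal.countDown();
        }

        boolean isFinished() {
            return finished;
        }
    }

    // Starts the engine on its own thread, stops it, and waits for the thread to exit
    static boolean runAndStop(FakeEngine engine) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        engine.close();
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS) && engine.isFinished();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndStop(new FakeEngine())); // prints true
    }
}
```

Running the engine on a dedicated thread keeps the blocking run() call away from the application’s startup thread, and closing it on shutdown lets the worker thread exit cleanly.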

6. Debezium in Action

To see our code in action, let’s make some data changes on the source database’s customer table.

6.1. Inserting a Record

To add a new record to the customer table, we’ll go to the MySQL shell and run:

INSERT INTO customerdb.customer (id, fullname, email) VALUES (1, 'John Doe', 'jd@example.com');

After running this query, we’ll see the corresponding output from our application:

23:57:57.897 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=John Doe,email=jd@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746277000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=703,row=0,thread=19},op=c,ts_ms=1617746277422}'
Hibernate: insert into customer (email, fullname, id) values (?, ?, ?)
23:57:58.095 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=jd@example.com} with Operation: CREATE

Finally, we check that a new record was inserted into our target database:

id  fullname  email
1   John Doe  jd@example.com

6.2. Updating a Record

Now, let’s try to update our last inserted customer and check what happens:

UPDATE customerdb.customer t SET t.email = 'john.doe@example.com' WHERE t.id = 1;

After that, we’ll get the same kind of output as with the insert, except that the operation type changes to ‘UPDATE’ and, of course, Hibernate now issues an ‘update’ query:

00:08:57.893 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,email=jd@example.com},after=Struct{id=1,fullname=John Doe,email=john.doe@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617746937000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1040,row=0,thread=19},op=u,ts_ms=1617746937703}'
Hibernate: update customer set email=?, fullname=? where id=?
00:08:57.938 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=john.doe@example.com} with Operation: UPDATE

We can verify that John’s email has been changed in our target database:

id  fullname  email
1   John Doe  john.doe@example.com

6.3. Deleting a Record

Now, we can delete an entry in the customer table by executing:

DELETE FROM customerdb.customer WHERE id = 1;

Likewise, the operation type and the query change again, this time to a delete:

00:12:16.892 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Key = 'Struct{id=1}' value = 'Struct{before=Struct{id=1,fullname=John Doe,email=john.doe@example.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1617747136000,db=customerdb,table=customer,server_id=1,file=binlog.000007,pos=1406,row=0,thread=19},op=d,ts_ms=1617747136640}'
Hibernate: delete from customer where id=?
00:12:16.951 [pool-1-thread-1] INFO  c.b.l.d.listener.DebeziumListener - Updated Data: {fullname=John Doe, id=1, email=john.doe@example.com} with Operation: DELETE

We can verify that the data has been deleted on our target database:

select * from customerdb.customer where id = 1;
0 rows retrieved

7. Conclusion

In this article, we saw the benefits of CDC and what problems it can solve. We also learned that, without it, we’re left with bulk loading of the data, which is both time-consuming and costly.

We also saw Debezium, an excellent open-source platform that can help us solve CDC use cases with ease.

As always, the full source code of the article is available on GitHub.