Introduction to Kafka Connectors

Last modified: November 17, 2018

1. Overview

Apache Kafka® is a distributed streaming platform. In a previous tutorial, we discussed how to implement Kafka consumers and producers using Spring.

In this tutorial, we’ll learn how to use Kafka Connectors.

We’ll have a look at:

  • Different types of Kafka Connectors
  • Features and modes of Kafka Connect
  • Connector configuration using property files as well as the REST API

2. Basics of Kafka Connect and Kafka Connectors

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems, using so-called Connectors.

Kafka Connectors are ready-to-use components that help us import data from external systems into Kafka topics and export data from Kafka topics into external systems. We can use existing connector implementations for common data sources and sinks or implement our own connectors.

A source connector collects data from a system. Source systems can be entire databases, stream tables, or message brokers. A source connector could also collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.

A sink connector delivers data from Kafka topics into other systems, which might be indexes such as Elasticsearch, batch systems such as Hadoop, or any kind of database.

Some connectors are maintained by the community, while others are supported by Confluent or its partners. In fact, we can find connectors for most popular systems, such as S3, JDBC, and Cassandra, to name just a few.

3. Features

Kafka Connect features include:

  • A framework for connecting external systems with Kafka – it simplifies the development, deployment, and management of connectors
  • Distributed and standalone modes – distributed mode helps us deploy large clusters by leveraging the distributed nature of Kafka, while standalone mode suits development, testing, and small production deployments
  • REST interface – we can manage connectors using a REST API
  • Automatic offset management – Kafka Connect handles the offset commit process for us, which saves us from implementing this error-prone part of connector development manually
  • Distributed and scalable by default – Kafka Connect uses the existing group management protocol; we can add more workers to scale up a Kafka Connect cluster
  • Streaming and batch integration – combined with Kafka's existing capabilities, Kafka Connect is an ideal solution for bridging streaming and batch data systems
  • Transformations – these enable us to make simple and lightweight modifications to individual messages

4. Setup

Instead of using the plain Kafka distribution, we'll download Confluent Platform, a Kafka distribution provided by Confluent, Inc., the company founded by Kafka's original creators. Compared to plain Kafka, Confluent Platform comes with some additional tools and clients, as well as some additional pre-built connectors.

For our purposes, the Open Source edition, available on Confluent's site, is sufficient.

5. Quick Start Kafka Connect

To start, we'll illustrate the principle of Kafka Connect using its most basic connectors: the file source connector and the file sink connector.

Conveniently, Confluent Platform comes with both of these connectors, as well as reference configurations.

5.1. Source Connector Configuration

For the source connector, the reference configuration is available at $CONFLUENT_HOME/etc/kafka/connect-file-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt

This configuration has some properties that are common to all source connectors:

  • name is a user-specified name for the connector instance
  • connector.class specifies the implementing class, basically the kind of connector
  • tasks.max specifies the maximum number of tasks (instances of our source connector) that should run in parallel, and
  • topic defines the topic to which the connector should send the output

In this case, we also have a connector-specific attribute:

  • file defines the file from which the connector should read the input
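
To make the layout of such a file concrete, here's a minimal Python sketch that reads the simple key=value subset of the .properties format and checks for the keys this file source example uses. The helper names parse_properties and missing_keys, as well as the FILE_SOURCE_KEYS set, are our own assumptions for illustration; the parser skips comments and blank lines but, unlike the full Java .properties format, doesn't handle line continuations or escape sequences.

```python
# Keys used by the file source example above (an assumption for this
# sketch, not an exhaustive list defined by Kafka Connect).
FILE_SOURCE_KEYS = {"name", "connector.class", "tasks.max", "topic", "file"}


def parse_properties(text: str) -> dict[str, str]:
    """Parse simplified .properties text: one key=value or key:value per line.

    Lines starting with '#' or '!' are comments; blank lines are skipped.
    Line continuations and escape sequences are NOT supported.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        # Split at the first '=' or ':', whichever comes first.
        positions = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not positions:
            props[line] = ""  # a bare key maps to an empty value
            continue
        sep = min(positions)
        props[line[:sep].strip()] = line[sep + 1:].strip()
    return props


def missing_keys(props: dict[str, str], required: set[str]) -> set[str]:
    """Return the required keys that are absent from the parsed properties."""
    return required - set(props)


source = parse_properties("""
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
""")
print(source["connector.class"])               # FileStreamSource
print(missing_keys(source, FILE_SOURCE_KEYS))  # set()
```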

For this to work, let's create a basic file with some content:

printf "foo\nbar\n" > $CONFLUENT_HOME/test.txt

Note that file holds a relative path, which is resolved against the working directory; throughout this tutorial, the working directory is $CONFLUENT_HOME.

5.2. Sink Connector Configuration

For our sink connector, we’ll use the reference configuration at $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

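Logically, it contains almost the same parameters, though this time connector.class specifies the sink connector implementation, and file is the location where the connector should write the content. Also note that the sink uses topics (plural) instead of topic, as a sink connector can consume from a comma-separated list of topics.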
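
A quick way to spot the differences between the two reference files is to compare their keys. In this Python sketch, both configurations are hard-coded as dictionaries with the values from the files above, and sink_topics is a hypothetical helper showing how a comma-separated topics value splits into topic names; it isn't how Kafka Connect itself parses the setting.

```python
# Values copied from the two reference configurations above.
SOURCE = {
    "name": "local-file-source",
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "topic": "connect-test",
    "file": "test.txt",
}
SINK = {
    "name": "local-file-sink",
    "connector.class": "FileStreamSink",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "connect-test",
}


def key_differences(a: dict[str, str], b: dict[str, str]) -> tuple[set[str], set[str]]:
    """Return (keys only in a, keys only in b)."""
    return set(a) - set(b), set(b) - set(a)


def sink_topics(config: dict[str, str]) -> list[str]:
    """Split a comma-separated 'topics' value into individual topic names."""
    return [t.strip() for t in config.get("topics", "").split(",") if t.strip()]


only_source, only_sink = key_differences(SOURCE, SINK)
print(only_source, only_sink)  # {'topic'} {'topics'}
print(sink_topics(SINK))       # ['connect-test']
```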

5.3. Worker Config

Finally, we have to configure the Connect worker, which will integrate our two connectors and do the work of reading from the source connector and writing to the sink connector.

For that, we can use $CONFLUENT_HOME/etc/kafka/connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java

Note that plugin.path can hold a comma-separated list of paths where connector implementations are available.

As we'll use connectors bundled with Kafka, we can set plugin.path to $CONFLUENT_HOME/share/java. Since environment variables aren't expanded inside the properties file, we should write out the resolved absolute path; on Windows in particular, an absolute path is needed here.

For the other parameters, we can leave the default values:

  • bootstrap.servers contains the addresses of the Kafka brokers
  • key.converter and value.converter define converter classes, which serialize and deserialize the data as it flows from the source into Kafka and then from Kafka to the sink
  • key.converter.schemas.enable and value.converter.schemas.enable are converter-specific settings
  • offset.storage.file.filename is the most important setting when running Connect in standalone mode: it defines where Connect should store its offset data
  • offset.flush.interval.ms defines the interval at which the worker tries to commit offsets for tasks
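
The effect of schemas.enable is easiest to see on a single value. The following Python sketch imitates, for plain string values only, the two output shapes JsonConverter produces: with schemas enabled, each value is wrapped in a schema/payload envelope (the shape we'll see in the topic shortly); with schemas disabled, only the bare JSON value is written. The function name to_connect_json is our own, and the real converter handles many more types than strings.

```python
import json


def to_connect_json(value: str, schemas_enable: bool) -> str:
    """Serialize a string value roughly the way JsonConverter lays it out.

    Simplification: only non-null strings are handled; the real converter
    also supports structs, maps, arrays, numbers, and more.
    """
    if schemas_enable:
        envelope = {"schema": {"type": "string", "optional": False}, "payload": value}
        # Compact separators match the console output shown later.
        return json.dumps(envelope, separators=(",", ":"))
    return json.dumps(value)


print(to_connect_json("foo", schemas_enable=True))
# {"schema":{"type":"string","optional":false},"payload":"foo"}
print(to_connect_json("foo", schemas_enable=False))
# "foo"
```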

The full list of worker parameters is considerably longer, so check out the official documentation for a complete reference.

5.4. Kafka Connect in Standalone Mode

And with that, we can start our first connector setup:

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

First off, we can inspect the content of the topic using the command line:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

As we can see, the source connector took the data from the test.txt file, transformed it into JSON, and sent it to Kafka:

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

And, if we have a look at the folder $CONFLUENT_HOME, we can see that a file test.sink.txt was created here:

cat $CONFLUENT_HOME/test.sink.txt
foo
bar

As the sink connector extracts the value from the payload attribute and writes it to the destination file, the data in test.sink.txt has the content of the original test.txt file.
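
Conceptually, the sink side undoes the envelope: the converter deserializes each message, and the connector writes the resulting value as one line. This Python sketch mimics that round trip on the two messages above; extract_payload is a hypothetical helper, whereas in Kafka Connect the deserialization is done by JsonConverter before FileStreamSink ever sees the record.

```python
import json


def extract_payload(message: str) -> str:
    """Return the value a file sink would write for one JSON message.

    Assumption: the message is either a schema/payload envelope or a bare
    JSON string, as produced by JsonConverter for string values.
    """
    data = json.loads(message)
    if isinstance(data, dict) and "payload" in data:
        return data["payload"]
    return data


messages = [
    '{"schema":{"type":"string","optional":false},"payload":"foo"}',
    '{"schema":{"type":"string","optional":false},"payload":"bar"}',
]
# Each extracted value becomes one line of the destination file.
sink_file_content = "".join(extract_payload(m) + "\n" for m in messages)
print(sink_file_content, end="")
# foo
# bar
```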

Now let’s add more lines to test.txt.

When we do, we see that the source connector detects these changes automatically.

We only have to make sure to end the file with a newline; otherwise, the source connector won't pick up the last line.
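
The missing-newline behavior follows from reading a file line by line while remembering a position. Here's a simplified Python sketch of that idea, not the actual FileStreamSource code: read_complete_lines is a hypothetical helper that only emits lines terminated by '\n' (the real connector also handles '\r') and returns the byte offset to resume from, similar in spirit to the file position a source connector records as its offset.

```python
def read_complete_lines(data: bytes, offset: int) -> tuple[list[str], int]:
    """Return the complete lines after `offset` and the new offset.

    A trailing fragment without a newline is left unread, so the offset
    stays at the start of that fragment until the line is completed.
    """
    lines = []
    pos = offset
    while True:
        end = data.find(b"\n", pos)
        if end == -1:
            break  # no complete line left: wait for more data
        lines.append(data[pos:end].decode("utf-8"))
        pos = end + 1
    return lines, pos


content = b"foo\nbar\nbaz"  # "baz" has no trailing newline yet
lines, offset = read_complete_lines(content, 0)
print(lines, offset)  # ['foo', 'bar'] 8

content += b"\n"  # the newline completes the last line
lines, offset = read_complete_lines(content, offset)
print(lines, offset)  # ['baz'] 12
```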

At this point, let's stop the Connect process, as we'll start Connect in distributed mode shortly.

6. Connect’s REST API

So far, we've done all the configuration by passing property files via the command line. However, as Connect is designed to run as a service, it also provides a REST API.

By default, it's available at http://localhost:8083. Some of its endpoints are:

  • GET /connectors – returns a list with all connectors in use
  • GET /connectors/{name} – returns details about a specific connector
  • POST /connectors – creates a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
  • GET /connectors/{name}/status – returns the current status of the connector – including if it is running, failed or paused – which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • DELETE /connectors/{name} – deletes a connector, gracefully stopping all tasks and deleting its configuration
  • GET /connector-plugins – returns a list of connector plugins installed in the Kafka Connect cluster
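
For scripting, these endpoints can be called from any HTTP client. As a minimal Python sketch using only the standard library, the helpers below (our own names) build urllib Request objects for three of the endpoints without sending them, so the example runs even without a Connect worker; the commented-out urlopen call shows how a request would be sent to a running worker at the default address.

```python
import json
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8083"  # default REST address mentioned above


def create_connector_request(name: str, config: dict[str, str]) -> Request:
    """Build (but don't send) a POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return Request(
        f"{BASE_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(name: str) -> Request:
    """Build a GET /connectors/{name}/status request (name URL-encoded)."""
    return Request(f"{BASE_URL}/connectors/{quote(name, safe='')}/status", method="GET")


def delete_connector_request(name: str) -> Request:
    """Build a DELETE /connectors/{name} request (name URL-encoded)."""
    return Request(f"{BASE_URL}/connectors/{quote(name, safe='')}", method="DELETE")


req = create_connector_request(
    "local-file-source",
    {"connector.class": "FileStreamSource", "tasks.max": "1",
     "file": "test-distributed.txt", "topic": "connect-distributed"},
)
print(req.get_method(), req.full_url)  # POST http://localhost:8083/connectors
# To actually send it to a running worker:
#   from urllib.request import urlopen
#   with urlopen(req) as resp:
#       print(resp.status, resp.read().decode())
```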

The official documentation provides a list with all endpoints.

We’ll use the REST API for creating new connectors in the following section.

7. Kafka Connect in Distributed Mode

The standalone mode works perfectly for development and testing, as well as smaller setups. However, if we want to make full use of the distributed nature of Kafka, we have to launch Connect in distributed mode.

By doing so, connector settings and metadata are stored in Kafka topics instead of the file system. As a result, the worker nodes are effectively stateless.

7.1. Starting Connect 

A reference configuration for distributed mode can be found at $CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Parameters are mostly the same as for standalone mode. There are only a few differences:

  • group.id defines the name of the Connect cluster group. The value must be different from any consumer group ID
  • offset.storage.topic, config.storage.topic, and status.storage.topic define the topics in which Connect stores connector offsets, configurations, and statuses, respectively. For each topic, we can also define a replication factor
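
As a rough illustration of these rules, here's a Python sketch that checks a distributed worker configuration for the keys mentioned above and for a group.id that collides with an existing consumer group. The function name, the sample values, and the idea of passing in the known consumer group IDs are assumptions for this example; Connect performs its own configuration validation when the worker starts.

```python
# Keys discussed above that a distributed worker configuration should set.
DISTRIBUTED_KEYS = {
    "group.id",
    "offset.storage.topic",
    "config.storage.topic",
    "status.storage.topic",
}


def check_distributed_config(props: dict[str, str], consumer_groups: set[str]) -> list[str]:
    """Return a list of problems found in a distributed worker config."""
    problems = [f"missing {key}" for key in sorted(DISTRIBUTED_KEYS - set(props))]
    group = props.get("group.id")
    if group is not None and group in consumer_groups:
        problems.append(f"group.id '{group}' clashes with an existing consumer group")
    return problems


# Sample values for illustration.
worker = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "connect-cluster",
    "offset.storage.topic": "connect-offsets",
    "config.storage.topic": "connect-configs",
    "status.storage.topic": "connect-status",
}
print(check_distributed_config(worker, consumer_groups={"my-app"}))  # []
print(check_distributed_config(worker, consumer_groups={"connect-cluster"}))
```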

Again, the official documentation provides a list with all parameters.

We can start Connect in distributed mode as follows:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

7.2. Adding Connectors Using the REST API

Now, compared to the standalone startup command, we didn’t pass any connector configurations as arguments. Instead, we have to create the connectors using the REST API.

To set up our example from before, we have to send two POST requests to http://localhost:8083/connectors, each containing one of the following JSON objects.

First, we need to create the body for the source connector POST as a JSON file. Here, we’ll call it connect-file-source.json:

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-distributed.txt",
        "topic": "connect-distributed"
    }
}

Note how this looks pretty similar to the reference configuration file we used the first time.
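
The mapping between the two formats is mechanical: name moves to the top level, and the remaining properties go into config. Here's a minimal Python sketch of that conversion, with to_rest_body as a hypothetical helper; it keeps every value as a string (for example "1" rather than 1 for tasks.max), which the REST API should accept as well, since connector configurations are handled as string maps.

```python
import json


def to_rest_body(props: dict[str, str]) -> dict:
    """Turn flat connector properties into a POST /connectors body.

    'name' moves to the top level; everything else goes under 'config'.
    """
    config = {k: v for k, v in props.items() if k != "name"}
    return {"name": props["name"], "config": config}


props = {
    "name": "local-file-source",
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test-distributed.txt",
    "topic": "connect-distributed",
}
print(json.dumps(to_rest_body(props), indent=4))
```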

And then we POST it:

curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

Then, we’ll do the same for the sink connector, calling the file connect-file-sink.json:

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-distributed.sink.txt",
        "topics": "connect-distributed"
    }
}

And perform the POST like before:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

Assuming test-distributed.txt in $CONFLUENT_HOME contains the same two lines as test.txt, we can verify that this setup is working correctly:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

And, if we have a look at the folder $CONFLUENT_HOME, we can see that a file test-distributed.sink.txt was created here:

cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar

Once we've tested the distributed setup, let's clean up by removing the two connectors:

curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink

8. Transforming Data

8.1. Supported Transformations

Transformations enable us to make simple and lightweight modifications to individual messages.

Kafka Connect's built-in transformations include:

  • InsertField – Add a field using either static data or record metadata
  • ReplaceField – Filter or rename fields
  • MaskField – Replace a field with the valid null value for the type (zero or an empty string, for example)
  • HoistField – Wrap the entire event as a single field inside a struct or a map
  • ExtractField – Extract a specific field from a struct or map and include only this field in the results
  • SetSchemaMetadata – Modify the schema name or version
  • TimestampRouter – Modify the topic of a record based on original topic and timestamp
  • RegexRouter – Modify the topic of a record based on original topic, a replacement string, and a regular expression

A transformation is configured using the following parameters:

  • transforms – A comma-separated list of aliases for the transformations
  • transforms.$alias.type – Class name for the transformation
  • transforms.$alias.$transformationSpecificConfig – Configuration for the respective transformation
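
To illustrate how these keys fit together, here's a Python sketch that groups a flat connector configuration into one entry per transformation alias, in the order given by transforms, which is also the order in which Connect applies the chain. parse_transforms is our own helper, and the Mask and Route aliases with their settings are hypothetical examples using the MaskField and RegexRouter transformations from the list above.

```python
def parse_transforms(config: dict[str, str]) -> list[tuple[str, str, dict[str, str]]]:
    """Group transforms.* settings into (alias, class name, specific settings).

    The result follows the comma-separated 'transforms' list, i.e. the
    order in which the transformations are applied.
    """
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    result = []
    for alias in aliases:
        prefix = f"transforms.{alias}."
        settings = {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}
        class_name = settings.pop("type")  # transforms.$alias.type is required
        result.append((alias, class_name, settings))
    return result


# Hypothetical example configuration.
cfg = {
    "transforms": "Mask,Route",
    "transforms.Mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.Mask.fields": "password",
    "transforms.Route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.Route.regex": "connect-(.*)",
    "transforms.Route.replacement": "copy-$1",
}
for alias, cls, settings in parse_transforms(cfg):
    print(alias, cls.rsplit(".", 1)[-1], settings)
# Mask MaskField$Value {'fields': 'password'}
# Route RegexRouter {'regex': 'connect-(.*)', 'replacement': 'copy-$1'}
```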

8.2. Applying a Transformation

To test some transformation features, let’s set up the following two transformations:

  • First, let’s wrap the entire message as a JSON struct
  • After that, let’s add a field to that struct

Before applying our transformations, we have to configure Connect to use schemaless JSON by modifying connect-distributed.properties, so that the output contains just the transformed values without a schema envelope:

key.converter.schemas.enable=false
value.converter.schemas.enable=false

After that, we have to restart Connect, again in distributed mode:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

Again, we need to create the body for the source connector POST as a JSON file. Here, we’ll call it connect-file-source-transform.json.

Besides the familiar parameters, we add a few lines for the two transformations:

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

After that, let’s perform the POST:

curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

Let’s write some lines to our test-transformation.txt:

Foo
Bar

If we now inspect the connect-transformation topic, we should get the following lines:

{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}
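
To see why the topic ends up with exactly these values, here's a Python sketch that replays the two configured steps on plain dictionaries. It only imitates the schemaless behavior (which is why we disabled schemas above); hoist_field, insert_static_field, and apply_chain are our own stand-ins, not Kafka's Java classes.

```python
import json


def hoist_field(value, field: str) -> dict:
    """Schemaless sketch of HoistField$Value: wrap the value in a map."""
    return {field: value}


def insert_static_field(value: dict, field: str, static_value: str) -> dict:
    """Schemaless sketch of InsertField$Value with static.field/static.value."""
    return {**value, field: static_value}


def apply_chain(line: str) -> str:
    """Apply MakeMap, then InsertSource, and serialize compactly."""
    record = hoist_field(line, "line")                                       # MakeMap
    record = insert_static_field(record, "data_source", "test-file-source")  # InsertSource
    return json.dumps(record, separators=(",", ":"))


for line in ["Foo", "Bar"]:
    print(apply_chain(line))
# {"line":"Foo","data_source":"test-file-source"}
# {"line":"Bar","data_source":"test-file-source"}
```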

9. Using Ready Connectors

After using these simple connectors, let’s have a look at more advanced ready-to-use connectors, and how to install them.

9.1. Where to Find Connectors

Pre-built connectors are available from different sources:

  • A few connectors are bundled with plain Apache Kafka (source and sink for files and console)
  • Some more connectors are bundled with Confluent Platform (Elasticsearch, HDFS, JDBC, and AWS S3)
  • Also check out Confluent Hub, which is a kind of app store for Kafka connectors. The number of offered connectors is growing continuously:
    • Confluent connectors (developed, tested, documented, and fully supported by Confluent)
    • Certified connectors (implemented by a 3rd party and certified by Confluent)
    • Community-developed and -supported connectors
  • Beyond that, Confluent also provides a Connectors Page, which lists some of the connectors available on Confluent Hub as well as some additional community connectors
  • And finally, some vendors provide connectors as part of their product. For example, Landoop provides a streaming library called Lenses, which also contains a set of ~25 open source connectors (many of them also cross-listed in other places)

9.2. Installing Connectors from Confluent Hub

The enterprise version of Confluent provides a script for installing Connectors and other components from Confluent Hub (the script is not included in the Open Source version). If we’re using the enterprise version, we can install a connector using the following command:

$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

9.3. Installing Connectors Manually

If we need a connector that isn't available on Confluent Hub, or if we're using the Open Source version of Confluent, we can install the required connectors manually. For that, we download and unzip the connector, then move the included libraries to the folder specified as plugin.path.

For each connector, the archive should contain two folders of interest to us:

  • The lib folder contains the connector jar, for example, kafka-connect-mqtt-1.0.0-preview.jar, as well as some more jars required by the connector
  • The etc folder holds one or more reference config files

We have to move the lib folder to $CONFLUENT_HOME/share/java, or whichever path we specified as plugin.path in connect-standalone.properties and connect-distributed.properties. In doing so, it might also make sense to rename the folder to something meaningful.
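
The manual step boils down to copying a folder. As a small Python sketch, assuming the archive has already been downloaded and unzipped, the hypothetical helper install_connector copies its lib folder into the plugin directory under a name of our choice; the demo builds a throwaway directory tree with an empty placeholder jar instead of touching a real installation. Copying rather than moving keeps the extracted archive intact, and a running worker only picks up newly installed plugins after a restart.

```python
import shutil
import tempfile
from pathlib import Path


def install_connector(extracted_archive: Path, plugin_path: Path, plugin_name: str) -> Path:
    """Copy the archive's lib folder into plugin.path under a readable name."""
    source_lib = extracted_archive / "lib"
    if not source_lib.is_dir():
        raise FileNotFoundError(f"no lib folder in {extracted_archive}")
    target = plugin_path / plugin_name
    shutil.copytree(source_lib, target)  # raises if target already exists
    return target


# Demo with a throwaway directory tree instead of a real download.
with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "kafka-connect-mqtt"
    (archive / "lib").mkdir(parents=True)
    (archive / "lib" / "kafka-connect-mqtt-1.0.0-preview.jar").touch()
    plugins = Path(tmp) / "share" / "java"
    plugins.mkdir(parents=True)
    installed = install_connector(archive, plugins, "kafka-connect-mqtt")
    print(sorted(p.name for p in installed.iterdir()))
    # ['kafka-connect-mqtt-1.0.0-preview.jar']
```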

We can use the config files from etc either by referencing them when starting Connect in standalone mode, or by grabbing the properties and creating a JSON file from them for the REST API.

10. Conclusion

In this tutorial, we had a look at how to install and use Kafka Connect.

We looked at the types of connectors, both source and sink. We also looked at some features and modes that Connect can run in. Then, we reviewed transformations. And finally, we learned where to get and how to install custom connectors.

As always, the config files can be found over on GitHub.
