Kafka Connect Example with MQTT and MongoDB

Last modified: January 14, 2019

1. Overview

In a previous article, we had a quick introduction to Kafka Connect, including the different types of connectors, basic features of Connect, as well as the REST API.

In this tutorial, we’ll use Kafka connectors to build a more “real world” example.

We’ll use a connector to collect data via MQTT, and we’ll write the gathered data to MongoDB.

2. Setup Using Docker

We’ll use Docker Compose to set up the infrastructure. That includes an MQTT broker as the source; Zookeeper, one Kafka broker, and Kafka Connect as middleware; and finally a MongoDB instance including a GUI tool as the sink.

2.1. Connector Installation

The connectors required for our example, an MQTT source as well as a MongoDB sink connector, are not included in plain Kafka or the Confluent Platform.

As we discussed in the previous article, we can download the connectors (MQTT as well as MongoDB) from the Confluent Hub. After that, we have to unpack the jars into a folder, which we’ll mount into the Kafka Connect container in the following section.

Let’s use the folder /tmp/custom/jars for that. We have to move the jars there before starting the Compose stack in the following section, as Kafka Connect loads connector plugins from the configured plugin path during startup.
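
For example, downloading and unpacking might look like this (a sketch; the archive names and their internal layout are placeholders for the versions actually fetched from the hub):

mkdir -p /tmp/custom/jars
# unpack the archives downloaded from the Confluent Hub (names are illustrative)
unzip kafka-connect-mqtt-<version>.zip -d /tmp/connectors
unzip kafka-connect-mongodb-<version>.zip -d /tmp/connectors
# move only the jars into the folder that we'll mount into the container
cp /tmp/connectors/*/lib/*.jar /tmp/custom/jars/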

2.2. Docker Compose File

We describe our setup as a simple Docker Compose file, which consists of six containers:

version: '3.3'

services:
  mosquitto:
    image: eclipse-mosquitto:1.5.5
    hostname: mosquitto
    container_name: mosquitto
    expose:
      - "1883"
    ports:
      - "1883:1883"
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data:/var/lib/kafka/data
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /tmp/custom/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - mosquitto
  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

The mosquitto container provides a simple MQTT broker based on Eclipse Mosquitto.

The containers zookeeper and kafka define a single-node Kafka cluster.

kafka-connect defines our Connect application in distributed mode.

And finally, mongo-db defines our sink database, as well as the web-based mongoclient, which helps us to verify whether the sent data arrived correctly in the database.

We can start the stack using the following command:

docker-compose up
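
Once all containers have started, we can verify that the Connect REST API is reachable and that it picked up the plugins from the mounted folder:

curl http://localhost:8083/connector-plugins

The returned list should contain, among others, io.confluent.connect.mqtt.MqttSourceConnector and at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector.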

3. Connector Configuration

Now that Kafka Connect is up and running, we can configure the connectors.

3.1. Configure Source Connector

Let’s configure the source connector using the REST API:

curl -d @<path-to-config-file>/connect-mqtt-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mqtt-source.json file looks like this:

{
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": 1,
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "baeldung",
        "kafka.topic": "connect-custom",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": 1
    }
}

There are a few properties that we haven’t used before:

  • mqtt.server.uri is the endpoint our connector will connect to
  • mqtt.topics is the MQTT topic our connector will subscribe to
  • kafka.topic defines the Kafka topic the connector will send the received data to
  • value.converter defines a converter which will be applied to the received payload. We need the ByteArrayConverter, as the MQTT Connector uses Base64 by default, while we want to use plain text
  • confluent.topic.bootstrap.servers is required by the newest version of the connector
  • The same applies to confluent.topic.replication.factor: it defines the replication factor for a Confluent-internal topic – as we have only one node in our cluster, we have to set that value to 1
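
After POSTing the configuration, we can ask the REST API whether our source connector is actually up and running:

curl http://localhost:8083/connectors/mqtt-source/status

Both the connector and its single task should report the state RUNNING.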

3.2. Test Source Connector

Let’s run a quick test by publishing a short message to the MQTT broker:

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
efrecon/mqtt-client \
pub -h mosquitto -t "baeldung" -m "{\"id\":1234,\"message\":\"This is a test\"}"

And if we listen to the topic connect-custom:

docker run \
--rm --network 04_custom_default \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --bootstrap-server kafka:9092 --topic connect-custom --from-beginning

then we should see our test message.
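
As the MQTT connector is configured with the ByteArrayConverter, the payload is passed through unchanged, so the consumer output should contain the raw JSON string we published, roughly:

{"id":1234,"message":"This is a test"}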

3.3. Setup Sink Connector

Next, we need our sink connector. Let’s again use the REST API:

curl -d @<path-to-config-file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Our connect-mongodb-sink.json file looks like this:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

We have the following MongoDB-specific properties here:

  • mongodb.connection.uri contains the connection string for our MongoDB instance
  • mongodb.collection defines the collection
  • Since the MongoDB connector is expecting JSON, we have to set JsonConverter for key.converter and value.converter
  • And we also need schemaless JSON for MongoDB, so we have to set key.converter.schemas.enable and value.converter.schemas.enable to false

3.4. Test Sink Connector

Since our topic connect-custom already contains messages from the MQTT connector test, the MongoDB connector should have fetched them directly after creation.

Hence, we should find them immediately in our MongoDB. We can use the web interface for that by opening the URL http://localhost:3000/. After logging in, we can select our MyCollection on the left, hit Execute, and our test message should be displayed.
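
Alternatively, if we prefer the command line over the GUI, we can query the collection directly inside the mongo-db container (a sketch using the mongo shell bundled with the image; per our connection URI, the connector writes to the test database):

docker exec mongo-db mongo test --eval "db.MyCollection.find().pretty()"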

3.5. End-to-end Test

Now, we can send any JSON struct using the MQTT client:

{
    "firstName": "John",
    "lastName": "Smith",
    "age": 25,
    "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021"
    },
    "phoneNumber": [{
        "type": "home",
        "number": "212 555-1234"
    }, {
        "type": "fax",
        "number": "646 555-4567"
    }],
    "gender": {
        "type": "male"
    }
}
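
To publish it, we can save the document to a file and let the MQTT client send the file contents (a sketch, assuming the document is stored as person.json in the current directory; -f is the standard mosquitto_pub option for publishing a file's contents):

docker run \
-it --rm --name mqtt-publisher --network 04_custom_default \
-v "$(pwd)":/data \
efrecon/mqtt-client \
pub -h mosquitto -t "baeldung" -f /data/person.json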

MongoDB supports schema-free JSON documents, and as we disabled schemas for our converter, any struct is immediately passed through our connector chain and stored in the database.

Again, we can use the web interface at http://localhost:3000/.

3.6. Clean Up

Once we’re done, we can clean up our experiment and remove the two connectors:

curl -X DELETE http://localhost:8083/connectors/mqtt-source
curl -X DELETE http://localhost:8083/connectors/mongodb-sink
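
A subsequent GET on the connectors resource should then return an empty list:

curl http://localhost:8083/connectors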

After that, we can shut down the Compose stack with Ctrl + C.

4. Conclusion

In this tutorial, we built an example using Kafka Connect, to collect data via MQTT, and to write the gathered data to MongoDB.

As always, the config files can be found over on GitHub.
