JSON File Data Into Kafka Topic

Last modified: September 13, 2023

1. Overview

Apache Kafka is an open-source, fault-tolerant, and highly scalable streaming platform. It follows a publish-subscribe architecture to stream data in real time. By putting the data in a queue, we can process high volumes of data with very low latency. Sometimes, we need to send JSON data to a Kafka topic for processing and analysis.

In this tutorial, we’ll learn how to stream JSON data into Kafka topics. We’ll also look at how to configure a Kafka producer and consumer for JSON data.

2. Importance of JSON Data in Kafka

Architecturally, Kafka supports message streams and treats each message payload as opaque bytes. Therefore, we can send JSON data to the Kafka server just like any other format. Many modern applications exchange data primarily as JSON, so being able to communicate in JSON is important. For example, sending events in JSON format is useful for tracking user activity and behavior on websites and applications in real time.

Streaming JSON data into a Kafka server helps with real-time data analysis. It facilitates an event-driven architecture in which each microservice subscribes to its relevant topics and reacts to changes in real time. With Kafka topics and JSON formats, it’s easy to deliver IoT data, communicate between microservices, and aggregate metrics.

3. Kafka Setup

To stream JSON into the Kafka server, we first need to set up the Kafka broker and ZooKeeper. We can follow this tutorial to set up a full-fledged Kafka server. Now, let’s look at the command to create a Kafka topic named baeldung, on which we’ll produce and consume the JSON data:

$ docker-compose exec kafka kafka-topics.sh --create --topic baeldung \
  --partitions 1 --replication-factor 1 --bootstrap-server kafka:9092

The above command creates a Kafka topic baeldung with a single partition and a replication factor of 1. We use a replication factor of 1 only because this is a demo. In real-world scenarios, we’d typically use a replication factor greater than 1, so that copies of each partition exist on several brokers. This helps with failover and provides high availability and reliability of data.

4. Produce Data

The Kafka producer is a fundamental component of the Kafka ecosystem: it publishes data to the Kafka server. To demonstrate, let’s look at the command to start a console producer using docker-compose:

$ docker-compose exec kafka kafka-console-producer.sh --topic baeldung \
  --broker-list kafka:9092

The above command starts a Kafka console producer that sends messages to the Kafka broker. To send JSON data, we need to tweak the command. Before proceeding, let’s first create a sample JSON file, sampledata.json:

{
    "name": "test",
    "age": 26,
    "email": "test@baeldung.com",
    "city": "Bucharest",
    "occupation": "Software Engineer",
    "company": "Baeldung Inc.",
    "interests": ["programming", "hiking", "reading"]
}

The above sampledata.json file contains basic information about a user in JSON format. To send this JSON data to a Kafka topic, we’ll use jq, a powerful command-line tool for working with JSON. Let’s install jq so that we can pass this JSON data to the Kafka producer:

$ sudo apt-get install jq

The above command installs jq on a Debian-based Linux machine. Next, let’s look at the command to send the JSON data:

$ jq -rc . sampledata.json | docker-compose exec -T kafka kafka-console-producer.sh --topic baeldung --broker-list kafka:9092

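The above command is a one-liner that processes the JSON data and streams it into the Kafka topic in a Docker environment. First, jq reads sampledata.json and applies the identity filter (.), which passes the document through unchanged. The -r option requests raw output, meaning that a result that is a string is written without JSON quotes; since our result is an object, it has no effect here. The -c option produces compact output, printing each JSON value on a single line. This matters because the console producer treats each input line as a separate message. Finally, the -T flag of docker-compose exec disables pseudo-TTY allocation so that the piped input reaches the producer.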
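
To make the effect of the -c option concrete, here is a minimal sketch in Python that mimics the compaction step using only the standard library. It isn’t part of the command-line workflow above; the helper name to_single_line is made up for illustration, and the sketch assumes the console producer’s default behavior of reading one message per input line.

```python
import json

# The sample document from the article, pretty-printed over several lines.
PRETTY_JSON = """{
    "name": "test",
    "age": 26,
    "email": "test@baeldung.com",
    "city": "Bucharest",
    "occupation": "Software Engineer",
    "company": "Baeldung Inc.",
    "interests": ["programming", "hiking", "reading"]
}"""


def to_single_line(text: str) -> str:
    """Parse JSON text and re-serialize it on one line (hypothetical helper)."""
    document = json.loads(text)
    # separators=(",", ":") removes the spaces json.dumps adds by default.
    return json.dumps(document, separators=(",", ":"), ensure_ascii=False)


compact = to_single_line(PRETTY_JSON)

# Piped as-is, the file would reach the producer as this many separate lines.
print("lines before:", len(PRETTY_JSON.splitlines()))
# After compaction the whole document fits on one line, i.e. one message.
print("lines after:", len(compact.splitlines()))
print(compact)
```

Without this step, every line of the pretty-printed file would become its own message, and none of those fragments would be a complete JSON document.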

5. Consume Data

So far, we have successfully sent the JSON data to the baeldung Kafka topic. Now, let’s look at the command to consume that data:

$ docker-compose exec kafka kafka-console-consumer.sh --topic baeldung  --from-beginning --bootstrap-server kafka:9092
{"name":"test","age":26,"email":"test@baeldung.com","city":"Bucharest","occupation":"Software Engineer","company":"Baeldung Inc.","interests":["programming","hiking","reading"]}

The above command consumes all the messages sent to the baeldung topic from the beginning, so the JSON data we sent in the previous section appears in the output. The consumer then keeps running and prints new messages as they arrive, which lets us monitor the topic in real time.
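
Each line printed by the console consumer is plain JSON text, so a downstream program can parse it back into a data structure. The following Python sketch shows one way to do that with the standard library. Here, parse_record is a hypothetical helper, and returning None for malformed lines is an assumption about the desired error handling rather than something Kafka does itself.

```python
import json
from typing import Optional

# The line printed by the console consumer in the article.
CONSUMED_LINE = (
    '{"name":"test","age":26,"email":"test@baeldung.com","city":"Bucharest",'
    '"occupation":"Software Engineer","company":"Baeldung Inc.",'
    '"interests":["programming","hiking","reading"]}'
)


def parse_record(line: str) -> Optional[dict]:
    """Turn one consumed line into a dict, or None if it isn't a JSON object."""
    try:
        value = json.loads(line)
    except json.JSONDecodeError:
        return None  # not valid JSON, e.g. a fragment of a pretty-printed file
    return value if isinstance(value, dict) else None


record = parse_record(CONSUMED_LINE)
if record is not None:
    print(record["name"], record["age"], record["interests"])

# A single line taken from the pretty-printed file is not valid JSON by itself.
print(parse_record('    "age": 26,'))
```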

6. Conclusion

In this article, we explored how to stream JSON data into a Kafka topic. First, we created a sample JSON file, and then we streamed it into the Kafka topic using a console producer. After that, we consumed that data using the console consumer, run through docker-compose.

In short, we covered the steps needed to send JSON data to a topic using a Kafka producer and to read it back with a consumer. Moreover, JSON lends itself to schema evolution: since each message carries its own field names, new fields can be added without affecting existing data, as long as consumers tolerate fields they don’t know.
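
As a hedged illustration of the schema evolution point, the Python sketch below reads two versions of the same record. The country field and the read_user helper are invented for this example. The idea is only that a consumer which ignores unknown fields and supplies defaults for missing ones keeps working when producers add fields.

```python
import json

# An older record and a newer one that carries an extra, hypothetical field.
OLD_RECORD = '{"name":"test","age":26}'
NEW_RECORD = '{"name":"test","age":26,"country":"Romania"}'


def read_user(line: str) -> dict:
    """Extract the fields this consumer knows about (hypothetical helper)."""
    data = json.loads(line)
    return {
        "name": data["name"],  # required in both versions
        "age": data.get("age"),  # optional: None when absent
        # Added later; fall back to a default for records written before that.
        "country": data.get("country", "unknown"),
    }


print(read_user(OLD_RECORD))
print(read_user(NEW_RECORD))
```

JSON itself doesn’t enforce any of this; compatibility depends on how producers and consumers are written.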
