List Active Brokers in a Kafka Cluster Using Shell Commands – 使用Shell命令列出Kafka集群中的活跃经纪商

最后修改: 2021年 7月 10日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

Monitoring an event-driven system that uses the Apache Kafka cluster would often require us to get the list of active brokers. In this tutorial, we’ll explore few shell commands to get the list of active brokers in a running cluster.

监控一个使用Apache Kafka集群的事件驱动系统,往往需要我们获得活动经纪商的列表。在本教程中,我们将探讨一些shell命令,以获得运行中的集群中的活动经纪人列表

2. Setup

2.设置

For the purpose of this article, let’s use the below docker-compose.yml file to set up a two-node Kafka cluster:

为了本文的目的,让我们使用下面的 docker-compose.yml文件来设置一个双节点的Kafka集群

$ cat docker-compose.yml
---
version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
  
  kafka-1:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Now, let’s spin-up the Kafka cluster using the docker-compose command:

现在,让我们使用docker-compose命令来启动Kafka集群。

$ docker-compose up -d

We can verify that the Zookeeper server is listening on port 2181, while the Kafka brokers are listening on ports 29092 and 39092, respectively:

我们可以验证Zookeeper服务器的监听端口是2181,而Kafka经纪人的监听端口是2909239092,

$ ports=(2181 29092 39092)
$ for port in $ports
do
nc -z localhost $port
done
Connection to localhost port 2181 [tcp/eforward] succeeded!
Connection to localhost port 29092 [tcp/*] succeeded!
Connection to localhost port 39092 [tcp/*] succeeded!

3. Using Zookeeper APIs

3.使用Zookeeper APIs

In a Kafka cluster, the Zookeeper server stores metadata related to the Kafka broker servers. So, let’s use the filesystem APIs exposed by Zookeeper to get the broker details.

在Kafka集群中,Zookeeper服务器存储与Kafka代理服务器相关的元数据。因此,让我们使用Zookeeper暴露的文件系统API来获取经纪人的详细信息。

3.1. zookeeper-shell Command

3.1.zookeeper-shell命令

Most Kafka distributions are shipped with either zookeeper-shell or zookeeper-shell.sh binary. So, it’s a de facto standard to use this binary to interact with the Zookeeper server.

大多数Kafka发行版都带有zookeeper-shellzookeeper-shell.sh二进制文件。因此,使用这个二进制文件来与Zookeeper服务器进行交互是一个事实上的标准。

First, let’s connect to the Zookeeper server running at localhost:2181:

首先,让我们连接到运行在localhost:2181的Zookeeper服务器。

$ /usr/local/bin/zookeeper-shell localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!

Once we’re connected to the Zookeeper server, we can execute typical filesystem commands such as ls to get metadata information stored in the server. Let’s find the ids of the brokers that are currently alive:

一旦我们连接到Zookeeper服务器,我们就可以执行典型的文件系统命令,如ls来获取存储在服务器中的元数据信息。让我们找到当前活着的经纪商的ID。

ls /brokers/ids
[1, 2]

We can see that there are currently two active brokers, with ids 1 and 2. Using the get command, we can fetch more details for a specific broker with a given id:

我们可以看到,目前有两个活跃的经纪商,ID为1和2。使用get命令,我们可以获取给定id的特定经纪人的更多细节。

get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}
get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2:9092","PLAINTEXT_HOST://localhost:39092"],"jmx_port":-1,"port":9092,"host":"kafka-2","version":5,"timestamp":"1625336133967"}

Note that the broker with id=1 is listening on port 29092, while the second broker with id=2 is listening on port 39092.

注意,id=1的经纪人在端口29092上监听,而id=2的第二个经纪人在端口39092上监听。

Finally, to exit the Zookeeper shell, we can use the quit command:

最后,要退出Zookeeper外壳,我们可以使用quit命令。

quit

3.2. zkCli Command

3.2.zkCli 命令

Just like Kafka distributions are shipped with the zookeeper-shell binary, Zookeeper distributions are shipped with zkCli or zkCli.sh binary.

就像Kafka发行版与zookeeper-shell二进制文件一起运送,Zookeeper发行版也与zkClizkCli.sh二进制文件一起运送。

As such, interacting with zkCli is exactly like interacting with zookeeper-shell, so let’s go ahead and confirm that we’re able to get the required details for the broker with id=1:

因此,zkCli互动就像与zookeeper-shell互动一样,所以让我们继续确认,我们能够获得id=1的经纪人所需的细节。

$ zkCli -server localhost:2181 get /brokers/ids/1
Connecting to localhost:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1:9092","PLAINTEXT_HOST://localhost:29092"],"jmx_port":-1,"port":9092,"host":"kafka-1","version":5,"timestamp":"1625336133848"}

As expected, we can see that broker details fetched using zookeeper-shell match those obtained using zkCli.

正如预期的那样,我们可以看到,使用zookeeper-shell获取的经纪人详细信息与使用zkCli获得的信息一致。

4. Using the Broker Version API

4.使用经纪人版本API

At times, we might have an incomplete list of active brokers, and we want to get all the available brokers in the cluster. In such a scenario, we can use the kafka-broker-api-versions command shipped with the Kafka distributions.

有时,我们可能有一个不完整的活动经纪商列表,我们想获得集群中所有可用的经纪商。在这种情况下,我们可以使用kafka-broker-api-versions命令与Kafka发行版一起提供的。

Let’s assume that we know about a broker running at localhost:29092, so let’s try to find out all active brokers participating in the Kafka cluster:

假设我们知道一个运行在localhost:29092的经纪人,那么让我们试着找出所有参与Kafka集群的活跃经纪人。

$ kafka-broker-api-versions --bootstrap-server localhost:29092 | awk '/id/{print $1}'
localhost:39092
localhost:29092

It’s worth noting that we used the awk command to filter the output and show only the broker address. Further, the result correctly shows that there are two active brokers in the cluster.

值得注意的是,我们使用awk命令来过滤输出,只显示经纪商地址。此外,结果正确显示集群中有两个活跃的经纪商。

Although this approach looks simpler than the Zookeeper CLI approach, the kafka-broker-api-versions binary is only a recent addition to Kafka distribution.

虽然这种方法看起来比Zookeeper CLI方法更简单,但kafka-broker-api-versions 二进制只是Kafka发行中最近增加的。

5. Shell Script

5.外壳脚本

In any practical scenario, executing the zkCli or zookeeper-shell commands manually for each broker would be taxing. So, let’s write a Shell script that takes the Zookeeper server address as an input, and in return, gives us the list of all active brokers.

在任何实际情况下,为每个经纪人手动执行zkClizookeeper-shell命令将是一种负担。因此,让我们编写一个Shell脚本,将Zookeeper服务器的地址作为输入,作为回报,给我们提供所有活动经纪商的列表。

5.1. Helper Functions

5.1.助手函数

Let’s write all the helper functions in the functions.sh script:

让我们在functions.sh脚本中编写所有的辅助函数。

$ cat functions.sh
#!/bin/bash
ZOOKEEPER_SERVER="${1:-localhost:2181}"

# Helper Functions Below

First, let’s write the get_broker_ids function to get the set of active broker ids that will call the zkCli command internally:

首先,让我们编写get_broker_ids函数,以获得一组活跃的经纪人ID,在内部调用zkCli命令。

function get_broker_ids {
broker_ids_out=$(zkCli -server $ZOOKEEPER_SERVER <<EOF
ls /brokers/ids
quit
EOF
)
broker_ids_csv="$(echo "${broker_ids_out}" | grep '^\[.*\]$')"
echo "$broker_ids_csv" | sed 's/\[//;s/]//;s/,/ /'
}

Next, let’s write the get_broker_details function to get the verbose broker details using the broker_id:

接下来,让我们编写get_broker_details函数,使用broker_id获得verbose broker details

function get_broker_details {
broker_id="$1"
echo "$(zkCli -server $ZOOKEEPER_SERVER <<EOF
get /brokers/ids/$broker_id
quit
EOF
)"
}

Now that we have the verbose broker details, let’s write the parse_broker_endpoint function to get the broker’s endpoint detail:

现在我们有了详细的经纪人资料,让我们编写parse_broker_endpoint函数来获得经纪人的端点细节。

function parse_endpoint_detail {
broker_detail="$1"
json="$(echo "$broker_detail"  | grep '^{.*}$')"
json_endpoints="$(echo $json | jq .endpoints)"
echo "$(echo $json_endpoints |jq . |  grep HOST | tr -d " ")"
}

Internally, we used the jq command for the JSON parsing.

在内部,我们使用jq命令进行JSON解析

5.2. Main Script

5.2 主脚本

Now, let’s write the main script get_all_active_brokers.sh that uses the helper functions defined in functions.sh:

现在,让我们编写主脚本get_all_active_brokers.sh,它使用functions.sh中定义的辅助函数。

$ cat get_all_active_brokers.sh
#!/bin/bash
. functions.sh "$1"

function get_all_active_brokers {
broker_ids=$(get_broker_ids)
for broker_id in $broker_ids
do
    broker_details="$(get_broker_details $broker_id)"
    broker_endpoint=$(parse_endpoint_detail "$broker_details")
    echo "broker_id="$broker_id,"endpoint="$broker_endpoint
done
}

get_all_active_brokers

We can notice that we’ve iterated over all the broker_ids in the get_all_active_brokers function to aggregate the endpoints for all the active brokers.

我们可以注意到,我们在get_all_active_brokers函数中遍历了所有的broker_ids,以汇总所有活动经纪商的端点。

Finally, let’s execute the get_all_active_brokers.sh script so that we can see the list of active brokers for our two-node Kafka cluster:

最后,让我们执行get_all_active_brokers.sh脚本,这样我们就可以看到双节点Kafka集群的活动经纪商列表。

$ ./get_all_active_brokers.sh localhost:2181
broker_id=1,endpoint="PLAINTEXT_HOST://localhost:29092"
broker_id=2,endpoint="PLAINTEXT_HOST://localhost:39092"

We can see that the results are accurate. It looks like we nailed it!

我们可以看到,结果是准确的。看起来我们已经成功了!

6. Conclusion

6.结语

In this tutorial, we learned about shell commands such as zookeeper-shell, zkCli, and kafka-broker-api-versions to get the list of active brokers in a Kafka cluster. Additionally, we wrote a shell script to automate the process of finding broker details in real-world scenarios.

在本教程中,我们了解了诸如zookeeper-shellzkClikafka-broker-api-versions等shell命令,以获取Kafka集群中的活动经纪人列表。此外,我们还编写了一个shell脚本,以便在现实世界的场景中自动寻找经纪人的详细信息