1. Overview
In this tutorial, we’ll briefly introduce Apache Kafka and then see how to programmatically create and configure topics in a Kafka cluster.
2. Introduction to Kafka
Apache Kafka is a powerful, high-performance, distributed event-streaming platform.
Generally, producer applications publish events to Kafka while consumers subscribe to these events in order to read and process them. Kafka uses topics to store and categorize these events, e.g., in an e-commerce application, there could be an ‘orders’ topic.
Kafka topics are partitioned, which distributes data across multiple brokers for scalability. They can be replicated in order to make the data fault-tolerant and highly available. Topics also retain events even after consumption for as long as required. This is all managed on a per-topic basis via Kafka command-line tools and key-value configurations.
However, in addition to the command-line tools, Kafka also provides an Admin API to manage and inspect topics, brokers, and other Kafka objects. In our example, we’ll be using this API to create new topics.
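To make the partitioning idea from this section more concrete, here is a minimal sketch of how a keyed event could be mapped to one of a topic's partitions. It is a simplified stand-in rather than Kafka's actual logic: Kafka's built-in partitioner hashes the serialized key with murmur2, whereas this example uses String.hashCode. The class and method names (PartitionSketch, partitionFor) and the sample keys are hypothetical.

```java
public class PartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    // Assumption: String.hashCode stands in for Kafka's murmur2-based hashing.
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // Hypothetical keys for events on an 'orders' topic.
        String[] orderKeys = {"customer-1", "customer-2", "customer-3", "customer-1"};
        for (String key : orderKeys) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The point of the sketch is that the same key always lands on the same partition, which is what preserves per-key ordering while the data is spread across brokers.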
3. Dependencies
To use the Admin API, let's add the kafka-clients dependency (groupId org.apache.kafka, artifactId kafka-clients) to our pom.xml.
4. Setting Up Kafka
Before creating new topics, we need at least a single-node Kafka cluster.
In this tutorial, we’ll use the Testcontainers framework to instantiate a Kafka container. We can then run reliable and self-contained integration tests that don’t rely on an external Kafka server running. For this, we’ll need two more dependencies specifically for our tests.
First, let's add the Testcontainers Kafka module as a test-scoped dependency in our pom.xml.
Next, we'll add the Testcontainers junit-jupiter artifact, which lets us run Testcontainers tests with JUnit 5.
Now that we have all the necessary dependencies configured, we can write a simple application to programmatically create new topics.
5. Admin API
Let’s begin by creating a new Properties instance with minimal configuration for a local broker:
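Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example address of a local broker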
Now we can obtain an Admin instance:
Admin admin = Admin.create(properties);
The create method accepts a Properties object (or a Map) with the bootstrap.servers property and returns a thread-safe instance.
The admin client uses this property to discover the brokers in the cluster and subsequently perform any admin operations. As such, it would usually be enough to include two or three broker addresses in order to cover the possibility of some instances being unavailable.
The AdminClientConfig class contains constants for all the admin client configuration entries.
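As a rough illustration of listing several brokers, the sketch below builds the configuration with plain string keys so that it compiles without the Kafka library. The keys bootstrap.servers, client.id, and request.timeout.ms are the literal names behind the corresponding AdminClientConfig constants. The broker host names, the client id, the timeout value, and the helper name adminProperties are assumptions made for the example.

```java
import java.util.List;
import java.util.Properties;

public class AdminConfigSketch {

    // Builds admin client properties from a list of broker addresses.
    // The string keys are the literal names behind the AdminClientConfig constants.
    public static Properties adminProperties(List<String> brokers) {
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("at least one broker address is required");
        }
        Properties properties = new Properties();
        // bootstrap.servers takes a comma-separated list of host:port pairs.
        properties.put("bootstrap.servers", String.join(",", brokers));
        properties.put("client.id", "topic-admin-sketch"); // hypothetical client id
        properties.put("request.timeout.ms", "10000");     // example value: 10 seconds
        return properties;
    }

    public static void main(String[] args) {
        // Hypothetical broker addresses; two or three are usually enough for discovery.
        Properties properties = adminProperties(List.of(
            "broker1.example.com:9092",
            "broker2.example.com:9092",
            "broker3.example.com:9092"));
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties object can be passed to Admin.create in the same way as the single-broker configuration shown earlier.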
6. Topic Creation
Let’s start by creating a JUnit 5 test with Testcontainers to verify successful topic creation. We’ll utilize the Kafka module, which uses the official Kafka Docker image for Confluent OSS Platform:
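void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
    // ... invoke the application code that creates the topic ...
    String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
    String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand).getStdout();
    // ... assert that stdout lists the new topic ...
}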
Here, Testcontainers will automatically instantiate and manage the Kafka container during test execution. We simply invoke our application code and verify that the topic has been successfully created in the running container.
6.1. Create With Default Options
Topic partitions and the replication factor are the key considerations for new topics. We’ll keep things simple and create our example topic with 1 partition and a replication factor of 1:
try (Admin admin = Admin.create(properties)) {
    int partitions = 1;
    short replicationFactor = 1;
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

    CreateTopicsResult result = admin.createTopics(
      Collections.singleton(newTopic)
    );

    // Block until the broker has processed the request for this topic
    KafkaFuture<Void> future = result.values().get(topicName);
    future.get();
}
Here, we’ve used the Admin.createTopics method to create a batch of new topics with default options. As the Admin interface extends the AutoCloseable interface, we’ve used try-with-resources to execute our operation. This ensures that resources are released appropriately.
Importantly, this method communicates with the Controller Broker and executes asynchronously. The returned CreateTopicsResult object exposes a KafkaFuture for accessing the results of each item in the request batch. This follows the Java asynchronous programming pattern and allows callers to obtain the results of the operation using the Future.get method.
For synchronous behavior, we can call get immediately to retrieve the result of our operation. The call blocks until the operation has completed or failed. On failure, it throws an ExecutionException that wraps the underlying cause.
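The blocking and error-wrapping behavior described above can be sketched with the JDK alone. In this example, CompletableFuture stands in for KafkaFuture, and the failure is a hypothetical IllegalStateException rather than a Kafka-specific exception, so it only demonstrates the general Future.get pattern. The names createTopicAsync and awaitResult are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureGetSketch {

    // Simulates an asynchronous admin operation.
    // Assumption: CompletableFuture stands in for KafkaFuture here.
    public static CompletableFuture<Void> createTopicAsync(String topicName, boolean alreadyExists) {
        return CompletableFuture.runAsync(() -> {
            if (alreadyExists) {
                // Hypothetical failure; a real broker would report a Kafka-specific exception.
                throw new IllegalStateException("Topic '" + topicName + "' already exists");
            }
        });
    }

    // Blocks until the operation finishes and reports the outcome as a string.
    public static String awaitResult(CompletableFuture<Void> future) {
        try {
            future.get(); // blocks until the operation has completed or failed
            return "created";
        } catch (ExecutionException e) {
            // The underlying cause is wrapped by the ExecutionException.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult(createTopicAsync("orders", false)));
        System.out.println(awaitResult(createTopicAsync("orders", true)));
    }
}
```

With the real admin client, the same try/catch shape applies around the get call on the future returned for each topic, with e.getCause() exposing the broker-side error.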
6.2. Create With Options
Instead of default options, we can also use the overloaded form of the Admin.createTopics method and provide some options via the CreateTopicsOptions object. We can use these to modify the admin client behavior when creating new topics:
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
  .validateOnly(true)
  .retryOnQuotaViolation(false);

CreateTopicsResult result = admin.createTopics(
  Collections.singleton(newTopic), topicOptions
);
Here, we’ve set the validateOnly option to true, meaning that the client will only validate without actually creating the topic. Similarly, the retryOnQuotaViolation option is set to false so that the operation is not retried in case of quota violation.
6.3. New Topic Configuration
Kafka has a wide range of topic configurations that control topic behavior, such as data retention and compression. Each one has a server default value as well as an optional per-topic override.
We can provide the topic configurations by using a config map for the new topic:
// Create a compacted topic with 'lz4' compression codec
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
  .configs(newTopicConfig);
The TopicConfig class from the Admin API contains the keys that can be used to configure topics at creation time.
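For reference, the TopicConfig constants resolve to plain string keys, so the same overrides can be sketched without the Kafka library on the classpath. The example below builds the config map for a compacted topic and rejects unknown compression codecs before they reach the broker. The helper name compactedTopicConfig is hypothetical, and the set of accepted compression.type values is an assumption based on the Kafka topic configuration documentation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TopicConfigSketch {

    // Values assumed to be accepted by the topic-level compression.type setting.
    private static final Set<String> COMPRESSION_TYPES =
        Set.of("uncompressed", "zstd", "lz4", "snappy", "gzip", "producer");

    // Builds per-topic overrides for a compacted topic.
    // The string keys are the literal names behind the TopicConfig constants.
    public static Map<String, String> compactedTopicConfig(String compressionType) {
        if (!COMPRESSION_TYPES.contains(compressionType)) {
            throw new IllegalArgumentException("Unsupported compression type: " + compressionType);
        }
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", "compact");          // TopicConfig.CLEANUP_POLICY_CONFIG
        config.put("compression.type", compressionType);  // TopicConfig.COMPRESSION_TYPE_CONFIG
        return config;
    }

    public static void main(String[] args) {
        System.out.println(compactedTopicConfig("lz4"));
    }
}
```

A map built this way can be handed to NewTopic.configs just like the one assembled from the TopicConfig constants.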
7. Other Topic Operations
As well as the ability to create new topics, the Admin API also has operations to delete, list, and describe topics. All these topic-related operations follow the same pattern as we've seen for topic creation.
Each of these operation methods has an overloaded version that takes an xxxTopicsOptions object as input (for example, DeleteTopicsOptions). All of these methods return the corresponding xxxTopicsResult object. This, in turn, provides the KafkaFuture for accessing the results of the asynchronous operation.
Finally, it's also worth mentioning that the Admin API is still evolving, as indicated by its InterfaceStability.Evolving annotation. This implies that the API can change in the future, and a minor release may break compatibility.
8. Conclusion
In this tutorial, we’ve seen how to create a new topic in Kafka using the Java admin client.
Initially, we created a topic with default options and then with explicit options. Following on from this, we saw how to configure the new topic using various properties. Finally, we briefly covered other topic-related operations using the admin client.
Along the way, we also saw how to use Testcontainers to set up a simple single-node cluster from our tests.
As always, the full source code of the article is available over on GitHub.