1. Overview
In this tutorial, we’ll briefly introduce Apache Kafka and then see how to programmatically create and configure topics in a Kafka cluster.
2. Introduction to Kafka
Apache Kafka is a powerful, high-performance, distributed event-streaming platform.
Generally, producer applications publish events to Kafka while consumers subscribe to these events in order to read and process them. Kafka uses topics to store and categorize these events, e.g., in an e-commerce application, there could be an ‘orders’ topic.
Kafka topics are partitioned, which distributes data across multiple brokers for scalability. They can be replicated in order to make the data fault-tolerant and highly available. Topics also retain events even after consumption for as long as required. This is all managed on a per-topic basis via Kafka command-line tools and key-value configurations.
However, in addition to the command-line tools, Kafka also provides an Admin API to manage and inspect topics, brokers, and other Kafka objects. In our example, we’ll be using this API to create new topics.
3. Dependencies
To use the Admin API, let’s add the kafka-clients dependency to our pom.xml:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
4. Setting Up Kafka
Before creating new topics, we need at least a single-node Kafka cluster.
In this tutorial, we’ll use the Testcontainers framework to instantiate a Kafka container. This lets us run reliable, self-contained integration tests that don’t depend on an external Kafka server being available. For this, we’ll need two more dependencies specifically for our tests.
First, let’s add the Testcontainers Kafka dependency to our pom.xml:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
Next, we’ll add the junit-jupiter artifact for running Testcontainer tests using JUnit 5:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
Now that we have all the necessary dependencies configured, we can write a simple application to programmatically create new topics.
5. Admin API
Let’s begin by creating a new Properties instance with minimal configuration for a local broker:
Properties properties = new Properties();
properties.put(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()
);
Now we can obtain an Admin instance:
Admin admin = Admin.create(properties);
The create method accepts a Properties object (or a Map) with the bootstrap.servers property and returns a thread-safe instance.
The admin client uses this property to discover the brokers in the cluster and subsequently perform any admin operations. As such, it would usually be enough to include two or three broker addresses in order to cover the possibility of some instances being unavailable.
The AdminClientConfig class contains constants for all the admin client configuration entries.
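For illustration, a slightly fuller configuration might look like the sketch below; the broker addresses, client id, and timeout value are placeholders, not values taken from our example application:

Properties properties = new Properties();
// Two or three broker addresses give the client a fallback if one instance is down
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
// Optional: identifies this admin client in broker logs and metrics
properties.put(AdminClientConfig.CLIENT_ID_CONFIG, "topic-admin-client");
// Optional: fail admin requests that take longer than 30 seconds
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");

Admin admin = Admin.create(properties);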
6. Topic Creation
Let’s start by creating a JUnit 5 test with Testcontainers to verify successful topic creation. We’ll utilize the Kafka module, which uses the official Kafka Docker image for Confluent OSS Platform:
@Test
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
kafkaTopicApplication.createTopic("test-topic");
String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand)
.getStdout();
assertThat(stdout).contains("test-topic");
}
Here, Testcontainers will automatically instantiate and manage the Kafka container during test execution. We simply invoke our application code and verify that the topic has been successfully created in the running container.
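For reference, a container declaration along the following lines is assumed, with the image tag and class names being illustrative; the KafkaContainer is registered as a static field so that the Testcontainers JUnit 5 extension starts it once for the test class:

@Testcontainers
class KafkaTopicApplicationIntegrationTest {

    @Container
    private static final KafkaContainer KAFKA_CONTAINER =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    // class under test, assuming a KafkaTopicApplication wrapper around the Admin API
    private KafkaTopicApplication kafkaTopicApplication;

    // test methods go here
}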
6.1. Create With Default Options
Topic partitions and the replication factor are the key considerations for new topics. We’ll keep things simple and create our example topic with 1 partition and a replication factor of 1:
try (Admin admin = Admin.create(properties)) {
int partitions = 1;
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic)
);
KafkaFuture<Void> future = result.values().get(topicName);
future.get();
}
Here, we’ve used the Admin.createTopics method to create a batch of new topics with default options. As the Admin interface extends the AutoCloseable interface, we’ve used try-with-resources to execute our operation. This ensures that resources are released appropriately.
Importantly, this method communicates with the Controller Broker and executes asynchronously. The returned CreateTopicsResult object exposes a KafkaFuture for accessing the results of each item in the request batch. This follows the Java asynchronous programming pattern and allows callers to obtain the results of the operation using the Future.get method.
For synchronous behavior, we can call the get method immediately to retrieve the result of our operation. This blocks until the operation completes or fails. In case of failure, it results in an ExecutionException that wraps the underlying cause.
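As a brief sketch of both styles, we could block on the future and unwrap the failure cause, or register a callback instead; the error handling below is only illustrative:

// Blocking: wait until the topic is created, or surface the underlying error
try {
    future.get();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    // getCause() holds the actual Kafka error, e.g. a TopicExistsException
    throw new IllegalStateException("Topic creation failed", e.getCause());
}

// Non-blocking alternative: react once the operation completes
future.whenComplete((ignored, throwable) -> {
    if (throwable != null) {
        // log or otherwise handle the failure
    }
});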
6.2. Create With Options
Instead of default options, we can also use the overloaded form of the Admin.createTopics method and provide some options via the CreateTopicsOptions object. We can use these to modify the admin client behavior when creating new topics:
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
.validateOnly(true)
.retryOnQuotaViolation(false);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic), topicOptions
);
Here, we’ve set the validateOnly option to true, meaning that the client will only validate without actually creating the topic. Similarly, the retryOnQuotaViolation option is set to false so that the operation is not retried in case of quota violation.
6.3. New Topic Configuration
Kafka has a wide range of topic configurations that control topic behavior, such as data retention and compression. These have both a server default value as well as an optional per-topic override.
We can provide the topic configurations by using a config map for the new topic:
// Create a compacted topic with 'lz4' compression codec
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
.configs(newTopicConfig);
The TopicConfig class from the Admin API contains the keys that can be used to configure topics at creation time.
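To illustrate a couple of other commonly used keys, the sketch below configures a topic that keeps records for seven days and accepts messages of up to 1 MB; the topic name and values are only examples:

Map<String, String> retentionConfig = new HashMap<>();
// retention.ms: keep records for 7 days
retentionConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7 * 24 * 60 * 60 * 1000L));
// max.message.bytes: allow messages of up to 1 MB
retentionConfig.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024));

NewTopic limitedRetentionTopic = new NewTopic("orders", 1, (short) 1)
  .configs(retentionConfig);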
7. Other Topic Operations
In addition to creating new topics, the Admin API also has operations to delete, list, and describe topics. All these topic-related operations follow the same pattern as we’ve seen for topic creation.
Each of these operation methods has an overloaded version that takes an xxxTopicOptions object as input. All of these methods return the corresponding xxxTopicsResult object. This, in turn, provides the KafkaFuture for accessing the results of the asynchronous operation.
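As a short sketch of this pattern, listing, describing, and deleting a topic look very similar to what we did for creation; as before, we assume the enclosing method declares the checked exceptions thrown by get:

try (Admin admin = Admin.create(properties)) {
    // List all topic names in the cluster
    Set<String> topicNames = admin.listTopics()
      .names()
      .get();

    // Describe a topic, e.g. to inspect its partitions and replicas
    TopicDescription description = admin.describeTopics(Collections.singleton("test-topic"))
      .values()
      .get("test-topic")
      .get();

    // Delete the topic
    admin.deleteTopics(Collections.singleton("test-topic"))
      .all()
      .get();
}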
Finally, it’s also worth mentioning that since its introduction in Kafka version 0.11.0.0, the admin API is still evolving, as indicated by the InterfaceStability.Evolving annotation. This implies that the API can change in the future, and a minor release may break compatibility.
8. Conclusion
In this tutorial, we’ve seen how to create a new topic in Kafka using the Java admin client.
Initially, we created a topic with default options and then with explicit options. Following on from this, we saw how to configure the new topic using various properties. Finally, we briefly covered other topic-related operations using the admin client.
Along the way, we also saw how to use Testcontainers to set up a simple single-node cluster from our tests.
As always, the full source code of the article is available over on GitHub.