1. Introduction
1.介绍
Apache Curator is a Java client for Apache Zookeeper, the popular coordination service for distributed applications.
Apache Curator是Apache Zookeeper的一个Java客户端,后者是分布式应用的流行协调服务。
In this tutorial, we’ll introduce some of the most relevant features provided by Curator:
在本教程中,我们将介绍Curator提供的一些最相关的功能。
- Connection Management – managing connections and retry policies
- Async – enhancing existing client by adding async capabilities and the use of Java 8 lambdas
- Configuration Management – having a centralized configuration for the system
- Strongly-Typed Models – working with typed models
- Recipes – implementing leader election, distributed locks or counters
2. Prerequisites
2.先决条件
To start with, it’s recommended to take a quick look at the Apache Zookeeper and its features.
首先,建议快速浏览一下Apache Zookeeper及其功能。
For this tutorial, we assume that there’s already a standalone Zookeeper instance running on 127.0.0.1:2181; here are instructions on how to install and run it, if you’re just getting started.
在本教程中,我们假设已经有一个独立的Zookeeper实例在127.0.0.1:2181上运行;这里有关于如何安装和运行它的说明,如果你刚刚开始。
First, we’ll need to add the curator-x-async dependency to our pom.xml:
首先,我们需要将curator-x-async依赖性添加到我们的pom.xml。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-async</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
The latest version of Apache Curator 4.X.X has a hard dependency with Zookeeper 3.5.X which is still in beta right now.
最新版本的Apache Curator 4.X.X与Zookeeper 3.5.X有硬性依赖关系,而Zookeeper现在仍处于测试阶段。
And so, in this article, we’re going to use the currently latest stable Zookeeper 3.4.11 instead.
因此,在本文中,我们将使用目前最新的稳定版Zookeeper 3.4.11代替。
So we need to exclude the Zookeeper dependency and add the dependency for our Zookeeper version to our pom.xml:
因此,我们需要排除Zookeeper的依赖性,并将Zookeeper版本的依赖性添加到我们的pom.xml。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
For more information about compatibility, please refer to this link.
关于兼容性的更多信息,请参考此链接。
3. Connection Management
3.连接管理
The basic use case of Apache Curator is connecting to a running Apache Zookeeper instance.
Apache Curator的基本用例是连接到一个正在运行的Apache Zookeeper实例。
The tool provides a factory to build connections to Zookeeper using retry policies:
该工具提供了一个工厂,使用重试策略建立与Zookeeper的连接。
int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy = new RetryNTimes(
maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
.newClient("127.0.0.1:2181", retryPolicy);
client.start();
assertThat(client.checkExists().forPath("/")).isNotNull();
In this quick example, we’ll retry 3 times and will wait 100 ms between retries in case of connectivity issues.
在这个快速的例子中,我们将重试3次,在连接问题的情况下,重试之间将等待100毫秒。
Once connected to Zookeeper using the CuratorFramework client, we can now browse paths, get/set data and essentially interact with the server.
一旦使用CuratorFramework客户端连接到Zookeeper,我们现在可以浏览路径,获取/设置数据,并基本上与服务器互动。
4. Async
4.异步
The Curator Async module wraps the above CuratorFramework client to provide non-blocking capabilities using the CompletionStage Java 8 API.
Curator Async 模块包装了上述 CuratorFramework 客户端,以提供非阻塞功能,使用 CompletionStage Java 8 API。
Let’s see how the previous example looks like using the Async wrapper:
让我们看看前面的例子在使用Async包装器后是什么样子。
int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy
= new RetryNTimes(maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
.newClient("127.0.0.1:2181", retryPolicy);
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
AtomicBoolean exists = new AtomicBoolean(false);
async.checkExists()
.forPath("/")
.thenAcceptAsync(s -> exists.set(s != null));
await().until(() -> assertThat(exists.get()).isTrue());
Now, the checkExists() operation works in asynchronous mode, not blocking the main thread. We can also chain actions one after each other using the thenAcceptAsync() method instead, which uses the CompletionStage API.
现在,checkExists()操作在异步模式下工作,不会阻塞主线程。我们还可以使用thenAcceptAsync()方法将操作一个接一个地连锁起来,而不是使用CompletionStage API。
5. Configuration Management
5.配置管理
In a distributed environment, one of the most common challenges is to manage shared configuration among many applications. We can use Zookeeper as a data store where to keep our configuration.
在分布式环境中,最常见的挑战之一是管理许多应用程序之间的共享配置。我们可以使用Zookeeper作为数据存储,来保存我们的配置。
Let’s see an example using Apache Curator to get and set data:
让我们看一个使用Apache Curator获取和设置数据的例子。
CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
client.create().forPath(key);
async.setData()
.forPath(key, expected.getBytes());
AtomicBoolean isEquals = new AtomicBoolean();
async.getData()
.forPath(key)
.thenAccept(data -> isEquals.set(new String(data).equals(expected)));
await().until(() -> assertThat(isEquals.get()).isTrue());
In this example, we create the node path, set the data in Zookeeper, and then we recover it checking that the value is the same. The key field could be a node path like /config/dev/my_key.
在这个例子中,我们创建了节点路径,在Zookeeper中设置了数据,然后我们恢复它,检查值是否相同。key字段可以是一个节点路径,比如/config/dev/my_key。
5.1. Watchers
5.1.观察者
Another interesting feature in Zookeeper is the ability to watch keys or nodes. It allows us to listen to changes in the configuration and update our applications without needing to redeploy.
Zookeeper中另一个有趣的功能是监视键或节点的能力。它允许我们监听配置中的变化并更新我们的应用程序,而不需要重新部署。
Let’s see how the above example looks like when using watchers:
让我们看看上面的例子在使用观察者时是什么样子的。
CuratorFramework client = newClient()
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
async.create().forPath(key);
List<String> changes = new ArrayList<>();
async.watched()
.getData()
.forPath(key)
.event()
.thenAccept(watchedEvent -> {
try {
changes.add(new String(client.getData()
.forPath(watchedEvent.getPath())));
} catch (Exception e) {
// fail ...
}});
// Set data value for our key
async.setData()
.forPath(key, expected.getBytes());
await()
.until(() -> assertThat(changes.size()).isEqualTo(1));
We configure the watcher, set the data, and then confirm the watched event was triggered. We can watch one node or a set of nodes at once.
我们配置观察者,设置数据,然后确认被观察的事件被触发。我们可以同时观察一个节点或一组节点。
6. Strongly Typed Models
6.强类型的模型
Zookeeper primarily works with byte arrays, so we need to serialize and deserialize our data. This allows us some flexibility to work with any serializable instance, but it can be hard to maintain.
Zookeeper主要用字节数组工作,所以我们需要对数据进行序列化和反序列化。这使得我们可以灵活地处理任何可序列化的实例,但它可能很难维护。
To help here, Curator adds the concept of typed models which delegates the serialization/deserialization and allows us to work with our types directly. Let’s see how that works.
为了提供帮助,Curator添加了类型模型的概念,委托序列化/反序列化并允许我们直接使用我们的类型。让我们看看它是如何工作的。
First, we need a serializer framework. Curator recommends using the Jackson implementation, so let’s add the Jackson dependency to our pom.xml:
首先,我们需要一个串行器框架。Curator推荐使用Jackson的实现,因此我们将Jackson的依赖性添加到我们的pom.xml。
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
Now, let’s try to persist our custom class HostConfig:
现在,让我们尝试持久化我们的自定义类HostConfig。
public class HostConfig {
private String hostname;
private int port;
// getters and setters
}
We need to provide the model specification mapping from the HostConfig class to a path, and use the modeled framework wrapper provided by Apache Curator:
我们需要提供从HostConfig类到路径的模型规范映射,并使用Apache Curator提供的模型化框架包装器。
ModelSpec<HostConfig> mySpec = ModelSpec.builder(
ZPath.parseWithIds("/config/dev"),
JacksonModelSerializer.build(HostConfig.class))
.build();
CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async
= AsyncCuratorFramework.wrap(client);
ModeledFramework<HostConfig> modeledClient
= ModeledFramework.wrap(async, mySpec);
modeledClient.set(new HostConfig("host-name", 8080));
modeledClient.read()
.whenComplete((value, e) -> {
if (e != null) {
fail("Cannot read host config", e);
} else {
assertThat(value).isNotNull();
assertThat(value.getHostname()).isEqualTo("host-name");
assertThat(value.getPort()).isEqualTo(8080);
}
});
The whenComplete() method when reading the path /config/dev will return the HostConfig instance in Zookeeper.
读取路径/config/dev时的whenComplete()方法将返回Zookeeper中的HostConfig实例。
7. Recipes
7.菜谱
Zookeeper provides this guideline to implement high-level solutions or recipes such as leader election, distributed locks or shared counters.
Zookeeper提供本指南来实现高层解决方案或配方,如领导者选举、分布式锁或共享计数器。。
Apache Curator provides an implementation for most of these recipes. To see the full list, visit the Curator Recipes documentation.
Apache Curator为这些配方中的大多数提供了一个实现。要查看完整的列表,请访问Curator Recipes 文档。
All of these recipes are available in a separate module:
所有这些菜谱都可以在一个单独的模块中找到。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
Let’s jump right in and start understanding these with some simple examples.
让我们直接跳进去,通过一些简单的例子开始了解这些。
7.1. Leader Election
7.1.领导人选举
In a distributed environment, we may need one master or leader node to coordinate a complex job.
在一个分布式环境中,我们可能需要一个主节点或领导节点来协调一个复杂的工作。
This is how the usage of the Leader Election recipe in Curator looks like:
这就是领导者选举配方在策展人中的使用情况。
CuratorFramework client = newClient();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client,
"/mutex/select/leader/for/job/A",
new LeaderSelectorListener() {
@Override
public void stateChanged(
CuratorFramework client,
ConnectionState newState) {
}
@Override
public void takeLeadership(
CuratorFramework client) throws Exception {
}
});
// join the members group
leaderSelector.start();
// wait until the job A is done among all members
leaderSelector.close();
When we start the leader selector, our node joins a members group within the path /mutex/select/leader/for/job/A. Once our node becomes the leader, the takeLeadership method will be invoked, and we as leaders can resume the job.
当我们启动领导者选择器时,我们的节点加入路径/mutex/select/leader/for/job/A中的成员组。一旦我们的节点成为领导者,takeLeadership方法将被调用,我们作为领导者可以继续工作。
7.2. Shared Locks
7.2.共用锁
The Shared Lock recipe is about having a fully distributed lock:
共享锁配方是关于拥有一个完全分布的锁。
CuratorFramework client = newClient();
client.start();
InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(
client, "/mutex/process/A");
sharedLock.acquire();
// do process A
sharedLock.release();
When we acquire the lock, Zookeeper ensures that there’s no other application acquiring the same lock at the same time.
当我们获取锁时,Zookeeper确保没有其他应用程序在同一时间获取相同的锁。
7.3. Counters
7.3.计数器
The Counters recipe coordinates a shared Integer among all the clients:
计数器配方协调所有客户之间共享的Integer。
CuratorFramework client = newClient();
client.start();
SharedCount counter = new SharedCount(client, "/counters/A", 0);
counter.start();
counter.setCount(counter.getCount() + 1);
assertThat(counter.getCount()).isEqualTo(1);
In this example, Zookeeper stores the Integer value in the path /counters/A and initializes the value to 0 if the path has not been created yet.
在这个例子中,Zookeeper将Integer值存储在路径/counters/A中,如果路径还没有被创建,则将该值初始化为0。
8. Conclusion
8.结论
In this article, we’ve seen how to use Apache Curator to connect to Apache Zookeeper and take benefit of its main features.
在这篇文章中,我们已经看到了如何使用Apache Curator连接到Apache Zookeeper并利用其主要功能。
We’ve also introduced a few of the main recipes in Curator.
我们还介绍了Curator中的一些主要食谱。
As usual, sources can be found over on GitHub.
像往常一样,可以在GitHub上找到来源。