1. Overview
In this article, we’ll present BookKeeper, a service that implements a distributed, fault-tolerant record storage system.
2. What Is BookKeeper?
BookKeeper was originally developed by Yahoo as a ZooKeeper subproject and graduated to become a top-level project in 2015. At its core, BookKeeper aims to be a reliable and high-performance system that stores sequences of Log Entries (aka Records) in data structures called Ledgers.
An important feature of ledgers is the fact that they’re append-only and immutable. This makes BookKeeper a good candidate for certain applications, such as distributed logging systems, Pub-Sub messaging applications, and real-time stream processing.
3. BookKeeper Concepts
3.1. Log Entries
A log entry contains an indivisible unit of data that a client application stores to or reads from BookKeeper. When stored in a ledger, each entry contains the supplied data and a few metadata fields.
Those metadata fields include an entryId, which must be unique within a given ledger. There’s also an authentication code that BookKeeper uses to detect when an entry is corrupt or has been tampered with.
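To make the idea of an authentication code concrete, here is a minimal sketch that computes an HMAC over an entry and checks it again later. It is an illustration only: the class name EntryDigestSketch, the field layout, and the use of the raw password bytes as the key are assumptions of this example, not BookKeeper’s actual entry format or digest code.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Illustration only: NOT BookKeeper's real entry format or digest implementation.
final class EntryDigestSketch {

    // Computes an HMAC-SHA1 over (ledgerId, entryId, payload),
    // using the password bytes directly as the key (an assumption of this sketch).
    static byte[] digest(byte[] password, long ledgerId, long entryId, byte[] payload) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(password, "HmacSHA1"));
            mac.update(ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(ledgerId)
                .putLong(entryId)
                .array());
            return mac.doFinal(payload);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA1 not available", e);
        }
    }

    // Recomputes the code and compares it in constant time with the stored one.
    static boolean verify(byte[] password, long ledgerId, long entryId,
                          byte[] payload, byte[] expected) {
        return MessageDigest.isEqual(digest(password, ledgerId, entryId, payload), expected);
    }
}
```

If a single payload byte changes after the code was computed, verify() returns false, which is the kind of corruption or tampering check the paragraph above describes.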
BookKeeper offers no serialization features by itself, so clients must devise their own method to convert higher-level constructs to/from byte arrays.
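As a rough sketch of what such a conversion could look like, the following class encodes a record into a byte array and back using only the JDK. The SensorReading type and its binary layout are hypothetical and exist only for this example; real applications often pick an established format such as JSON, Avro, or Protocol Buffers instead.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical application-level record; BookKeeper would only ever see the byte[].
final class SensorReading {
    final long timestampMillis;
    final String sensorId;
    final double value;

    SensorReading(long timestampMillis, String sensorId, double value) {
        this.timestampMillis = timestampMillis;
        this.sensorId = sensorId;
        this.value = value;
    }

    // Layout: 8-byte timestamp, 8-byte value, 4-byte id length, UTF-8 id bytes.
    byte[] toBytes() {
        byte[] id = sensorId.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + Double.BYTES + Integer.BYTES + id.length)
            .putLong(timestampMillis)
            .putDouble(value)
            .putInt(id.length)
            .put(id)
            .array();
    }

    // Reads the fields back in the same order they were written.
    static SensorReading fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long timestamp = buf.getLong();
        double value = buf.getDouble();
        byte[] id = new byte[buf.getInt()];
        buf.get(id);
        return new SensorReading(timestamp, new String(id, StandardCharsets.UTF_8), value);
    }
}
```

The byte array returned by toBytes() is what we would hand to a ledger, and fromBytes() is what a reader would apply to each entry’s payload.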
3.2. Ledgers
A ledger is the basic storage unit managed by BookKeeper, storing an ordered sequence of log entries. As mentioned before, ledgers have append-only semantics, meaning that records can’t be modified once added to them.
Also, once a client stops writing to a ledger and closes it, BookKeeper seals it and we can no longer add data to it, even at a later time. This is an important point to keep in mind when designing an application around BookKeeper. Ledgers are not a good candidate to directly implement higher-level constructs, such as a queue. Instead, we see ledgers used more often to create more basic data structures that support those higher-level concepts.
For instance, the Apache DistributedLog project uses ledgers as log segments. Those segments are aggregated into distributed logs, but the underlying ledgers are transparent to regular users.
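The idea of building a longer-lived log out of sealed, append-only segments can be sketched with a purely in-memory model. Everything here (SegmentedLogSketch, Segment, the roll-over rule based on an entry count) is a hypothetical stand-in for illustration and does not reflect how DistributedLog is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a log built from sealed, append-only segments.
final class SegmentedLogSketch {

    // Stand-in for a ledger: accepts appends until it is closed.
    static final class Segment {
        private final List<byte[]> entries = new ArrayList<>();
        private boolean closed;

        long append(byte[] data) {
            if (closed) {
                throw new IllegalStateException("segment is sealed");
            }
            entries.add(data);
            return entries.size() - 1; // entry id within this segment
        }

        void close() { closed = true; }

        int size() { return entries.size(); }
    }

    private final int maxEntriesPerSegment;
    private final List<Segment> segments = new ArrayList<>();

    SegmentedLogSketch(int maxEntriesPerSegment) {
        this.maxEntriesPerSegment = maxEntriesPerSegment;
        segments.add(new Segment());
    }

    // Appends to the current segment, sealing it and starting a new one when it is full.
    long append(byte[] data) {
        Segment current = segments.get(segments.size() - 1);
        if (current.size() >= maxEntriesPerSegment) {
            current.close();
            current = new Segment();
            segments.add(current);
        }
        current.append(data);
        return totalEntries() - 1; // position in the overall log
    }

    int segmentCount() { return segments.size(); }

    long totalEntries() {
        long total = 0;
        for (Segment s : segments) {
            total += s.size();
        }
        return total;
    }
}
```

The caller sees one continuous log, while each segment keeps the write-once, seal-on-close behavior that ledgers have.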
BookKeeper achieves ledger resilience by replicating log entries across multiple server instances. Three parameters control how many servers and copies are kept:
- Ensemble size: the number of servers used to write ledger data
- Write quorum size: the number of servers used to replicate a given log entry
- Ack quorum size: the number of servers that must acknowledge a given log entry write operation
By adjusting those parameters, we can tune the performance and resilience characteristics of a given ledger. When writing to a ledger, BookKeeper considers the operation successful only once the number of bookies given by the ack quorum size has acknowledged it.
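The relationship between the three parameters can be illustrated with a small model. This is a simplified sketch under stated assumptions: it treats bookies as indexes 0..ensembleSize-1 and spreads entries round-robin across the ensemble, and it ignores what happens when a failed bookie is replaced. The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ensemble size, write quorum and ack quorum; not BookKeeper code.
final class LedgerQuorumSketch {
    final int ensembleSize;
    final int writeQuorum;
    final int ackQuorum;

    LedgerQuorumSketch(int ensembleSize, int writeQuorum, int ackQuorum) {
        if (ackQuorum < 1 || writeQuorum < ackQuorum || ensembleSize < writeQuorum) {
            throw new IllegalArgumentException(
                "expected ensemble size >= write quorum >= ack quorum >= 1");
        }
        this.ensembleSize = ensembleSize;
        this.writeQuorum = writeQuorum;
        this.ackQuorum = ackQuorum;
    }

    // Assumed round-robin striping: an entry goes to writeQuorum consecutive
    // bookie indexes, starting at entryId modulo the ensemble size.
    List<Integer> writeSet(long entryId) {
        List<Integer> bookies = new ArrayList<>();
        for (int i = 0; i < writeQuorum; i++) {
            bookies.add((int) ((entryId + i) % ensembleSize));
        }
        return bookies;
    }

    // An acknowledged entry is stored on at least ackQuorum bookies,
    // so it remains readable after losing ackQuorum - 1 of them.
    int lossesSurvivedPerEntry() {
        return ackQuorum - 1;
    }
}
```

With an ensemble size of 3 and both quorums set to 2, consecutive entries land on bookie pairs (0, 1), (1, 2), (2, 0), and every acknowledged entry survives the loss of one bookie.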
In addition to its internal metadata, BookKeeper also supports adding custom metadata to a ledger. This metadata is a map of key/value pairs that clients pass at creation time and that BookKeeper stores in ZooKeeper alongside its own.
3.3. Bookies
Bookies are servers that hold one or more ledgers. A BookKeeper cluster consists of a number of bookies running in a given environment, providing services to clients over plain TCP or TLS connections.
Bookies coordinate actions using cluster services provided by ZooKeeper. This implies that, if we want to achieve a fully fault-tolerant system, we need at least a 3-instance ZooKeeper and a 3-instance BookKeeper setup. Such a setup would be able to tolerate the loss of any single instance and still operate normally, at least for the default ledger setup: 3-node ensemble size, 2-node write quorum, and 2-node ack quorum.
4. Local Setup
The basic requirements to run BookKeeper locally are quite modest. First, we need a ZooKeeper instance up and running, which provides ledger metadata storage for BookKeeper. Next, we deploy a bookie, which provides the actual services to clients.
While it’s certainly possible to do those steps manually, here we’ll use a docker-compose file that uses official Apache images to simplify this task:
$ cd <path to docker-compose.yml>
$ docker-compose up
This docker-compose file creates three bookies and a ZooKeeper instance. Since all bookies run on the same machine, it’s only useful for testing purposes. The official documentation contains the necessary steps to configure a fully fault-tolerant cluster.
Let’s do a basic test to check that it’s working as expected, using bookkeeper’s shell command listbookies:
$ docker exec -it apache-bookkeeper_bookie_1 /opt/bookkeeper/bin/bookkeeper \
shell listbookies -readwrite
ReadWrite Bookies :
192.168.99.101(192.168.99.101):4181
192.168.99.101(192.168.99.101):4182
192.168.99.101(192.168.99.101):3181
The output shows the list of available bookies, consisting of three bookies. Please note that the IP addresses shown will change depending on the specifics of the local Docker installation.
5. Using the Ledger API
The Ledger API is the most basic way to interface with BookKeeper. It allows us to interact directly with Ledger objects but, on the other hand, lacks direct support for higher-level abstractions such as streams. For those use cases, the BookKeeper project offers another library, DistributedLog, which supports those features.
Using the Ledger API requires adding the bookkeeper-server dependency to our project:
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>4.10.0</version>
</dependency>
NOTE: As stated in the documentation, using this dependency will also include dependencies for the protobuf and guava libraries. Should our project also need those libraries, but at a different version than those used by BookKeeper, we could use an alternative dependency that shades those libraries:
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server-shaded</artifactId>
<version>4.10.0</version>
</dependency>
5.1. Connecting to Bookies
The BookKeeper class is the main entry point of the Ledger API, providing a few methods to connect to our BookKeeper service. In its simplest form, all we need to do is create a new instance of this class, passing the address of one of the ZooKeeper servers used by BookKeeper:
BookKeeper bk = new BookKeeper("zookeeper-host:2181");
Here, zookeeper-host should be set to the IP address or hostname of the ZooKeeper server that holds BookKeeper’s cluster configuration. In our case, that’s usually “localhost” or the host that the DOCKER_HOST environment variable points to.
If we need more control over the parameters available to fine-tune our client, we can populate a ClientConfiguration instance and use it to create our client:
ClientConfiguration cfg = new ClientConfiguration();
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2181");
// ... set other properties
BookKeeper bk = BookKeeper.forConfig(cfg).build();
5.2. Creating a Ledger
Once we have a BookKeeper instance, creating a new ledger is straightforward:
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC,"password".getBytes());
Here, we’ve used the simplest variant of this method. It will create a new ledger with default settings, using the MAC digest type to ensure entry integrity.
If we want to add custom metadata to our ledger, we need to use a variant that takes all parameters:
LedgerHandle lh = bk.createLedger(
3,
2,
2,
DigestType.MAC,
"password".getBytes(),
Collections.singletonMap("name", "my-ledger".getBytes()));
This time, we’ve used the full version of the createLedger() method. The three first arguments are the ensemble size, write quorum, and ack quorum values, respectively. Next, we have the same digest parameters as before. Finally, we pass a Map with our custom metadata.
In both cases above, createLedger is a synchronous operation. BookKeeper also offers asynchronous ledger creation using a callback:
bk.asyncCreateLedger(
3,
2,
2,
BookKeeper.DigestType.MAC, "passwd".getBytes(),
(rc, lh, ctx) -> {
// ... use lh to access ledger operations
},
null,
Collections.emptyMap());
Newer versions of BookKeeper (>= 4.6) also support a fluent-style API and CompletableFuture to achieve the same goal:
CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
.withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
.withPassword("password".getBytes())
.execute();
Note that, in this case, we get a WriteHandle instead of a LedgerHandle. As we’ll see later, we can use either of them to access our ledger, as LedgerHandle implements WriteHandle.
5.3. Writing Data
Once we’ve acquired a LedgerHandle or WriteHandle, we write data to the associated ledger using one of the append() method variants. Let’s start with the synchronous variant:
for (int i = 0; i < MAX_MESSAGES; i++) {
    // Encode explicitly as UTF-8 instead of relying on the platform default charset
    byte[] data = ("message-" + i).getBytes(StandardCharsets.UTF_8);
    lh.append(data);
}
Here, we’re using a variant that takes a byte array. The API also supports Netty’s ByteBuf and Java NIO’s ByteBuffer, which allow better memory management in time-critical scenarios.
For asynchronous operations, the API differs a bit depending on the specific handle type we’ve acquired. WriteHandle uses CompletableFuture, whereas LedgerHandle also supports callback-based methods:
// Available in WriteHandle and LedgerHandle
CompletableFuture<Long> f = lh.appendAsync(data);
// Available only in LedgerHandle
lh.asyncAddEntry(
data,
(rc,ledgerHandle,entryId,ctx) -> {
// ... callback logic omitted
},
null);
Which one to choose is largely a personal choice, but in general, using CompletableFuture-based APIs tends to be easier to read. Also, there’s the side benefit that we can construct a Mono directly from it, making it easier to integrate BookKeeper in reactive applications.
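When only a callback-style method is available, it can be bridged to a CompletableFuture by hand. The sketch below shows the general pattern with hypothetical stand-in interfaces (AddCallback, CallbackWriter) and a made-up OK result code; none of these are BookKeeper types.

```java
import java.util.concurrent.CompletableFuture;

// Generic pattern for bridging a result-code callback to a CompletableFuture.
final class CallbackToFutureSketch {

    // Hypothetical callback shape: a result code plus the id assigned to the entry.
    interface AddCallback {
        void addComplete(int rc, long entryId);
    }

    // Hypothetical stand-in for a writer that only offers a callback-based method.
    interface CallbackWriter {
        void asyncAdd(byte[] data, AddCallback cb);
    }

    static final int OK = 0; // assumed "success" result code for this sketch

    // Completes the future with the entry id on success, or exceptionally otherwise.
    static CompletableFuture<Long> add(CallbackWriter writer, byte[] data) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        writer.asyncAdd(data, (rc, entryId) -> {
            if (rc == OK) {
                future.complete(entryId);
            } else {
                future.completeExceptionally(
                    new IllegalStateException("add failed, rc=" + rc));
            }
        });
        return future;
    }
}
```

Once the result is a CompletableFuture, it composes with thenApply(), thenCompose(), and similar operators like any other future.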
5.4. Reading Data
Reading data from a BookKeeper ledger works in a similar way to writing. First, we use our BookKeeper instance to open the ledger and get a LedgerHandle:
LedgerHandle lh = bk.openLedger(
ledgerId,
BookKeeper.DigestType.MAC,
ledgerPassword);
Except for the ledgerId parameter, which we’ll cover later, this code looks much like the createLedger() method we’ve seen before. There’s an important difference, though; this method returns a read-only LedgerHandle instance. If we try to use any of the available append() methods, all we’ll get is an exception.
Alternatively, a safer way is to use the fluent-style API:
ReadHandle rh = bk.newOpenLedgerOp()
.withLedgerId(ledgerId)
.withDigestType(DigestType.MAC)
.withPassword("password".getBytes())
.execute()
.get();
ReadHandle has the required methods to read data from our ledger:
long lastId = rh.readLastAddConfirmed();
rh.read(0, lastId).forEach((entry) -> {
    // ... do something
});
Here, we’ve simply requested all available data in this ledger using the synchronous read variant. As expected, there’s also an async variant:
rh.readAsync(0, lastId).thenAccept((entries) -> {
entries.forEach((entry) -> {
// ... process entry
});
});
If we choose to use the older openLedger() method, we’ll find additional methods that support the callback style for async methods:
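lh.asyncReadEntries(
  0,
  lastId,
  (rc, handle, entries, ctx) -> {
      // The handle parameter must not reuse the name of the enclosing lh variable
      while (entries.hasMoreElements()) {
          LedgerEntry e = entries.nextElement();
          // ... process entry
      }
  },
  null);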
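The examples in this section request the whole range from 0 to lastId in a single call. For large ledgers, it may be preferable to read in smaller chunks. The helper below is a sketch of that idea using only the JDK: it splits an inclusive range of entry ids into sub-ranges, each of which could then be passed to one read call. The class name and the batch size are assumptions of this example, as is the convention that an empty ledger reports -1 as its last confirmed entry.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a range of entry ids into smaller inclusive ranges for batched reads.
final class ReadBatchSketch {

    // Returns {first, last} pairs covering [firstEntry, lastEntry],
    // each spanning at most batchSize entries. Empty if lastEntry < firstEntry.
    static List<long[]> batches(long firstEntry, long lastEntry, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long start = firstEntry; start <= lastEntry; start += batchSize) {
            long end = Math.min(start + batchSize - 1, lastEntry);
            ranges.add(new long[] {start, end});
        }
        return ranges;
    }
}
```

For example, batches(0, 9, 4) yields the ranges 0 to 3, 4 to 7, and 8 to 9, while batches(0, -1, 4) yields no ranges at all.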
5.5. Listing Ledgers
We’ve seen previously that we need the ledger’s id to open and read its data. So, how do we get one? One way is using the LedgerManager interface, which we can access from our BookKeeper instance. This interface mainly deals with ledger metadata, but it also has the asyncProcessLedgers() method. Using this method, and some help from concurrency primitives, we can enumerate all available ledgers:
public List<Long> listAllLedgers(BookKeeper bk) {
    List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch processDone = new CountDownLatch(1);

    bk.getLedgerManager()
      .asyncProcessLedgers(
        (ledgerId, cb) -> {
            // Record the id, then acknowledge so that processing can continue
            ledgers.add(ledgerId);
            cb.processResult(BKException.Code.OK, null, null);
        },
        (rc, s, obj) -> {
            // Called once all ledgers have been processed, or on failure
            processDone.countDown();
        },
        null,
        BKException.Code.OK,
        BKException.Code.ReadException);

    try {
        processDone.await(1, TimeUnit.MINUTES);
        return ledgers;
    } catch (InterruptedException ie) {
        throw new RuntimeException(ie);
    }
}
Let’s digest this code, which is a bit longer than expected for a seemingly trivial task. The asyncProcessLedgers() method requires two callbacks.
The first one collects all ledger ids in a list. We’re using a synchronized list here because this callback can be called from multiple threads. Besides the ledger id, this callback also receives a callback parameter. We must call its processResult() method to acknowledge that we’ve processed the data and to signal that we’re ready to get more data.
The second callback gets called when all ledgers have been sent to the processor callback or when there’s a failure. In our case, we’ve omitted the error handling. Instead, we’re just decrementing a CountDownLatch, which, in turn, will finish the await operation and allow the method to return with a list of all available ledgers.
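The listing code above omits error handling and ignores whether the wait timed out. One possible way to cover both cases is sketched below with a hypothetical AsyncLister interface standing in for the asynchronous enumeration; the interface, the OK constant, and the exception choices are assumptions of this example rather than part of the BookKeeper API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

// Waits for an asynchronous enumeration and surfaces failures and timeouts.
final class LatchWithResultSketch {

    // Hypothetical stand-in: calls onItem once per id, then onDone with a result code.
    interface AsyncLister {
        void list(LongConsumer onItem, IntConsumer onDone);
    }

    static final int OK = 0; // assumed "success" result code for this sketch

    static List<Long> collect(AsyncLister lister, long timeout, TimeUnit unit) {
        List<Long> ids = new CopyOnWriteArrayList<>();
        AtomicInteger resultCode = new AtomicInteger(OK);
        CountDownLatch done = new CountDownLatch(1);

        lister.list(
            id -> ids.add(id),
            rc -> {
                // Keep the final result code so the caller can react to failures
                resultCode.set(rc);
                done.countDown();
            });

        try {
            // await() returns false when the timeout elapses before countDown()
            if (!done.await(timeout, unit)) {
                throw new IllegalStateException("timed out waiting for listing");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while listing", e);
        }
        if (resultCode.get() != OK) {
            throw new IllegalStateException("listing failed, rc=" + resultCode.get());
        }
        return ids;
    }
}
```

Compared with the original method, this variant returns a list only when the final callback reported success within the timeout.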
6. Conclusion
In this article, we’ve covered the Apache BookKeeper project, taking a look at its core concepts and using its low-level API to access ledgers and perform read/write operations.
As usual, all code is available over on GitHub.