1. Overview
In this article, we’ll look at conflict-free replicated data types (CRDTs) and how to work with them in Java. For our examples, we’ll use implementations from the wurmloch-crdt library.
When we have a cluster of N replica nodes in a distributed system, we may encounter a network partition — some nodes are temporarily unable to communicate with each other. This situation is called a split-brain.
When we have a split-brain in our system, some write requests — even for the same user — can go to different replicas that are not connected with each other. When such a situation occurs, our system is still available but is not consistent.
We need to decide what to do with writes and data that are not consistent when the network between two split clusters starts working again.
2. Conflict-Free Replicated Data Types to the Rescue
Let’s consider two nodes, A and B, that have become disconnected due to a split-brain.
Let’s say that a user changes their login and that the request goes to node A. Then they decide to change it again, but this time the request goes to node B.
Because of the split-brain, the two nodes are not connected. We need to decide how the login of this user should look when the network is working again.
We can use a couple of strategies: we can leave conflict resolution to the user (as is done in Google Docs), or we can use a CRDT to merge the data from diverged replicas for us.
3. Maven Dependency
First, let’s add a dependency to the library that provides a set of useful CRDTs:
<dependency>
    <groupId>com.netopyr.wurmloch</groupId>
    <artifactId>wurmloch-crdt</artifactId>
    <version>0.1.0</version>
</dependency>
The latest version can be found on Maven Central.
4. Grow-Only Set
The most basic CRDT is a Grow-Only Set. Elements can only be added to a GSet and never removed. When the GSet diverges, it can be easily merged by calculating the union of two sets.
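The union rule can be sketched in a few lines of plain Java (this is an illustration of the merge semantics, not the wurmloch-crdt implementation):

```java
import java.util.HashSet;
import java.util.Set;

public class GSetMergeSketch {

    // Merging two diverged grow-only sets is simply their union.
    static <T> Set<T> merge(Set<T> a, Set<T> b) {
        Set<T> merged = new HashSet<>(a);
        merged.addAll(b);
        return merged;
    }

    public static void main(String[] args) {
        Set<String> replicaA = new HashSet<>(Set.of("apple", "strawberry"));
        Set<String> replicaB = new HashSet<>(Set.of("apple", "pear"));
        // The merged set contains every element seen by either replica.
        System.out.println(
            merge(replicaA, replicaB).equals(Set.of("apple", "strawberry", "pear")));
        // prints "true"
    }
}
```

Because set union is commutative, associative, and idempotent, the replicas converge to the same state no matter in which order the merges happen.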
First, let’s create two replicas to simulate a distributed data structure and connect those two replicas using the connect() method:
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
Once we get two replicas in our cluster, we can create a GSet on the first replica and reference it on the second replica:
GSet<String> replica1 = crdtStore1.createGSet("ID_1");
GSet<String> replica2 = crdtStore2.<String>findGSet("ID_1").get();
At this point, our cluster is working as expected, and there is an active connection between two replicas. We can add two elements to the set from two different replicas and assert that the set contains the same elements on both replicas:
replica1.add("apple");
replica2.add("banana");
assertThat(replica1).contains("apple", "banana");
assertThat(replica2).contains("apple", "banana");
Let’s say that suddenly we have a network partition and there is no connection between the first and second replicas. We can simulate the network partition using the disconnect() method:
crdtStore1.disconnect(crdtStore2);
Next, when we add elements to the data set from both replicas, those changes are not visible globally because there is no connection between them:
replica1.add("strawberry");
replica2.add("pear");
assertThat(replica1).contains("apple", "banana", "strawberry");
assertThat(replica2).contains("apple", "banana", "pear");
Once the connection between both cluster members is established again, the GSet is merged internally using a union on both sets, and both replicas are consistent again:
crdtStore1.connect(crdtStore2);
assertThat(replica1)
.contains("apple", "banana", "strawberry", "pear");
assertThat(replica2)
.contains("apple", "banana", "strawberry", "pear");
5. Increment-Only Counter
An increment-only counter is a CRDT that aggregates all increments locally on each node.
When replicas synchronize after a network partition, the resulting value is calculated by summing all increments on all nodes. This is similar to LongAdder from java.util.concurrent.atomic, but on a higher abstraction level.
Let’s create an increment-only counter using GCounter and increment it from both replicas. We can see that the sum is calculated properly:
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
GCounter replica1 = crdtStore1.createGCounter("ID_1");
GCounter replica2 = crdtStore2.findGCounter("ID_1").get();
replica1.increment();
replica2.increment(2L);
assertThat(replica1.get()).isEqualTo(3L);
assertThat(replica2.get()).isEqualTo(3L);
When we disconnect both cluster members and perform local increment operations, we can see that the values are inconsistent:
crdtStore1.disconnect(crdtStore2);
replica1.increment(3L);
replica2.increment(5L);
assertThat(replica1.get()).isEqualTo(6L);
assertThat(replica2.get()).isEqualTo(8L);
But once the cluster is healthy again, the increments will be merged, yielding the proper value:
crdtStore1.connect(crdtStore2);
assertThat(replica1.get())
.isEqualTo(11L);
assertThat(replica2.get())
.isEqualTo(11L);
6. PN Counter
Using a rule similar to that of the increment-only counter, we can create a counter that can be both incremented and decremented. The PNCounter stores all increments and decrements separately.
When replicas synchronize, the resulting value will be equal to the sum of all increments minus the sum of all decrements:
@Test
public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() {
LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);
PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();
replica1.increment();
replica2.decrement(2L);
assertThat(replica1.get()).isEqualTo(-1L);
assertThat(replica2.get()).isEqualTo(-1L);
crdtStore1.disconnect(crdtStore2);
replica1.decrement(3L);
replica2.increment(5L);
assertThat(replica1.get()).isEqualTo(-4L);
assertThat(replica2.get()).isEqualTo(4L);
crdtStore1.connect(crdtStore2);
assertThat(replica1.get()).isEqualTo(1L);
assertThat(replica2.get()).isEqualTo(1L);
}
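The same rule can be sketched in plain Java by keeping two grow-only maps, one for increments and one for decrements (again a simplified illustration, not the library’s internals):

```java
import java.util.HashMap;
import java.util.Map;

public class PNCounterSketch {

    // Increments and decrements are tracked as two separate grow-only maps.
    private final Map<String, Long> p = new HashMap<>();
    private final Map<String, Long> n = new HashMap<>();
    private final String nodeId;

    public PNCounterSketch(String nodeId) {
        this.nodeId = nodeId;
    }

    public void increment(long amount) {
        p.merge(nodeId, amount, Long::sum);
    }

    public void decrement(long amount) {
        n.merge(nodeId, amount, Long::sum);
    }

    // value = sum of all increments minus sum of all decrements
    public long get() {
        return sum(p) - sum(n);
    }

    // Both maps merge like G-Counters: per-node maximum.
    public void merge(PNCounterSketch other) {
        other.p.forEach((node, v) -> p.merge(node, v, Math::max));
        other.n.forEach((node, v) -> n.merge(node, v, Math::max));
    }

    private static long sum(Map<String, Long> m) {
        return m.values().stream().mapToLong(Long::longValue).sum();
    }
}
```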
7. Last-Writer-Wins Register
Sometimes, we have more complex business rules, and operating on sets or counters is insufficient. We can use the Last-Writer-Wins Register, which keeps only the last updated value when merging diverged data sets. Cassandra uses this strategy to resolve conflicts.
We need to be very cautious when using this strategy because it drops changes that occurred in the meantime.
Let’s create a cluster of two replicas and instances of the LWWRegister class:
LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2");
crdtStore1.connect(crdtStore2);
LWWRegister<String> replica1 = crdtStore1.createLWWRegister("ID_1");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("ID_1").get();
replica1.set("apple");
replica2.set("banana");
assertThat(replica1.get()).isEqualTo("banana");
assertThat(replica2.get()).isEqualTo("banana");
When the first replica sets the value to apple and the second one changes it to banana, the LWWRegister keeps only the last value.
Let’s see what happens if the cluster disconnects:
crdtStore1.disconnect(crdtStore2);
replica1.set("strawberry");
replica2.set("pear");
assertThat(replica1.get()).isEqualTo("strawberry");
assertThat(replica2.get()).isEqualTo("pear");
Each replica keeps its local copy of the data, which is now inconsistent. When we call the set() method, the LWWRegister internally assigns a version value that identifies every update, using a VectorClock algorithm.
When the cluster synchronizes, it takes the value with the highest version and discards every previous update:
crdtStore1.connect(crdtStore2);
assertThat(replica1.get()).isEqualTo("pear");
assertThat(replica2.get()).isEqualTo("pear");
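A simplified sketch of the last-writer-wins rule (using a plain logical timestamp with a node-id tiebreaker instead of the library’s VectorClock) could look like this:

```java
public class LWWRegisterSketch<T> {

    private T value;
    private long timestamp = 0L;   // logical clock of the last accepted write
    private String writerId = "";  // tiebreaker when timestamps are equal

    public void set(T newValue, long newTimestamp, String nodeId) {
        if (isNewer(newTimestamp, nodeId)) {
            value = newValue;
            timestamp = newTimestamp;
            writerId = nodeId;
        }
    }

    // Merging keeps the write with the highest version;
    // every older update is simply discarded.
    public void merge(LWWRegisterSketch<T> other) {
        set(other.value, other.timestamp, other.writerId);
    }

    private boolean isNewer(long ts, String nodeId) {
        return ts > timestamp
            || (ts == timestamp && nodeId.compareTo(writerId) > 0);
    }

    public T get() {
        return value;
    }
}
```

The tiebreaker matters: without it, two concurrent writes with the same timestamp could leave the replicas in different states, and the merge would not converge.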
8. Conclusion
In this article, we looked at the problem of keeping a distributed system consistent while maintaining availability.
In case of network partitions, we need to merge the diverged data when the cluster is synchronized. We saw how to use CRDTs to perform a merge of diverged data.
All these examples and code snippets can be found in the GitHub project. This is a Maven project, so it should be easy to import and run as-is.