Created
March 19, 2019 23:18
-
-
Save daniel-j-h/90a6447c63c9ec663478b4d2d71ab7d1 to your computer and use it in GitHub Desktop.
Conflict-free replicated data type (CRDT) counters - strong eventual consistency in a distributed system https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
class GrowCounter: | |
'''Convergent conflict-free replicated data type state based grow-only counter. | |
Guarantees strong eventual consistency. | |
''' | |
def __init__(self, n): | |
assert n > 0, 'at least one node in cluster' | |
self.n = n | |
self.p = [0 for _ in range(n)] | |
def value(self): | |
return sum(self.p) | |
def increment(self, i): | |
assert 0 <= i < n, 'node id in cluster bounds' | |
self.p[i] += 1 | |
def merge(self, other): | |
assert self.n == other.n | |
self.p = list(map(max, zip(self.p, other.p))) | |
def compare(self, other): | |
assert self.n == other.n | |
return all(l <= r for (l, r) in zip(self.p, other.p)) | |
def __repr__(self): | |
return '<{}: value={}, n={}, p={}>'.format(self.__class__.__name__, | |
self.value(), self.n, self.p) | |
class PosNegCounter: | |
'''Convergent conflict-free replicated data type state based positive-negative counter. | |
Guarantees strong eventual consistency. | |
''' | |
def __init__(self, n): | |
self.pos = GrowCounter(n) | |
self.neg = GrowCounter(n) | |
def value(self): | |
return self.pos.value() - self.neg.value() | |
def increment(self, i): | |
self.pos.increment(i) | |
def decrement(self, i): | |
self.neg.increment(i) | |
def merge(self, other): | |
self.pos.merge(other.pos) | |
self.neg.merge(other.neg) | |
def compare(self, other): | |
return self.pos.compare(other.pos) and self.neg.compare(other.neg) | |
def __repr__(self): | |
return '<{}: value={}, pos={}, neg={}>'.format(self.__class__.__name__, | |
self.value(), self.pos, self.neg) | |
if __name__ == '__main__': | |
# Simulate cluster of n nodes, each node with its own local counter | |
n = 10 | |
counters = [GrowCounter(n) for _ in range(n)] | |
# Count to 100 in a distributed cluster of n nodes | |
for _ in range(100): | |
# Pick an arbitrary node and increment its counter | |
node = random.randrange(n) | |
counters[node].increment(node) | |
# Simulate eventual consistency through arbitrary node communication | |
for i in range(1000): | |
lhs = random.choice(counters) | |
rhs = random.choice(counters) | |
# Merge is commutative, associative, idempotent | |
lhs.merge(rhs) | |
# Merge always increases state monotonically | |
assert rhs.compare(lhs) | |
# If we waited long enough we will see a cluster-wide consistent state | |
for i, counter in enumerate(counters): | |
print('{}: {}'.format(i, counter)) | |
assert all(v.value() == 100 for v in counters), 'final consistent state reached' | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment