Skip to content

Instantly share code, notes, and snippets.

Created March 19, 2019 23:18
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
Conflict-free replicated data type (CRDT) counters - strong eventual consistency in a distributed system
import random
class GrowCounter:
'''Convergent conflict-free replicated data type state based grow-only counter.
Guarantees strong eventual consistency.
def __init__(self, n):
assert n > 0, 'at least one node in cluster'
self.n = n
self.p = [0 for _ in range(n)]
def value(self):
return sum(self.p)
def increment(self, i):
assert 0 <= i < n, 'node id in cluster bounds'
self.p[i] += 1
def merge(self, other):
assert self.n == other.n
self.p = list(map(max, zip(self.p, other.p)))
def compare(self, other):
assert self.n == other.n
return all(l <= r for (l, r) in zip(self.p, other.p))
def __repr__(self):
return '<{}: value={}, n={}, p={}>'.format(self.__class__.__name__,
self.value(), self.n, self.p)
class PosNegCounter:
'''Convergent conflict-free replicated data type state based positive-negative counter.
Guarantees strong eventual consistency.
def __init__(self, n):
self.pos = GrowCounter(n)
self.neg = GrowCounter(n)
def value(self):
return self.pos.value() - self.neg.value()
def increment(self, i):
def decrement(self, i):
def merge(self, other):
def compare(self, other):
return and
def __repr__(self):
return '<{}: value={}, pos={}, neg={}>'.format(self.__class__.__name__,
self.value(), self.pos, self.neg)
if __name__ == '__main__':
# Simulate cluster of n nodes, each node with its own local counter
n = 10
counters = [GrowCounter(n) for _ in range(n)]
# Count to 100 in a distributed cluster of n nodes
for _ in range(100):
# Pick an arbitrary node and increment its counter
node = random.randrange(n)
# Simulate eventual consistency through arbitrary node communication
for i in range(1000):
lhs = random.choice(counters)
rhs = random.choice(counters)
# Merge is commutative, associative, idempotent
# Merge always increases state monotonically
# If we waited long enough we will see a cluster-wide consistent state
for i, counter in enumerate(counters):
print('{}: {}'.format(i, counter))
assert all(v.value() == 100 for v in counters), 'final consistent state reached'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment