Skip to content

Instantly share code, notes, and snippets.

@daniel-j-h
Created March 19, 2019 23:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daniel-j-h/90a6447c63c9ec663478b4d2d71ab7d1 to your computer and use it in GitHub Desktop.
Save daniel-j-h/90a6447c63c9ec663478b4d2d71ab7d1 to your computer and use it in GitHub Desktop.
Conflict-free replicated data type (CRDT) counters - strong eventual consistency in a distributed system https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type
import random
class GrowCounter:
    '''Convergent conflict-free replicated data type state based grow-only counter.

    Every replica keeps one slot per node in the cluster. A node only ever
    increments its own slot; replicas are reconciled by taking the
    element-wise maximum of their slots, and the counter value is the sum
    of all slots.

    Guarantees strong eventual consistency.
    '''

    def __init__(self, n):
        '''Create a counter for a cluster of `n` nodes with all slots at zero.'''
        assert n > 0, 'at least one node in cluster'
        self.n = n
        self.p = [0] * n

    def value(self):
        '''Return the current count: the sum over all per-node slots.'''
        return sum(self.p)

    def increment(self, i):
        '''Add one to the slot owned by node `i` (0 <= i < self.n).'''
        # Bounds are checked against this counter's own cluster size; relying
        # on a module-level `n` breaks as soon as the class is imported.
        assert 0 <= i < self.n, 'node id in cluster bounds'
        self.p[i] += 1

    def merge(self, other):
        '''Fold the state of `other` into this counter in place.

        Each slot becomes the maximum of the two corresponding slots.
        Both counters must have the same number of slots.
        '''
        assert self.n == other.n
        self.p = [max(mine, theirs) for mine, theirs in zip(self.p, other.p)]

    def compare(self, other):
        '''Return True if every slot of this counter is <= the matching slot of `other`.'''
        assert self.n == other.n
        return all(l <= r for (l, r) in zip(self.p, other.p))

    def __repr__(self):
        return '<{}: value={}, n={}, p={}>'.format(self.__class__.__name__,
                                                   self.value(), self.n, self.p)
class PosNegCounter:
    '''Convergent conflict-free replicated data type state based positive-negative counter.

    Built from two grow-only counters: one accumulates increments, the
    other accumulates decrements. The value is their difference.

    Guarantees strong eventual consistency.
    '''

    def __init__(self, n):
        '''Create a counter for a cluster of `n` nodes.'''
        self.pos = GrowCounter(n)
        self.neg = GrowCounter(n)

    def value(self):
        '''Return total increments minus total decrements.'''
        gained = self.pos.value()
        lost = self.neg.value()
        return gained - lost

    def increment(self, i):
        '''Record one increment on behalf of node `i`.'''
        self.pos.increment(i)

    def decrement(self, i):
        '''Record one decrement on behalf of node `i`.'''
        # A decrement is an increment of the "negative" grow-only counter.
        self.neg.increment(i)

    def merge(self, other):
        '''Fold the state of `other` into this counter in place.'''
        self.pos.merge(other.pos)
        self.neg.merge(other.neg)

    def compare(self, other):
        '''Return True if both halves of this counter compare <= those of `other`.'''
        if not self.pos.compare(other.pos):
            return False
        return self.neg.compare(other.neg)

    def __repr__(self):
        template = '<{}: value={}, pos={}, neg={}>'
        return template.format(self.__class__.__name__,
                               self.value(), self.pos, self.neg)
if __name__ == '__main__':
    # Demo: a cluster of n nodes, every node holding its own replica.
    n = 10
    counters = [GrowCounter(n) for _ in range(n)]

    # Perform 100 increments, each one on a randomly chosen node's replica.
    for _ in range(100):
        node_id = random.randrange(n)
        replica = counters[node_id]
        replica.increment(node_id)

    # Gossip phase: repeatedly pick two replicas at random and let the
    # first one absorb the state of the second.
    for _ in range(1000):
        receiver = random.choice(counters)
        sender = random.choice(counters)

        # Merge is commutative, associative, idempotent
        receiver.merge(sender)

        # After merging, the receiver's state dominates the sender's.
        assert sender.compare(receiver)

    # With enough gossip rounds every replica should show the same state.
    for idx, replica in enumerate(counters):
        print('{}: {}'.format(idx, replica))

    assert all(replica.value() == 100 for replica in counters), 'final consistent state reached'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment