Skip to content

Instantly share code, notes, and snippets.

View rystsov's full-sized avatar

Denis Rystsov rystsov

View GitHub Profile
class Coordinator:
def __init__(self, nodes):
self.nodes = nodes
self.q = 1+len(nodes)/2
def add_node(self, node, timeout):
due = now() + timeout
while node not in self.nodes and due > now():
try:
if len(self.nodes)%2==1:
# set_q has the same effect as filter from the DPP article
# Record pairing a ballot number (n) with the value (val) accepted under it.
Accepted = namedtuple('Accepted', ['n', 'val'])
class Register:
def __init__(self, nodes, q):
self.q = q # quorum size
self.nodes = nodes # addresses of all nodes (including current)
self.promise = -1 # promise
self.accepted = None # accepted pair of a value and its ballot num
self.chosen = None # stores cached chosen value
@synchronized
def prepare(self, n):
def put(self, val, timeout):
    """Run ``_read_write_read`` for ``val``, retrying on network errors.

    Args:
        val: value forwarded to ``_read_write_read``.
        timeout: retry budget, added to ``now()`` to form the deadline.

    Returns:
        The first result of ``_read_write_read`` that is not a
        ``NetworkError``, or a fresh ``NetworkError`` when the deadline
        passes first.
    """
    deadline = now() + timeout
    while deadline > now():
        outcome = self._read_write_read(val, deadline)
        if not isinstance(outcome, NetworkError):
            return outcome
        # Network failure: go around again while there is still time.
    return NetworkError()
def get(self, timeout):
ok = self.put(None, timeout)
if isinstance(ok, Conflict):
def put(self, test, val, timeout):
    """Run ``_read_write_read`` for ``test``/``val``, retrying on network errors.

    Args:
        test: forwarded unchanged to ``_read_write_read``.
        val: forwarded unchanged to ``_read_write_read``.
        timeout: retry budget, added to ``now()`` to form the deadline.

    Returns:
        The first non-``NetworkError`` result (OK or Conflict), or a fresh
        ``NetworkError`` once the deadline has passed.
    """
    expires_at = now() + timeout
    while True:
        if not expires_at > now():
            break
        attempt = self._read_write_read(test, val, expires_at)
        if isinstance(attempt, NetworkError):
            # Transient failure: retry until the deadline.
            continue
        return attempt  # OK or Conflict
    return NetworkError()
def get(self, timeout):
return self.put(None, None, timeout)
def set_q(self, q):
@synchronized
def _get_n(self, min):
# returns the next ballot number, which is greater than min
def _read_write_read(self, val, due):
if self.chosen != None:
if self.chosen == val: return OK(val=val)
return Conflict(val=self.chosen)
n = self._get_n(0)
# Phase I. Sending a prepare request to all nodes. Method
# returns a set of futures.
# Record with two fields: a ballot number (n) and a value (val).
Accepted = namedtuple('Accepted', ['n', 'val'])
class Variable:
def __init__(self, nodes, q):
    """Create a variable that has accepted no value yet.

    Args:
        nodes: the nodes that requests are sent to.
        q: number of responses to wait for (the quorum size).
    """
    self.q = q
    self.nodes = nodes
    self.promise = 0
    # Accepted has exactly two fields (n, val); passing three positional
    # arguments raises TypeError, so start from ballot 0 with no value.
    self.accepted = Accepted(0, None)
@synchronized
def prepare(self, n):
if self.promise.n < n:
def put(self, test, val, timeout):
    """Keep calling ``_read_write_read`` until it succeeds or time runs out.

    Args:
        test: forwarded unchanged to ``_read_write_read``.
        val: forwarded unchanged to ``_read_write_read``.
        timeout: retry budget, added to ``now()`` to form the deadline.

    Returns:
        The first result that is not a ``NetworkError`` (OK or Conflict);
        a fresh ``NetworkError`` if the deadline passes first.
    """
    deadline = now() + timeout
    while deadline > now():
        reply = self._read_write_read(test, val, deadline)
        if not isinstance(reply, NetworkError):
            return reply  # OK or Conflict
        # Network failure: fall through and retry.
    return NetworkError()
def get(self, timeout):
return self.put(None, None, timeout)
def _read_write_read(self, test, val, due):
n = self._get_n(0)
resps = net.send(self.nodes, lambda x: x.prepare(n), due-now())
try:
# Waiting for q (majority) successful responses.
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(self.q)
last = ok.max(lambda x: x.msg.n).msg.val
if test != None and test(last):
candidate = val
else:
# Record with two fields: a ballot number (n) and a value (val).
Accepted = namedtuple('Accepted', ['n', 'val'])
class Variable:
def __init__(self, nodes, q):
    """Create a variable with no accepted value, starting in era 0.

    Args:
        nodes: stored as ``self.nodes``.
        q: stored as ``self.q``.
    """
    self.q, self.nodes = q, nodes
    self.promise = 0
    self.accepted = Accepted(0, None)
    # Era bookkeeping lives only in memory; it is never persisted.
    state = Mem()
    state.era = 0  # current era
    state.ops = 0  # number of active _read_write_read started in current era
    self.mem = state
def _read_write_read(self, test, val, due):
with self.mem.lock:
# makes a snapshot of the current state
era, nodes, q = self.mem.era, self.nodes, self.q
# increase counter of era's active _read_write_read ops
self.mem.ops+=1
n = self._get_n(0)
resps = net.send(nodes, lambda x: x.prepare(n), due-now())
try:
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(q)