Skip to content

Instantly share code, notes, and snippets.

@rystsov
Last active August 26, 2015 14:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rystsov/0644f6cf7b45f5a72e81 to your computer and use it in GitHub Desktop.
Save rystsov/0644f6cf7b45f5a72e81 to your computer and use it in GitHub Desktop.
class Coordinator:
def __init__(self, nodes):
self.nodes = nodes
self.q = 1+len(nodes)/2
def add_node(self, node, timeout):
due = now() + timeout
while node not in self.nodes and due > now():
try:
if len(self.nodes)%2==1:
# set_q has the same effect as filter from the DPP article
resps = net.send(self.nodes, lambda x: x.set_q(self.q+1), due-now())
resps.where(lambda x: isinstance(x.msg, OK)).wait(len(self.nodes))
# increase replication (if the value was accepted)
net.send(self.nodes, lambda x: x.get(due-now()), due-now())
resps.where(lambda x: isinstance(x.msg, OK)).wait(len(self.nodes))
# add a node
net.send(self.nodes, lambda x: x.add_node(node), due-now())
resps.where(lambda x: isinstance(x.msg, OK)).wait(len(self.nodes))
self.nodes.append(node)
self.q+=1
else:
# add a node
net.send(self.nodes, lambda x: x.add_node(node), due-now())
resps.where(lambda x: isinstance(x.msg, OK)).wait(len(self.nodes))
self.nodes.append(node)
except CantResolveWait:
continue
if node in self.nodes: return OK()
return Error()
def rm_node(self, node, timeout):
due = now() + timeout
while node in self.nodes and due > now():
try:
if len(self.nodes)%2==1:
resps = net.send(self.nodes, lambda x: x.rm_node(node), due-now())
resps.where(lambda x: isinstance(x.msg, OK)).wait(len(self.nodes))
self.nodes.rm(node)
else:
resps = net.send(self.nodes, lambda x: x.rm_node(node), due-now())
resps.where(lambda x: isinstance(x.msg, OK)).wait(len(self.nodes))
resps = net.send(self.nodes, lambda x: x.set_q(self.q-1), due-now())
resps.where(lambda x: isinstance(x.msg, OK)).wait(len(self.nodes))
self.nodes.rm(node)
self.q-=1
except CantResolveWait:
continue
if node not in self.nodes: return OK()
return Error()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment