Skip to content

Instantly share code, notes, and snippets.

@rystsov
Last active September 5, 2015 05:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rystsov/9de335004dc2718f70f4 to your computer and use it in GitHub Desktop.
Save rystsov/9de335004dc2718f70f4 to your computer and use it in GitHub Desktop.
@synchronized
def _get_n(self, min):
# returns next ballot number which is greater then min
def _read_write_read(self, val, due):
if self.chosen != None:
if self.chosen == val: return OK(val=val)
return Conflict(val=self.chosen)
n = self._get_n(0)
# Phase I. Sending a prepare request to all nodes. Method
# returns a set of futures.
resps = net.send(self.nodes, lambda x: x.prepare(n), due-now())
try:
# Waiting for q (majority) succeeded responses.
# I suppose that isinstance(None, T) is true for any type T
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(self.q)
if ok.all(lambda x: x.msg==None):
if val==None: return OK(val)
candidate = val
else:
candidate = ok.where(lambda x: x.msg!=None).max(lambda x: x.msg.n).msg.val
# Phase II. Sending a value to the responded nodes.
acks = net.send(ok.nodes, lambda x: x.accept(n, candidate), due-now())
resps += acks
acks.where(lambda x: isinstance(x.msg, OK)).take(self.q).wait()
# If we reached this point then we received an ack from a
# majority of the nodes and can be sured that the candidate
# is chosen.
net.send(self.nodes, lambda x: x.choose(candidate))
if candidate == val: return OK(val=candidate)
return Conflict(val=candidate)
except CantResolveWait:
return NetworkError()
finally:
resps = resps.abort() # abort the connections and
# pick a ballot number greater than any of already known
for x in resps.where(lambda x: isinstance(x.msg, Conflict)):
self._get_n(x.msg.n)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment