Skip to content

Instantly share code, notes, and snippets.

@rystsov
Last active September 14, 2015 08:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rystsov/bfa2836dd76a863c6df2 to your computer and use it in GitHub Desktop.
Save rystsov/bfa2836dd76a863c6df2 to your computer and use it in GitHub Desktop.
# Record held by an acceptor: the ballot number `n` under which a value was
# accepted, paired with the accepted `state` itself.
Accepted = namedtuple('Accepted', ('n', 'state'))
class Acceptor:
    """Acceptor role of a Paxos-style protocol.

    Tracks the highest ballot number it has promised (`promise`) and the last
    value it accepted together with that value's ballot number (`accepted`).
    Every state transition is reported through the `emit_*` hooks.

    `prepare` and `accept` are wrapped in `synchronized` (defined elsewhere;
    presumably it serializes calls on one acceptor -- confirm).
    """

    def __init__(self, node_id):
        """Create an acceptor that has promised nothing and accepted nothing.

        Args:
            node_id: identifier attached to every event this acceptor emits.
        """
        self.node_id = node_id
        # 0 acts as the "no ballot yet" sentinel for both fields below.
        self.promise = 0
        self.accepted = Accepted(0, None)
        emit_accepted(0, node_id=self.node_id, ts=now(),
                      accepted_n=self.accepted.n, state=self.accepted.state)

    @synchronized
    def prepare(self, ballot_n):
        """Handle the prepare phase for ballot `ballot_n`.

        Returns:
            The currently accepted `Accepted(n, state)` record when `ballot_n`
            is strictly greater than the current promise (the promise is then
            raised to `ballot_n`); otherwise `Conflict(promise=...)` carrying
            the promise that blocked the request.
        """
        if self.promise < ballot_n:
            self.promise = ballot_n
            emit_promised(ballot_n, node_id=self.node_id, ts=now(),
                          accepted_n=self.accepted.n)
            return self.accepted
        return Conflict(promise=self.promise)

    @synchronized
    def accept(self, ballot_n, state):
        """Handle the accept phase for ballot `ballot_n`.

        The value is stored only when `ballot_n` is exactly the ballot this
        acceptor last promised.

        Returns:
            `OK()` when the value was stored, otherwise
            `Conflict(promise=...)` with the currently promised ballot.
        """
        if self.promise == ballot_n:
            self.accepted = Accepted(ballot_n, state)
            # Report the ballot number that belongs to the state just stored.
            # The original hard-coded accepted_n=0 here, which mislabelled
            # every accepted value as belonging to the initial ballot.
            emit_accepted(ballot_n, node_id=self.node_id, ts=now(),
                          accepted_n=self.accepted.n, state=self.accepted.state)
            return OK()
        return Conflict(promise=self.promise)
class Proposer:
    """Proposer role of a Paxos-style protocol.

    Drives a two-phase round (prepare, then accept) against a set of acceptor
    nodes and requires `q` successful replies in each phase.

    NOTE(review): `self._get_n` is called below but is not defined in the
    visible source; it is assumed to return a fresh ballot number greater
    than the value passed in -- confirm against the full implementation.
    """

    def __init__(self, nodes, q):
        """Store the acceptor set and the quorum size.

        Args:
            nodes: acceptors that requests are sent to.
            q: number of successful replies to wait for in each phase.
        """
        self.q = q
        self.nodes = nodes

    def change_query(self, change, query, due):
        """Apply `change` to the replicated state, then evaluate `query` on it.

        Args:
            change: callable mapping the latest accepted state to a new state;
                it may raise `Conflict` to reject the change, in which case the
                previous state is re-written unchanged.
            query: callable evaluated on the new state once the accept phase
                has gathered `q` acknowledgements.
            due: deadline; `due - now()` is passed to `net.send` as the
                remaining time budget.

        Returns:
            `OK(query(state))` on success, `Conflict(data)` when `change`
            rejected the update, or `NetworkError()` when a wait raised
            `CantResolveWait`.
        """
        n = self._get_n(0)
        resps = net.send(self.nodes, lambda x: x.prepare(ballot_n=n), due - now())
        try:
            # Waiting for q (majority) succeeded responses.
            ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(self.q)
            emit_prepared(n,
                          change=change,
                          vassals=ok.map(lambda x: dict(node_id=x.node.id,
                                                        accepted_n=x.msg.n,
                                                        state=x.msg.state)))
            # The reply with the highest accepted ballot carries the latest state.
            last = ok.max(lambda x: x.msg.n).msg
            # always true, but explicit check simplifies proof
            assert last.n < n
            try:
                state = change(last.state)
                # Deferred: the query must run only after the accept phase
                # succeeds. (A lambda body is an expression, so no `return`,
                # and it is invoked below without arguments.)
                on_executed = lambda: OK(query(state))
            except Conflict as e:
                state = last.state
                # Capture the payload now: the name `e` is unbound as soon as
                # the except clause ends, so a closure over `e` would fail.
                conflict_data = e.data
                on_executed = lambda: Conflict(conflict_data)
            # Send the accept request only to the nodes that promised.
            acks = net.send(ok.map(lambda x: x.node),
                            lambda x: x.accept(n, state),
                            due - now())
            resps += acks
            acks.where(lambda x: isinstance(x.msg, OK)).wait(self.q)
            emit_executed(n, state=state, accepted_n=n)
            return on_executed()
        except CantResolveWait:
            return NetworkError()
        finally:
            resps = resps.abort()  # abort the connections and
            # pick a ballot number greater than any of already known
            for x in resps.where(lambda x: isinstance(x.msg, Conflict)):
                self._get_n(x.msg.promise)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment