Skip to content

Instantly share code, notes, and snippets.

@rystsov
Last active December 31, 2015 02:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rystsov/ca9d195b2737039faaf3 to your computer and use it in GitHub Desktop.
Save rystsov/ca9d195b2737039faaf3 to your computer and use it in GitHub Desktop.
class Proposer:
    """Proposer that runs a two-phase (prepare, then accept) state change.

    ``change_query`` sends ``prepare`` to every node, waits for ``q``
    successful replies, applies a change function to the accepted state with
    the highest ballot number, then sends ``accept`` to the nodes that replied
    and again waits for ``q`` successful replies.
    """

    def __init__(self, nodes, q, node_id):
        """Store the proposer's configuration.

        Args:
            nodes: the nodes that ``prepare`` requests are sent to.
            q: how many successful responses each phase waits for
                (the majority, per the comment in ``change_query``).
            node_id: identifier of this proposer; only stored here.
        """
        self.q = q
        self.nodes = nodes
        self.node_id = node_id

    def get_n(self, v):
        """Generate a new ballot number which is greater than ``v``.

        NOTE(review): the implementation is elided in the source this class
        was taken from. ``change_query`` also calls this with the ``promise``
        of every conflicting response and discards the result, so presumably
        the method remembers the highest ballot it has seen -- confirm when
        filling it in.
        """
        # ...
        raise NotImplementedError("get_n is elided in the original source")

    @endpoint
    def change_query(self, change, query, due):
        """Apply ``change`` to the replicated state and answer ``query``.

        Args:
            change: callable taking the last accepted state and returning the
                new state; may raise ``Conflict`` to reject the change.
            query: callable taking the new state; its result is wrapped in
                ``OK``. It is only invoked after the accept phase succeeded.
            due: deadline; ``due - now()`` is handed to ``net.send``
                (presumably as the remaining time budget -- confirm).

        Returns:
            ``OK(query(state))`` on success, ``Conflict(data)`` when
            ``change`` or ``query`` raised ``Conflict``, or
            ``NetworkError()`` when waiting for ``q`` responses raised
            ``CantResolveWait``.

        Raises:
            AssertionError: if an accepted ballot number is not lower than
                the freshly generated one.
        """
        n = self.get_n(0)
        resps = net.send(
            self.nodes,
            lambda x: x.prepare(ballot_n=n),
            due - now(),
        )
        try:
            # Waiting for q (majority) succeeded responses.
            ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(self.q)
            emit_prepared(
                n,
                change=change,
                reads=ok.map(
                    lambda x: dict(
                        node_id=x.node.id,
                        accepted_n=x.msg.n,
                        state=x.msg.state,
                    )
                ),
            )
            last = ok.max(lambda x: x.msg.n).msg
            # Always true, but the explicit check simplifies the proof since
            # it guarantees monotonicity of accepted states. Raised explicitly
            # (instead of a bare ``assert``) so that ``python -O`` cannot
            # strip the guarantee.
            if not last.n < n:
                raise AssertionError("accepted ballot is not lower than the new one")
            try:
                state = change(last.state)
                # Deferred: the query must only run once the new state has
                # been accepted by q nodes.
                on_executed = lambda: OK(query(state))
            except Conflict as e:
                state = last.state
                # Bind the payload now: ``e`` is unbound as soon as this
                # ``except`` clause ends, so a plain closure over it would
                # fail when called later.
                on_executed = lambda data=e.data: Conflict(data)
            acks = net.send(
                ok.map(lambda x: x.node),
                lambda x: x.accept(n, state),
                due - now(),
            )
            resps += acks
            ok = acks.where(lambda x: isinstance(x.msg, OK)).wait(self.q)
            emit_executed(
                n,
                state=state,
                accepted_n=n,
                writes=ok.map(lambda x: dict(node_id=x.node.id)),
            )
            try:
                return on_executed()
            except Conflict as e:
                return Conflict(e.data)
        except CantResolveWait:
            return NetworkError()
        finally:
            # Abort the connections and pick a ballot number greater than
            # any of the already known ones.
            resps = resps.abort()
            for x in resps.where(lambda x: isinstance(x.msg, Conflict)):
                self.get_n(x.msg.promise)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment