This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Coordinator: | |
def __init__(self, nodes):
    """Remember the cluster members and derive the majority quorum size.

    nodes -- collection of nodes; stored as-is, and its length is used
             to compute the quorum size kept in self.q.
    """
    self.nodes = nodes
    # Floor division keeps the quorum an integer on Python 3 as well;
    # true division would yield e.g. 2.5 for three nodes.
    self.q = 1 + len(nodes) // 2
def add_node(self, node, timeout): | |
due = now() + timeout | |
while node not in self.nodes and due > now(): | |
try: | |
if len(self.nodes)%2==1: | |
# set_q has the same effect as filter from the DPP article |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Accepted = namedtuple('Accepted', ['n', 'val']) | |
class Register: | |
def __init__(self, nodes, q):
    """Create a register that has promised, accepted and chosen nothing yet.

    nodes -- addresses of all nodes (including the current one)
    q     -- quorum size
    """
    # Cluster configuration.
    self.nodes, self.q = nodes, q
    # Protocol state.
    self.promise = -1  # promise; starts at -1
    # accepted: accepted pair of a value and its ballot number
    # chosen:   cached chosen value
    self.accepted = self.chosen = None
@synchronized | |
def prepare(self, n): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def put(self, val, timeout):
    """Keep calling _read_write_read until it yields a non-network result.

    val     -- value handed to _read_write_read on every attempt
    timeout -- added to now() to obtain the deadline for all attempts

    Returns the first result that is not a NetworkError; if the deadline
    passes without one, returns a fresh NetworkError().
    """
    deadline = now() + timeout
    while now() < deadline:
        outcome = self._read_write_read(val, deadline)
        # A network failure is retried for as long as time remains.
        if not isinstance(outcome, NetworkError):
            return outcome
    return NetworkError()
def get(self, timeout): | |
ok = self.put(None, timeout) | |
if isinstance(ok, Conflict): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def put(self, test, val, timeout):
    """Keep calling _read_write_read until it yields a non-network result.

    test, val -- forwarded unchanged to _read_write_read on every attempt
    timeout   -- added to now() to obtain the deadline for all attempts

    Returns the first result that is not a NetworkError (OK or Conflict);
    if the deadline passes without one, returns a fresh NetworkError().
    """
    deadline = now() + timeout
    while now() < deadline:
        outcome = self._read_write_read(test, val, deadline)
        # A network failure is retried for as long as time remains.
        if not isinstance(outcome, NetworkError):
            return outcome  # OK or Conflict
    return NetworkError()
def get(self, timeout):
    """Delegate to put with neither a test nor a value.

    Returns whatever put returns for (None, None, timeout).
    """
    no_test = no_val = None
    return self.put(no_test, no_val, timeout)
def set_q(self, q): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@synchronized | |
def _get_n(self, min): | |
# returns the next ballot number, which is greater than min | |
def _read_write_read(self, val, due): | |
if self.chosen != None: | |
if self.chosen == val: return OK(val=val) | |
return Conflict(val=self.chosen) | |
n = self._get_n(0) | |
# Phase I. Sending a prepare request to all nodes. Method | |
# returns a set of futures. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Accepted = namedtuple('Accepted', ['n', 'val']) | |
class Variable: | |
def __init__(self, nodes, q):
    """Create a variable that has accepted no value yet.

    nodes -- stored as self.nodes
    q     -- stored as self.q (presumably the quorum size -- confirm)
    """
    self.q = q
    self.nodes = nodes
    # NOTE(review): prepare() reads self.promise.n, which an int does not
    # have -- confirm the intended type of promise.
    self.promise = 0
    # Accepted has exactly two fields (n, val); passing three positional
    # arguments raised TypeError at construction time.
    self.accepted = Accepted(0, None)
@synchronized | |
def prepare(self, n): | |
if self.promise.n < n: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def put(self, test, val, timeout):
    """Keep calling _read_write_read until it yields a non-network result.

    test, val -- forwarded unchanged to _read_write_read on every attempt
    timeout   -- added to now() to obtain the deadline for all attempts

    Returns the first result that is not a NetworkError (OK or Conflict);
    if the deadline passes without one, returns a fresh NetworkError().
    """
    deadline = now() + timeout
    while now() < deadline:
        outcome = self._read_write_read(test, val, deadline)
        # A network failure is retried for as long as time remains.
        if not isinstance(outcome, NetworkError):
            return outcome  # OK or Conflict
    return NetworkError()
def get(self, timeout):
    """Delegate to put with neither a test nor a value.

    Returns whatever put returns for (None, None, timeout).
    """
    no_test = no_val = None
    return self.put(no_test, no_val, timeout)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _read_write_read(self, test, val, due): | |
n = self._get_n(0) | |
resps = net.send(self.nodes, lambda x: x.prepare(n), due-now()) | |
try: | |
# Wait for q (a majority of) successful responses. | |
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(self.q) | |
last = ok.max(lambda x: x.msg.n).msg.val | |
if test != None and test(last): | |
candidate = val | |
else: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Accepted = namedtuple('Accepted', ['n', 'val']) | |
class Variable: | |
def __init__(self, nodes, q): | |
self.q = q | |
self.nodes = nodes | |
self.promise = 0 | |
self.accepted = Accepted(0,None) | |
self.mem = Mem() # we don't persist self.mem and keep it in memory | |
self.mem.era = 0 # current era | |
self.mem.ops = 0 # number of active _read_write_read started in current era |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _read_write_read(self, test, val, due): | |
with self.mem.lock: | |
# makes a snapshot of the current state | |
era, nodes, q = self.mem.era, self.nodes, self.q | |
# increase counter of era's active _read_write_read ops | |
self.mem.ops+=1 | |
n = self._get_n(0) | |
resps = net.send(nodes, lambda x: x.prepare(n), due-now()) | |
try: | |
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(q) |
OlderNewer