Skip to content

Instantly share code, notes, and snippets.

View rystsov's full-sized avatar

Denis Rystsov rystsov

View GitHub Profile
class Variable:
def __init__(self, nodes, q, node_id):
self.proposer = Proposer(nodes, q, node_id)
@endpoint
def write(self, ver, value, due):
def change(previous):
if previous == None or previous.ver==ver:
return Val(ver = ver+1, value = value)
return previous
def query(current):
@rystsov
rystsov / dswitch.py
Last active September 27, 2015 06:11
class Switch:
def __init__(self, nodes, q, node_id):
self.proposer = Proposer(nodes, q, node_id)
@endpoint
def write(self, value, due):
def change(previous):
return value if previous == None else previous
def query(current):
if current == value:
return current
def cas_update(old, new):
def change(x):
if x!=old: raise Conflict(x)
return new
return change
class Proposer:
def __init__(self, nodes, q, node_id):
self.q = q
self.nodes = nodes
self.node_id = node_id
# generates a new ballot number which is greater than v
def get_n(self, v):
# ...
@endpoint
def change_query(self, change, query, due):
Accepted = namedtuple('Accepted', ['n', 'state'])
class Acceptor:
def __init__(self, node_id, hdd):
self.node_id = node_id
if hdd.is_empty():
hdd.write(promise=0, accepted=Accepted(0,None))
# emit_* functions are used to record a event and capture a bit of context
# The proof is based on reasoning on the structure of the sequence of the event
# that the program can generate
emit_accepted(0, node_id=self.node_id, ts=now(), accepted_n=0, state=self.hdd.accepted.state)
Accepted = namedtuple('Accepted', ['n', 'state'])
class Acceptor:
def __init__(self, node_id):
self.node_id = node_id
self.promise = 0
self.accepted = Accepted(0,None)
emit_accepted(0, node_id=self.node_id, ts=now(), accepted_n=0, state=self.accepted.state)
@synchronized
def prepare(self, ballot_n):
class Coordinator:
def __init__(self, nodes):
self.nodes = nodes
self.q = 1+len(nodes)/2
def add_node(self, node, timeout):
due = now() + timeout
while node not in self.nodes and due > now():
try:
if len(self.nodes)%2==1:
# set_q has the same effect as filter from the DPP article
def _read_write_read(self, test, val, due):
with self.mem.lock:
# makes a snapshot of the current state
era, nodes, q = self.mem.era, self.nodes, self.q
# increase counter of era's active _read_write_read ops
self.mem.ops+=1
n = self._get_n(0)
resps = net.send(nodes, lambda x: x.prepare(n), due-now())
try:
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(q)
def put(self, test, val, timeout):
due = now() + timeout
while due > now():
result = self._read_write_read(test, val, due)
if isinstance(result, NetworkError): continue
return result # OK or Conflict
return NetworkError()
def get(self, timeout):
return self.put(None, None, timeout)
def set_q(self, q):
Accepted = namedtuple('Accepted', ['n', 'val'])
class Variable:
def __init__(self, nodes, q):
self.q = q
self.nodes = nodes
self.promise = 0
self.accepted = Accepted(0,None)
self.mem = Mem() # we don't persist self.mem and keep it in memory
self.mem.era = 0 # current era
self.mem.ops = 0 # number of active _read_write_read started in current era