Skip to content

Instantly share code, notes, and snippets.

@rystsov
Last active September 8, 2015 00:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rystsov/d2fc020e28162947d450 to your computer and use it in GitHub Desktop.
Save rystsov/d2fc020e28162947d450 to your computer and use it in GitHub Desktop.
def _read_write_read(self, test, val, due):
with self.mem.lock:
# makes a snapshot of the current state
era, nodes, q = self.mem.era, self.nodes, self.q
# increase counter of era's active _read_write_read ops
self.mem.ops+=1
n = self._get_n(0)
resps = net.send(nodes, lambda x: x.prepare(n), due-now())
try:
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(q)
last = ok.max(lambda x: x.msg.n).msg.val
if test != None and test(last):
candidate = val
else:
candidate = last
acks = net.send(ok.nodes, lambda x: x.accept(n, candidate), due-now())
resps += acks
acks.where(lambda x: isinstance(x.msg, OK)).wait(q)
if test != None and not test(last): return Conflict(val=last)
return OK(val=candidate)
except CantResolveWait:
return NetworkError()
finally:
with self.mem.lock:
# decrease counter of era's active _read_write_read ops
if era == self.era:
self.mem.ops-=1
else:
self.mem.old_ops-=1
self.mem.lock.notify_all()
resps = resps.abort()
for x in resps.where(lambda x: isinstance(x.msg, Conflict)):
self._get_n(x.msg.promise)
# makes an idempotent atomic change and blocks until all
# _read_write_read ops started in previous era are finished
def _epic_change(self, test, change):
with self.mem.lock:
if self.mem.old_ops>0: return Busy()
if test(self): return OK()
# If the change wasn't applied yet then:
change(self) # make a change
self.mem.era+=1 # switch era
self.mem.old_ops += self.mem.ops
self.mem.ops = 0
# wait until all ops started in previous era are finished
while self.mem.old_ops>0:
self.mem.lock.wait()
return OK()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment