Skip to content

Instantly share code, notes, and snippets.

View rystsov's full-sized avatar

Denis Rystsov rystsov

View GitHub Profile
@rystsov
rystsov / portqsp.py
Created March 16, 2013 17:38
Задачка Умпутума (Parallel order respectful task queue streaming processing)
from Queue import Queue
from threading import Thread
from time import sleep
# main queues
tasks = Queue()
results = Queue()
# auxiliary queues & workers ####################################################
preresults = Queue()
<html>
<body>
<div id="canvas"></div>
<script type="text/javascript">
var canvas = document.getElementById("canvas");
var input = document.createElement("input");
input.type = "textbox";
input.value = "click here and then on address";
input.style.cssText += ";width:200px;"
input.addEventListener("blur", function(){
Accepted = namedtuple('Accepted', ['n', 'val'])
class Register:
def __init__(self, nodes, q):
self.q = q # quorum size
self.nodes = nodes # addresses of all nodes (including current)
self.promise = -1 # promise
self.accepted = None # accepted pair of a value and its ballot num
self.chosen = None # stores cached chosen value
@synchronized
def prepare(self, n):
Accepted = namedtuple('Accepted', ['n', 'val'])
class Variable:
def __init__(self, nodes, q):
self.q = q
self.nodes = nodes
self.promise = 0
self.accepted = Accepted(0,0,None)
@synchronized
def prepare(self, n):
if self.promise.n < n:
def put(self, test, val, timeout):
due = now() + timeout
while due > now():
result = self._read_write_read(test, val, due)
if isinstance(result, NetworkError): continue
return result # OK or Conflict
return NetworkError()
def get(self, timeout):
return self.put(None, None, timeout)
def _read_write_read(self, test, val, due):
n = self._get_n(0)
resps = net.send(self.nodes, lambda x: x.prepare(n), due-now())
try:
# Waiting for q (majority) succeeded responses.
ok = resps.where(lambda x: isinstance(x.msg, Accepted)).wait(self.q)
last = ok.max(lambda x: x.msg.n).msg.val
if test != None and test(last):
candidate = val
else:
def put(self, val, timeout):
due = now() + timeout
while due > now():
result = self._read_write_read(val, due)
if isinstance(result, NetworkError): continue
return result
return NetworkError()
def get(self, timeout):
ok = self.put(None, timeout)
if isinstance(ok, Conflict):
@synchronized
def _get_n(self, min):
# returns next ballot number which is greater then min
def _read_write_read(self, val, due):
if self.chosen != None:
if self.chosen == val: return OK(val=val)
return Conflict(val=self.chosen)
n = self._get_n(0)
# Phase I. Sending a prepare request to all nodes. Method
# returns a set of futures.
Accepted = namedtuple('Accepted', ['n', 'val'])
class Variable:
def __init__(self, nodes, q):
self.q = q
self.nodes = nodes
self.promise = 0
self.accepted = Accepted(0,None)
self.mem = Mem() # we don't persist self.mem and keep it in memory
self.mem.era = 0 # current era
self.mem.ops = 0 # number of active _read_write_read started in current era
def put(self, test, val, timeout):
due = now() + timeout
while due > now():
result = self._read_write_read(test, val, due)
if isinstance(result, NetworkError): continue
return result # OK or Conflict
return NetworkError()
def get(self, timeout):
return self.put(None, None, timeout)
def set_q(self, q):