Skip to content

Instantly share code, notes, and snippets.

@zemlanin
Created December 7, 2012 15:47
Show Gist options
  • Save zemlanin/4234085 to your computer and use it in GitHub Desktop.
Save zemlanin/4234085 to your computer and use it in GitHub Desktop.
ØMQ Queue model
import zmq, os
times = {}
reaction = {}
done = {}
number_of_tasks = 1.0
mean = lambda l: sum(l)/len(l)
def actuality(t):
if t <= 2.0:
return 1.0
elif t <= 6.0:
return 1.0-(t-2)/4
else:
return 0.0
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("ipc:///tmp/taskstats")
while True:
stat_type, worker, data = socket.recv().split(":")
if "comp_time" == stat_type:
try:
times[worker].append(float(data))
except KeyError:
times[worker] = [float(data)]
elif "reaction" == stat_type:
try:
reaction[worker].append(float(data))
except KeyError:
reaction[worker] = [float(data)]
elif "finished" == stat_type:
try:
done[worker] += 1
except KeyError:
done[worker] = 1.0
elif "gen" == stat_type:
number_of_tasks = float(data)
os.system("clear")
for key in reaction:
print key
try:
x_1 = mean(times[key])
x_2 = mean([x**2 for x in times[key]]) - mean(times[key])**2
x_3 = mean(reaction[key])
x_4 = done[key]/number_of_tasks
x_5 = mean([actuality(x) for x in times[key]])
print "\tx_1", x_1
print "\tx_2", x_2
print "\tx_3", x_3
print "\tx_4", x_4
print "\tx_5", x_5
print "\tf ", -1*x_1 - 2*x_2 - 5*x_3 + 2*x_4 + 8*x_5
except KeyError:
pass
import zmq, time, sys
from numpy.random import poisson
from random import randint
out_time, task_time = int(sys.argv[1]), int(sys.argv[2])
i = 0
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("ipc:///tmp/taskgen")
stats_socket = context.socket(zmq.PUSH)
stats_socket.connect("ipc:///tmp/taskstats")
while True:
sleepy_time = 0.1*poisson(out_time)
msg = str((i, round(0.1*poisson(task_time), 1), randint(0, 6)))
i += 1
time.sleep(sleepy_time)
print msg, "->"
socket.send(msg)
stats_socket.send("gen::"+str(i))
import zmq, time, os
line = []
def actuality(t):
if t <= 2.0:
return 1.0
elif t <= 6.0:
return 1.0-(t-2)/4
else:
return 0.0
context = zmq.Context()
taskgen_socket = context.socket(zmq.SUB)
taskgen_socket.connect("ipc:///tmp/taskgen")
taskgen_socket.setsockopt(zmq.SUBSCRIBE, "")
stats_socket = context.socket(zmq.PUSH)
stats_socket.connect("ipc:///tmp/taskstats")
while True:
while 0 != taskgen_socket.poll(0):
incoming = eval(taskgen_socket.recv())
line.append( (incoming[:2], time.time()) )
if line:
os.system("clear")
print line
task, received = line.pop()
stats_socket.send("reaction:LIFO:"+str(time.time()-received))
print "->", task, actuality(time.time()-received)
if actuality(time.time()-received) > 0.0:
time.sleep(task[1])
stats_socket.send("finished:LIFO:")
stats_socket.send("comp_time:LIFO:"+str(time.time()-received))
import zmq, time, os
lines = [[], [], [], [], [], [], []]
cycle_time = 0.2
known_tasks = []
def actuality(t):
if t <= 2.0:
return 1.0
elif t <= 6.0:
return 1.0-(t-2)/4
else:
return 0.0
def compute(task, received):
if task[0] not in known_tasks:
stats_socket.send("reaction:PER:"+str(time.time()-received))
known_tasks.append(task[0])
print "->", task, actuality(time.time()-received)
if actuality(time.time()-received) > 0.0:
time.sleep(min(comp_time, task[1]))
remained = round(task[1]-comp_time, 1)
if remained > 0:
task = (task[0], remained)
return task, received
stats_socket.send("finished:PER:")
stats_socket.send("comp_time:PER:"+str(time.time()-received))
return
context = zmq.Context()
taskgen_socket = context.socket(zmq.SUB)
taskgen_socket.connect("ipc:///tmp/taskgen")
taskgen_socket.setsockopt(zmq.SUBSCRIBE, "")
stats_socket = context.socket(zmq.PUSH)
stats_socket.connect("ipc:///tmp/taskstats")
while True:
while 0 != taskgen_socket.poll(0):
incoming = eval(taskgen_socket.recv())
lines[incoming[2]].append( (incoming[:2], time.time()) )
os.system("clear")
for line in lines:
print lines.index(line), line
while lines[0]:
task = compute(*lines[0].pop(0))
if task:
lines[0].append(task)
for line in lines[1:]:
if line:
task = compute(*line.pop(0))
if task:
lines[min(lines.index(line)+1, len(lines)-1)].append(task)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment