@dangunter
Created March 15, 2012 16:13
task graph thoughts
"""
Task graph
==========

Some toy code to play with aspects of the task-graph problem.
Note that this is all in-memory; the real goal is an implementation
that can directly manipulate MongoDB data.
Initial problem statement
-------------------------

D1 = data type 1
D2 = data type 2
F = function

The abstract task is a simple sequence:

    T = D1 -> F -> D2, or equivalently F(D1):D2

In this case D2 is the "goal" of the task.
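Here is a minimal sketch of the abstract template. It uses only the standard library (`graphlib`, Python 3.9+) rather than networkx. Each node maps to the nodes it depends on, which is the same goal-to-input edge direction that `build_graph` uses in the code below. A topological order then gives the execution sequence, and the goal is the one node that nothing depends on. The names `abstract_T` and `goal_of` are illustrative and are not part of the original code.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Abstract task T = D1 -> F -> D2, stored as node -> {nodes it depends on}.
abstract_T = {
    "D2": {"F"},  # the goal D2 is produced by F
    "F": {"D1"},  # F consumes D1
}


def goal_of(deps):
    # The goal is the only node that no other node depends on.
    depended_on = set().union(*deps.values())
    goals = set(deps) - depended_on
    if len(goals) != 1:
        raise ValueError(f"expected exactly one goal, found {sorted(goals)}")
    return goals.pop()


# static_order() yields every dependency before the nodes that need it.
order = list(TopologicalSorter(abstract_T).static_order())
print(order)                # ['D1', 'F', 'D2']
print(goal_of(abstract_T))  # D2
```

With this representation, "perform T" amounts to walking `order`, with D2 as the node whose value ends the task.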
* Signal: perform T on crystal C
* Response:
  1. Create a concrete task and label all functions as "wait"; all parameters are empty.
       d1[null] -> f()[wait] -> d2[null]
  2. Set d1 based on the task input C.
       d1[c1] -> f()[wait] -> d2[null]
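The two response steps can be sketched in memory as follows. The sketch assumes a task is just three node objects, that `None` plays the role of "null", and that the string "c1" stands in for crystal C. `DataNode`, `FuncNode`, `new_task`, and `render` are hypothetical helpers, independent of the `DataType`/`Function` classes in the code below.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DataNode:
    name: str
    value: Optional[Any] = None  # None means "null" (not yet known)


@dataclass
class FuncNode:
    name: str
    state: str = "wait"


def new_task():
    # Step 1: concrete task with every function waiting and all data empty.
    return DataNode("d1"), FuncNode("f"), DataNode("d2")


def render(d1, f, d2):
    # Format the chain in the notation used above.
    def show(d):
        return "null" if d.value is None else str(d.value)
    return f"{d1.name}[{show(d1)}] -> {f.name}()[{f.state}] -> {d2.name}[{show(d2)}]"


d1, f, d2 = new_task()
print(render(d1, f, d2))  # d1[null] -> f()[wait] -> d2[null]
d1.value = "c1"           # Step 2: bind the task input C to d1
print(render(d1, f, d2))  # d1[c1] -> f()[wait] -> d2[null]
```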
* Signal: all dependencies of f() (i.e. d1) have been satisfied (are non-null)
* Response:
  1. Calculate the resources r1 needed by f(d1=c1).
       d1[c1] -> f()[state=wait, R=r1] -> d2[null]
  2. Set the state of f() to "queued".
       d1[c1] -> f()[state=q, R=r1] -> d2[null]
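Below is a sketch of this signal handler. It assumes a function node holds references to its input data nodes. The resource estimate is only a placeholder: `estimate_resources` and its {"cpus": ...} result are invented for illustration, because the real cost depends on f and on the crystal.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class DataNode:
    value: Optional[Any] = None  # None means "null"


@dataclass
class FuncNode:
    inputs: List[DataNode] = field(default_factory=list)
    state: str = "wait"
    resources: Optional[Dict[str, Any]] = None


def estimate_resources(values):
    # Hypothetical cost model; a real one would inspect f and its inputs.
    return {"cpus": 1, "n_inputs": len(values)}


def try_enqueue(f, estimate=estimate_resources):
    # Only a waiting function whose dependencies are all non-null qualifies.
    if f.state != "wait" or any(d.value is None for d in f.inputs):
        return False
    f.resources = estimate([d.value for d in f.inputs])  # 1. R = r1
    f.state = "queued"                                   # 2. state = q
    return True


d1 = DataNode()
f = FuncNode(inputs=[d1])
print(try_enqueue(f), f.state)  # False wait
d1.value = "c1"
print(try_enqueue(f), f.state)  # True queued
print(f.resources)              # {'cpus': 1, 'n_inputs': 1}
```

Re-delivering the signal is harmless. A second call returns False because f is no longer waiting.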
* Signal: queued f() matches a query from worker w
* Response:
  1. Mark f() as "running" and record the worker id of w.
       d1[c1] -> f()[state=run, R=r1, worker=w_id] -> d2[null]
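Matching a queued function to a worker is a check-and-set, so two workers must never both claim the same f(). The minimal in-memory sketch below assumes the worker's query is simply "needs at most max_cpus CPUs" and uses a lock to make the claim atomic. `tasks` and `claim` are hypothetical. If the tasks lived in MongoDB, as the introduction intends, this step would presumably become a single atomic `find_one_and_update` on the task collection.

```python
import threading
from typing import Dict, List, Optional

# Queued function instances, shaped like documents that might later
# live in a database collection.
tasks: List[Dict] = [
    {"id": "f1", "state": "queued", "R": {"cpus": 1}},
    {"id": "f2", "state": "queued", "R": {"cpus": 8}},
]
_lock = threading.Lock()


def claim(worker_id: str, max_cpus: int) -> Optional[Dict]:
    # Find the first queued task that satisfies the worker's query and
    # mark it running, all under one lock so the claim is atomic.
    with _lock:
        for t in tasks:
            if t["state"] == "queued" and t["R"]["cpus"] <= max_cpus:
                t["state"] = "running"
                t["worker"] = worker_id
                return t
    return None


print(claim("w_a", max_cpus=4))
# {'id': 'f1', 'state': 'running', 'R': {'cpus': 1}, 'worker': 'w_a'}
print(claim("w_b", max_cpus=4))         # None: f2 needs 8 CPUs
print(claim("w_c", max_cpus=16)["id"])  # f2
```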
* Signal: f() has finished, with output O1
* Response:
  1. a) Set d2 based on the output O1.
          d1[c1] -> f()[state=run, R=r1, worker=w_id] -> d2[x]
     b) Mark f() as successfully run and update it with the run metadata m.
          d1[c1] -> f()[state=ok, R=r1, worker=w_id, run_meta=m] -> d2[x]
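Here is a sketch of the completion handler, with data values kept in a plain dict keyed by node name. That storage is an assumption, and `data`, `f`, and `finish` are illustrative. The handler applies a) before b). If the process dies in between, f() is still "running" and can be re-run. The opposite order could leave f() marked "ok" with no result in d2.

```python
from typing import Any, Dict

data: Dict[str, Any] = {"d1": "c1", "d2": None}
f: Dict[str, Any] = {"state": "running", "R": {"cpus": 1}, "worker": "w_id", "out": "d2"}


def finish(func: Dict[str, Any], output: Any, run_meta: Dict[str, Any]) -> None:
    # Only a running function can complete; anything else is a stale report.
    if func["state"] != "running":
        raise ValueError(f"cannot finish a function in state {func['state']!r}")
    data[func["out"]] = output         # a) d2[null] -> d2[x]
    func["state"] = "ok"               # b) mark as successfully run ...
    func["run_meta"] = dict(run_meta)  # ... and attach the run metadata m


finish(f, "x", {"wall_seconds": 12.5})
print(data["d2"], f["state"], f["run_meta"])  # x ok {'wall_seconds': 12.5}
```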
* Signal: d2 has a value x
* Response:
  1. Optionally, check whether there are identical functions (f(d1[c1]) in this case) whose result is unknown; if so, substitute d2[x] for their result (especially if they are not running yet).
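This optional step amounts to sharing (memoizing) a result between function instances that have the same name and the same input values. That is only safe if F is deterministic. The sketch below works under that assumption; `funcs`, `data`, and `share_result` are hypothetical. Following the "not running yet" remark, running duplicates are skipped by default.

```python
from typing import Any, Dict, List

# Concrete function instances from several tasks; "out" names each
# instance's output data node in `data`.
funcs: List[Dict[str, Any]] = [
    {"id": 1, "fn": "F", "inputs": ("c1",), "state": "ok", "out": "t1.d2"},
    {"id": 2, "fn": "F", "inputs": ("c1",), "state": "queued", "out": "t2.d2"},
    {"id": 3, "fn": "F", "inputs": ("c1",), "state": "running", "out": "t3.d2"},
    {"id": 4, "fn": "F", "inputs": ("c2",), "state": "wait", "out": "t4.d2"},
]
data: Dict[str, Any] = {"t1.d2": "x", "t2.d2": None, "t3.d2": None, "t4.d2": None}


def share_result(done: Dict[str, Any], include_running: bool = False) -> List[int]:
    # Identical = same function applied to the same input values.
    key = (done["fn"], done["inputs"])
    eligible = {"wait", "queued"} | ({"running"} if include_running else set())
    reused = []
    for other in funcs:
        if other is done or (other["fn"], other["inputs"]) != key:
            continue
        if other["state"] in eligible and data[other["out"]] is None:
            data[other["out"]] = data[done["out"]]  # substitute d2[x]
            other["state"] = "ok"
            other["reused_from"] = done["id"]
            reused.append(other["id"])
    return reused


print(share_result(funcs[0]))  # [2]
print(data["t2.d2"])           # x
```

The running duplicate (id 3) is left alone unless include_running=True. In a real scheduler it would presumably also have to be cancelled on its worker.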
"""
import networkx as nx


## Base classes

class Status:
    W = "wait"
    Q = "queued"
    R = "running"
    X = "done"
    E = "error"


class DataType(object):
    def __init__(self):
        self.params = None

    def __str__(self):
        return "{}[{}]".format(self.__class__.__name__, self.params)

    # Reuse __str__ so that printed graphs show readable node labels.
    __repr__ = __str__


class Empty(DataType):
    pass


class Function(object):
    def __init__(self):
        self.status = Status.W

    def __str__(self):
        return "{}/{}()".format(self.status, self.__class__.__name__)

    __repr__ = __str__


## Derived classes

class D1(DataType):
    pass


class D2(DataType):
    pass


class F(Function):
    pass


## Stand-alone program

def build_graph(*edges):
    # Each edge points from a node to a node it depends on (goal -> input).
    nodes = set()
    for e in edges:
        nodes.update(e)  # map() is lazy in Python 3, so add the nodes directly
    g = nx.DiGraph()
    g.add_nodes_from(nodes)
    g.add_edges_from(edges)
    return g


def main(args):
    for gr in run():
        print("--")
        print(gr.adj)


def is_function(obj):
    return isinstance(obj, Function)


def run():
    # abstract template
    g = build_graph((D2, F), (F, D1))
    yield g
    # create concrete task
    d1, f, d2 = D1(), F(), D2()
    g = build_graph((d2, f), (f, d1))
    yield g
    # set d1 based on the task input ("c1" is a stand-in for crystal C)
    d1.params = "c1"
    yield g
    # Queue each "waiting" function whose children (its dependencies, i.e.
    # its successors in the graph) all have non-empty params.
    waiting = [n for n in g.nodes() if is_function(n) and n.status == Status.W]
    for func in waiting:
        num_empty = sum(1 for dep in g.successors(func) if dep.params is None)
        if num_empty == 0:
            func.status = Status.Q
            yield g


if __name__ == "__main__":
    import sys
    main(sys.argv)