Created
March 15, 2012 16:13
-
-
Save dangunter/2045028 to your computer and use it in GitHub Desktop.
task graph thoughts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Task graph | |
========== | |
Some toy code to play with the aspects | |
of the task-graph problem. Note that this is | |
all in-memory, but really want to implement so it | |
can directly manipulate mongodb data. | |
Initial problem statement
-------------------------

D1 = data type 1
D2 = data type 2
F  = function

The abstract task is a simple sequence::

    T = D1 -> F -> D2, or equivalently F(D1):D2

In this case D2 is the "goal" of the task.

* Signal: perform T on crystal C
* Response:

  1. Create a concrete task and label all functions as "wait". All
     parameters are empty::

         d1[null] -> f()[wait] -> d2[null]

  2. Set d1 based on the task input C::

         d1[c1] -> f()[wait] -> d2[null]

* Signal: all dependencies of f() (i.e. d1) have been satisfied (are non-null)
* Response:

  1. Calculate the resources r needed by f(d1=c1)::

         d1[c1] -> f()[state=wait, R=r1] -> d2[null]

  2. Set the state of f() to queued::

         d1[c1] -> f()[state=q, R=r1] -> d2[null]

* Signal: a queued f() matches a query from worker w
* Response:

  1. Mark f() as "running" and record the worker id of w::

         d1[c1] -> f()[state=run, R=r1, worker=w_id] -> d2[null]

* Signal: f() has finished, with output O1
* Response:

  1. a) Set d2 based on output O1::

            d1[c1] -> f()[state=run, R=r1, worker=w_id] -> d2[x]

     b) Mark f() as successfully run and update it with the run metadata::

            d1[c1] -> f()[state=ok, R=r1, worker=w_id, run_meta=m] -> d2[x]

* Signal: d2 has a value x
* Response:

  1. Optionally, check whether there exist identical functions
     (f(d1[c1]) in this case) whose result is unknown; if so, substitute
     d2[x] for their result (especially if they are not running yet).
"""
import networkx as nx | |
from itertools import * | |
## Base classes | |
class Status:
    """String labels for the lifecycle state of a Function node."""

    W = "wait"     # created; its inputs are not all available yet
    Q = "queued"   # inputs satisfied; waiting to be picked up by a worker
    R = "running"  # claimed by a worker
    X = "done"     # finished successfully
    E = "error"    # NOTE(review): never assigned in this script; presumably a failed run
class DataType(object):
    """A data node in a task graph.

    ``params`` holds the node's value; ``None`` means it has not been set
    yet (the ``d[null]`` state in the module notes).
    """

    def __init__(self, params=None):
        """Create the node, optionally with its value already set.

        :param params: initial value of the node; defaults to None (unset).
        """
        self.params = params

    def __str__(self):
        """Return ``ClassName[params]``, e.g. ``D1[None]``."""
        return "{}[{}]".format(self.__class__.__name__, self.params)

    # Containers (such as the graph adjacency printed by main) show their
    # elements with repr(), so use the same readable form there.
    __repr__ = __str__
class Empty(DataType):
    """A DataType subclass that adds nothing to the base class.

    NOTE(review): not referenced by the code shown here; presumably meant as
    a marker for "no data".
    """
class Function(object):
    """A function (processing step) node in a task graph.

    ``status`` holds one of the ``Status`` labels; a new node starts in
    ``Status.W`` ("wait").
    """

    def __init__(self):
        self.status = Status.W

    def __str__(self):
        """Return ``status/ClassName()``, e.g. ``wait/F()``."""
        return "{}/{}()".format(self.status, self.__class__.__name__)

    # Containers (such as the graph adjacency printed by main) show their
    # elements with repr(), so use the same readable form there.
    __repr__ = __str__
## Derived classes | |
class D1(DataType):
    """Data type 1: the input of the task T = F(D1):D2."""
class D2(DataType):
    """Data type 2: the result, i.e. the "goal", of the task T = F(D1):D2."""
class F(Function):
    """The function of the task T = F(D1):D2, producing a D2 from a D1."""
## Stand-alone program | |
def build_graph(*edges):
    """Build a directed graph from edge tuples.

    :param edges: any number of ``(source, target)`` pairs; the endpoints
        can be any hashable objects (classes or node instances here).
    :returns: a new ``networkx.DiGraph`` containing those edges and their
        endpoints.
    """
    g = nx.DiGraph()
    # add_edges_from() adds every endpoint that is not yet a node, so no
    # separate node-collection pass is needed.
    g.add_edges_from(edges)
    return g
def main(args):
    """Print every graph snapshot yielded by run(), each preceded by "--".

    :param args: command-line arguments; currently ignored.
    """
    snapshots = run()
    for snapshot in snapshots:
        print("--")
        print(snapshot.adj)
def is_function(obj):
    """Tell whether *obj* is a Function node.

    Only instances count: Function subclasses themselves (used as nodes in
    the abstract template graph) give False.
    """
    matches = isinstance(obj, Function)
    return matches
def run():
    """Step one F(D1):D2 task through its first lifecycle stages.

    Yields the graph after each stage so a caller (see main) can show it:

    1. the abstract template, whose nodes are the classes D2, F and D1;
    2. a concrete task built from fresh D1, F and D2 instances;
    3. the concrete task after every waiting Function whose inputs all have
       non-None ``params`` has been moved to ``Status.Q``.

    Edges point from a result to the function producing it, and from the
    function to its inputs. A function's inputs are therefore its
    successors, i.e. the keys of ``g.adj[function]``.
    """
    # Abstract template: the nodes are the classes themselves.
    g = build_graph((D2, F), (F, D1))
    yield g

    # Concrete task: every data node starts with params=None.
    d1, f, d2 = D1(), F(), D2()
    g = build_graph((d2, f), (f, d1))
    yield g

    # Queue each waiting function whose inputs are all set. Nothing in this
    # demo assigns d1.params, so f stays in Status.W here.
    for node in g.nodes():
        if not is_function(node) or node.status != Status.W:
            continue
        if all(dep.params is not None for dep in g.adj[node]):
            node.status = Status.Q
    yield g
if __name__ == "__main__":
    # Run the demo when the file is executed as a script.
    import sys

    main(sys.argv)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment