Created
January 9, 2018 10:18
-
-
Save dpapp-hortonworks/95e03a333ce7530009af5729c3446858 to your computer and use it in GitHub Desktop.
Stuff.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from itertools import ifilter | |
from collections import defaultdict as dd | |
class GEException(Exception) | |
class CircularDependencyException(GEException): | |
pass | |
class NodeNotReadyException(GEException): | |
pass | |
class NoValue(object): | |
pass | |
no_value = NoValue() | |
class Input(object): | |
pass | |
class SimpleInput(Input): | |
def __init__(self, value): | |
self._value = value | |
@property | |
def value(self): | |
return self._value | |
def __str__(self): | |
return "<SimpleInput value=%s >" % self._value | |
__repr__ = __str__ | |
class NodeOutput(Input): | |
def __init__(self, name): | |
self._name = name | |
@property | |
def name(self): | |
return self._name | |
def _set_node(self, node): | |
self._node = node | |
@property | |
def value(self): | |
return self._node._value | |
def __str__(self): | |
return "<NodeOutput value=%s >" % (self._node.value if hasattr(self, "_node") else None) | |
__repr__ = __str__ | |
class Node(object): | |
def __init__(self, name, f, **kwargs): | |
# Checking if the kwargs are well-formed | |
assert set(f.__code__.co_varnames) == set(kwargs.keys()) | |
for value in kwargs.itervalues(): | |
assert issubclass(value.__class__, Input) | |
self._f = f | |
self._kwargs = kwargs | |
self._name = name | |
self._value = no_value | |
@property | |
def ready(self): | |
return self._value == no_value | |
@property | |
def name(self): | |
return self._name | |
@property | |
def f(self): | |
return self._f | |
@property | |
def kwargs(self): | |
return self._kwargs | |
@property | |
def value(self): | |
return self._value | |
def execute(self): | |
self._value = self._f(**{ | |
key: inp.value for key, inp in self._kwargs.iteritems() | |
}) | |
def __str__(self): | |
return "<Node name=%s, value=%s>" % (self._name, self._value) | |
__repr__ = __str__ | |
class GraphExecutor(object): | |
def __init__(self, *nodes): | |
self._nodes = nodes | |
self._value = no_value | |
@property | |
def ready(self): | |
return self._value != no_value | |
@property | |
def value(self): | |
return self._value | |
def execute(self): | |
if self.ready: | |
return | |
name_to_node = {node.name: node for node in self._nodes} | |
forward, backward = dd(set), dd(set) | |
for node in self._nodes: | |
cname = node.name | |
for inp in node.kwargs.itervalues(): | |
if isinstance(inp, NodeOutput): | |
pname = inp.name | |
assert pname in name_to_node, "Referring to unknown node with name: %s" % pname | |
inp._set_node(name_to_node[pname]) | |
forward[pname].add(cname) | |
backward[cname].add(pname) | |
stack = [name for name in name_to_node.iterkeys() if not backward[name]] | |
sinks = [name for name in name_to_node.iterkeys() if not forward[name]] | |
assert len(sinks) == 1, "There must be exactly one sink node" | |
sink = sinks[0] | |
if not stack: | |
raise CircularDependencyException() | |
was = set(stack) | |
while stack: | |
nstack = [] | |
for elem in stack: | |
name_to_node[elem].execute() | |
for nxt in forward[elem]: | |
backward[nxt].remove(elem) | |
if not backward[nxt]: | |
if nxt in was: | |
raise CircularDependencyError() | |
else: | |
was.add(nxt) | |
nstack.append(nxt) | |
stack = nstack | |
self._value = name_to_node[sink].value | |
def f(a, b, c): | |
return a + b + c | |
def g(c): | |
return c * 2 | |
def h(d, e): | |
return d - e | |
if __name__ == "__main__": | |
job_graph = [ | |
Node("f", f, a=SimpleInput(12), b=SimpleInput(13), c=SimpleInput(14)), | |
Node("g", g, c=SimpleInput(15)), | |
Node("h", h, d=NodeOutput("f"), e=NodeOutput("g")) | |
] | |
ge = GraphExecutor(*job_graph) | |
ge.execute() | |
print ge.value |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment