Skip to content

Instantly share code, notes, and snippets.

@dpapp-hortonworks
Created January 9, 2018 10:18
Show Gist options
  • Save dpapp-hortonworks/95e03a333ce7530009af5729c3446858 to your computer and use it in GitHub Desktop.
Save dpapp-hortonworks/95e03a333ce7530009af5729c3446858 to your computer and use it in GitHub Desktop.
Stuff.
from itertools import ifilter
from collections import defaultdict as dd
class GEException(Exception):
    """Base class for all errors raised by the graph executor."""
class CircularDependencyException(GEException):
    """Signals that the node graph contains a dependency cycle."""
class NodeNotReadyException(GEException):
    """Error type for using a node that has not produced a value yet.

    NOTE(review): nothing in this file raises it; confirm the intended use.
    """
class NoValue(object):
    """Marker type meaning "no result has been computed yet"."""


# Single shared sentinel; nodes and the executor hold it in ``_value`` until
# they have been executed.
no_value = NoValue()
class Input(object):
    """Common base for everything that can be bound to a node argument.

    Subclasses expose the data to pass on through a ``value`` property.
    """
class SimpleInput(Input):
    """An input that wraps a constant supplied when the graph is built."""

    def __init__(self, value):
        """Store ``value`` so it can be handed to a node unchanged."""
        self._value = value

    @property
    def value(self):
        """The wrapped constant."""
        return self._value

    def __str__(self):
        # Wrap in a 1-tuple: a bare tuple value would otherwise be unpacked
        # by the % operator and raise TypeError (or format only one item).
        return "<SimpleInput value=%s >" % (self._value,)

    __repr__ = __str__
class NodeOutput(Input):
    """An input whose value is the result of another node, referenced by name."""

    def __init__(self, name):
        """Remember the producing node's name; the node itself is bound later."""
        self._name = name
        # Filled in by _set_node() once the name has been resolved to a node.
        self._node = None

    @property
    def name(self):
        """Name of the node whose result this input reads."""
        return self._name

    def _set_node(self, node):
        """Bind the node object that ``name`` refers to."""
        self._node = node

    @property
    def value(self):
        """Current value of the bound node.

        Raises:
            AttributeError: If no node has been bound yet.
        """
        return self._node.value

    def __str__(self):
        bound_value = self._node.value if self._node is not None else None
        # 1-tuple so that a tuple-valued result is formatted, not unpacked.
        return "<NodeOutput value=%s >" % (bound_value,)

    __repr__ = __str__
class Node(object):
    """A named unit of work: a function plus the inputs bound to its arguments."""

    def __init__(self, name, f, **kwargs):
        """Bind ``f`` to its inputs without running it.

        Args:
            name: Identifier under which other nodes can reference this one.
            f: Plain Python function (must expose ``__code__``) that computes
                the node's value.
            **kwargs: One ``Input`` instance per argument of ``f``, keyed by
                argument name.

        Raises:
            AssertionError: If the keyword names differ from the arguments of
                ``f`` or a keyword value is not an ``Input``.
        """
        code = f.__code__
        # co_varnames starts with the argument names and continues with the
        # function's local variables, so only the argument prefix is compared.
        arg_count = code.co_argcount + getattr(code, "co_kwonlyargcount", 0)
        arg_names = set(code.co_varnames[:arg_count])
        assert arg_names == set(kwargs), (
            "Inputs %s do not match the arguments %s of node %s"
            % (sorted(kwargs), sorted(arg_names), name)
        )
        for value in kwargs.values():
            assert isinstance(value, Input), (
                "Node inputs must be Input instances, got %r" % (value,)
            )
        self._f = f
        self._kwargs = kwargs
        self._name = name
        self._value = no_value

    @property
    def ready(self):
        """True once ``execute`` has stored a result."""
        # Identity test: the sentinel is a singleton, and a computed value
        # may define its own ``==``.
        return self._value is not no_value

    @property
    def name(self):
        """Identifier of this node."""
        return self._name

    @property
    def f(self):
        """The function this node runs."""
        return self._f

    @property
    def kwargs(self):
        """Mapping of argument name to the ``Input`` bound to it."""
        return self._kwargs

    @property
    def value(self):
        """Result of the last ``execute`` call, or ``no_value`` before it."""
        return self._value

    def execute(self):
        """Call ``f`` with the current value of every input and keep the result."""
        arguments = {key: inp.value for key, inp in self._kwargs.items()}
        self._value = self._f(**arguments)

    def __str__(self):
        return "<Node name=%s, value=%s>" % (self._name, self._value)

    __repr__ = __str__
class GraphExecutor(object):
    """Runs a set of nodes in dependency order and exposes the sink's value."""

    def __init__(self, *nodes):
        """Remember the nodes to run; nothing is executed yet."""
        self._nodes = nodes
        self._value = no_value

    @property
    def ready(self):
        """True once ``execute`` has stored a result."""
        return self._value is not no_value

    @property
    def value(self):
        """Value of the sink node, or ``no_value`` before execution."""
        return self._value

    def execute(self):
        """Execute every node after its dependencies and store the sink's value.

        ``NodeOutput`` inputs are bound to the nodes they name as a side
        effect. Calling this again after a successful run does nothing.

        Raises:
            AssertionError: If an input names an unknown node, or the graph
                does not have exactly one node whose output is unused.
            CircularDependencyException: If the dependencies contain a cycle.
        """
        if self.ready:
            return

        name_to_node = {node.name: node for node in self._nodes}

        # consumers[p]: names of nodes that read p's output.
        # pending[c]:   names of nodes c is still waiting for.
        consumers, pending = dd(set), dd(set)
        for node in self._nodes:
            for inp in node.kwargs.values():
                if not isinstance(inp, NodeOutput):
                    continue
                producer = inp.name
                assert producer in name_to_node, (
                    "Referring to unknown node with name: %s" % producer
                )
                inp._set_node(name_to_node[producer])
                consumers[producer].add(node.name)
                pending[node.name].add(producer)

        level = [name for name in name_to_node if not pending[name]]
        sinks = [name for name in name_to_node if not consumers[name]]
        assert len(sinks) == 1, "There must be exactly one sink node"
        if not level:
            raise CircularDependencyException()

        # Work out the complete order first so that a cyclic graph is
        # rejected before any node has been run.
        order = []
        while level:
            order.extend(level)
            next_level = []
            for name in level:
                for consumer in consumers[name]:
                    pending[consumer].remove(name)
                    if not pending[consumer]:
                        next_level.append(consumer)
            level = next_level

        # Nodes on a cycle never lose their last pending dependency, so
        # they are missing from the order.
        if len(order) != len(name_to_node):
            raise CircularDependencyException()

        for name in order:
            name_to_node[name].execute()
        self._value = name_to_node[sinks[0]].value
def f(a, b, c):
    """Demo node function: return ``a + b + c``."""
    return a + b + c
def g(c):
    """Demo node function: return ``c`` multiplied by two."""
    return c * 2
def h(d, e):
    """Demo node function: return ``d - e``."""
    return d - e
if __name__ == "__main__":
    # Demo graph: h = f(12, 13, 14) - g(15) = 39 - 30 = 9.
    job_graph = [
        Node("f", f, a=SimpleInput(12), b=SimpleInput(13), c=SimpleInput(14)),
        Node("g", g, c=SimpleInput(15)),
        Node("h", h, d=NodeOutput("f"), e=NodeOutput("g")),
    ]
    ge = GraphExecutor(*job_graph)
    ge.execute()
    # Parenthesised single-argument form prints the same on Python 2 and 3.
    print(ge.value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment