Skip to content

Instantly share code, notes, and snippets.

@krsoninikhil
Created August 10, 2022 20:03
Show Gist options
  • Save krsoninikhil/80b532ae028b86e08f27ad01118b07b9 to your computer and use it in GitHub Desktop.
Save krsoninikhil/80b532ae028b86e08f27ad01118b07b9 to your computer and use it in GitHub Desktop.
import dag
def always_fail():
raise Exception('Manually throwing')
processes = {
"root": lambda: print("root"),
"a": lambda: print("A"),
"b": lambda: print("B"),
"c": lambda: print("C"),
#"c": always_fail,
"d": lambda: print("D"),
"e": lambda: print("E")
}
nodes = {
'root': ['a'],
'a': ['b', 'e'],
'b': ['c', 'd'],
'd': ['e'],
'c': [],
'e': []
}
class Executor:
def __init__(self, nodes):
self.dag = dag.DAG()
self.dag.from_dict(nodes)
self.nodes = nodes
self.executed = []
self.preds = []
def execute(self, curr_node='root'):
preds = self.dag.predecessors(curr_node, self.dag.graph)
for pred in preds:
if not pred in self.executed:
return
results = []
try:
processes[curr_node]()
self.executed.append(curr_node)
descendents = self.dag.downstream(curr_node, self.dag.graph)
for desc in descendents:
results.append(self.execute(desc))
return all(results)
except Exception as e:
print(e)
return False
return True
ex = Executor(nodes)
print(ex.execute())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment