Skip to content

Instantly share code, notes, and snippets.

@beratdogan
Last active August 19, 2020 07:38
Show Gist options
  • Save beratdogan/c31b2f74f40672b46f5e485eb2075688 to your computer and use it in GitHub Desktop.
Save beratdogan/c31b2f74f40672b46f5e485eb2075688 to your computer and use it in GitHub Desktop.
import contextvars
from pprint import pprint as pp
__pipeline = contextvars.ContextVar("pipeline")
class StartingStepAlreadySet(Exception):
pass
class StartingStepNotSet(Exception):
pass
class OutOfPipelineError(Exception):
pass
def getpipeline():
try:
return __pipeline.get()
except LookupError as err:
raise OutOfPipelineError() from err
def setpipeline(pipeline):
__pipeline.set(pipeline)
class Pipeline:
def __init__(self):
self.starting_step = None
self.steps = []
self.edges = []
def __enter__(self):
setpipeline(self)
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
return # to hold exteption
self.render()
setpipeline(None)
def __rshift__(self, other):
self.set_starting_step(other)
def add_step(self, step):
self.steps.append(step)
def add_edge(self, edge):
self.edges.append(edge)
def set_starting_step(self, step):
if self.starting_step:
raise StartingStepAlreadySet()
self.starting_step = step
def render(self):
if not self.starting_step:
raise StartingStepNotSet()
pp(self.starting_step)
pp(self.steps)
pp(self.edges)
while self.edges:
edge = self.edges.pop()
class Step:
def __init__(self, name, conditions=None):
self.name = name
self.conditions = conditions
self.pipeline = getpipeline()
self.pipeline.add_step(self)
def __repr__(self):
return f"{self.name} ({self.conditions})"
def __rshift__(self, other):
return self.next(other)
def __pos__(self):
self.start_here()
def start_here(self):
self.pipeline.set_starting_step(self)
def when(self, **cond):
# copy current step with new conditions
# MAYBE: this adds another step to pipeline, do we want this?
return self.__class__(name=self.name, conditions=cond)
def next(self, other):
if not isinstance(other, list):
self.pipeline.add_edge(Edge(self, other))
return
# handle parallel runs
for steps in other:
self.pipeline.add_edge(Edge(self, steps, parallel=True))
class Edge:
def __init__(self, from_, to, parallel=False):
self.from_ = from_
self.to = to
self.parallel = parallel
def __repr__(self):
return f"{self.from_} > {self.to} {self.parallel}"
with Pipeline():
# Declare steps
a = Step(name='a')
b = Step(name='b')
b1 = Step(name='b1')
b2 = Step(name='b2')
c = Step(name='c')
success = Step(name='success')
fail = Step(name='fail')
# Connect steps to each other
a.when(status=False).next(fail) # explicit way
a.when(status=True) >> b # operators-also supported
b.next([b1, b2]) # parallel run
b1.next(c)
b2.next(c)
c >> success
c >> fail
# Set a as starting step
a.start_here()
@beratdogan
Copy link
Author

repr:

a (None)
[a (None),
 b (None),
 b1 (None),
 b2 (None),
 success (None),
 fail (None),
 a ({'status': True}),
 a ({'status': False})]
[a ({'status': True}) > fail (None) False,
 a ({'status': False}) > b (None) False,
 b (None) > b1 (None) True,
 b (None) > b2 (None) True]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment