Skip to content

Instantly share code, notes, and snippets.

@rmorshea
Created July 26, 2017 19:58
Show Gist options
  • Save rmorshea/f4a5e58e81a676021d7050ac17b2f2e2 to your computer and use it in GitHub Desktop.
Save rmorshea/f4a5e58e81a676021d7050ac17b2f2e2 to your computer and use it in GitHub Desktop.
A single file module for creating staged actions.
import six
import inspect
class MetaEngine(type):
def __init__(cls, name, bases, classdict):
cls._cycle = []
rotation = cls.next_rotation(None)
while rotation is not None:
cls._cycle.append(rotation)
rotation = cls.next_rotation(rotation)
def next_rotation(cls, rotation):
for c in cls.mro():
blueprint = vars(c).get("blueprint")
if blueprint is not None and rotation in blueprint:
return blueprint[rotation]
else:
return None
def __get__(cls, obj, obj_cls):
if obj is not None:
return cls(obj)
else:
return cls
class Engine(six.with_metaclass(MetaEngine, object)):
blueprint = {None: None}
def __init__(self, *args):
self._common_args = args
@property
def cycle(self):
return self._cycle[:]
def send(self, *args):
self._args = args + self._args
def receive(self):
args, self._args = self._args, self._common_args
return args
def __call__(self, *args, **kwargs):
self.send(*args)
if self.status is None:
for status in self.cycle:
self.status = status
if hasattr(self, status):
yield getattr(self, status)(*self.receive(), **kwargs)
self.status = None
self.receive()
else:
raise RuntimeError("Engine is already running (status: %r)." % self.status)
def crank(self, *args, **kwargs):
generator = self(*args, **kwargs)
def turn(*args):
self.send(*args)
return next(generator)
next(generator)
return turn
def between(after, before):
frame = inspect.currentframe().f_back
frame.f_locals.setdefault("blueprint", {})
blueprint = frame.f_locals["blueprint"]
def setup(method):
name = method.__name__
blueprint[after] = method.__name__
blueprint[method.__name__] = before
return method
return setup
def after(name):
frame = inspect.currentframe().f_back
frame.f_locals.setdefault("blueprint", {})
blueprint = frame.f_locals["blueprint"]
def setup(method):
blueprint[name] = method.__name__
return method
return setup
def before(name):
frame = inspect.currentframe().f_back
frame.f_locals.setdefault("blueprint", {})
blueprint = frame.f_locals["blueprint"]
def setup(method):
blueprint[method.__name__] = name
return method
return setup
def _after(blueprint, name, method):
blueprint[name] = method
def _before(blueprint, name, method):
blueprint[method] = name
class Action(Engine):
"""A basic Engine with 3 phases"""
@after(None)
def prep(self, *args, **kwargs):
pass
@after("prep")
def working(self, *args, **kwargs):
pass
@after("working")
def done(self, *args, **kwargs):
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment