Created
July 26, 2017 19:58
-
-
Save rmorshea/f4a5e58e81a676021d7050ac17b2f2e2 to your computer and use it in GitHub Desktop.
A single file module for creating staged actions.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import six | |
import inspect | |
class MetaEngine(type):
    """Metaclass that turns a class's ``blueprint`` links into an ordered stage list.

    A ``blueprint`` is a dict mapping a stage name to the name of the stage
    that follows it; the key ``None`` marks the entry point.  When a class is
    created the links are followed from ``None`` and the visited names are
    stored, in order, on ``cls._cycle``.
    """

    def __init__(cls, name, bases, classdict):
        """Build ``cls._cycle`` by walking the blueprint links from ``None``.

        Raises:
            ValueError: if the links lead back to a stage that was already
                visited (the walk would otherwise never terminate).
        """
        super(MetaEngine, cls).__init__(name, bases, classdict)
        cls._cycle = []
        visited = set()
        rotation = cls.next_rotation(None)
        while rotation is not None:
            # Guard against looping blueprints such as {"a": "b", "b": "a"},
            # which would otherwise grow ``_cycle`` without bound.
            if rotation in visited:
                raise ValueError(
                    "Blueprint of %r loops back to stage %r." % (name, rotation)
                )
            visited.add(rotation)
            cls._cycle.append(rotation)
            rotation = cls.next_rotation(rotation)

    def next_rotation(cls, rotation):
        """Return the stage that follows ``rotation``, or ``None`` if there is none.

        Each class in the MRO is consulted in order, looking only at a
        ``blueprint`` defined directly on that class, so a link declared on a
        subclass takes precedence over the same link on a base class.
        """
        for klass in cls.mro():
            blueprint = vars(klass).get("blueprint")
            if blueprint is not None and rotation in blueprint:
                return blueprint[rotation]
        return None

    def __get__(cls, obj, obj_cls):
        """Descriptor hook for classes created by this metaclass.

        Accessed through an instance ``obj``, returns a new ``cls(obj)``;
        accessed through the owning class (``obj is None``), returns the
        class itself.
        """
        if obj is not None:
            return cls(obj)
        return cls
class Engine(six.with_metaclass(MetaEngine, object)):
    """Runs the stages named in ``cycle`` one after another.

    Calling an engine returns a generator.  Each step sets ``status`` to the
    next stage name and, if the engine has an attribute with that name, calls
    it with the pending positional arguments and yields its return value.
    ``status`` is ``None`` whenever no run is in progress.
    """

    # Base link table: nothing follows the ``None`` entry point, so a bare
    # Engine has no stages.
    blueprint = {None: None}

    def __init__(self, *args):
        """Store ``args`` as the arguments passed to every stage.

        ``_args`` (the pending arguments) and ``status`` are initialised here
        so that ``send`` and ``__call__`` work on a fresh instance.
        """
        self._common_args = args
        self._args = args
        self.status = None

    @property
    def cycle(self):
        """A copy of the ordered stage names for this engine's class."""
        return list(self._cycle)

    def send(self, *args):
        """Prepend ``args`` to the arguments pending for the next stage."""
        self._args = args + self._args

    def receive(self):
        """Return the pending arguments and reset them to the common ones."""
        args, self._args = self._args, self._common_args
        return args

    def __call__(self, *args, **kwargs):
        """Generator that runs each stage in turn, yielding its result.

        ``args`` are sent to the first stage; ``kwargs`` are passed to every
        stage.  Stage names with no matching attribute are skipped.

        Because this is a generator, nothing runs (and no error is raised)
        until the first ``next()``.

        Raises:
            RuntimeError: if a run is already in progress on this engine.
        """
        # Check before sending so a rejected call does not alter the
        # arguments pending for the run that is already in progress.
        if self.status is not None:
            raise RuntimeError("Engine is already running (status: %r)." % self.status)
        self.send(*args)
        try:
            for status in self.cycle:
                self.status = status
                if hasattr(self, status):
                    yield getattr(self, status)(*self.receive(), **kwargs)
        finally:
            # Also reached when a stage raises or the generator is closed
            # early, so the engine never stays stuck in a "running" state.
            self.status = None
            self.receive()

    def crank(self, *args, **kwargs):
        """Start a run and return a callable that advances it one stage.

        The first stage runs immediately and its result is discarded.  Each
        call ``turn(*more)`` sends ``more`` to the next stage, runs it and
        returns its result.  ``StopIteration`` propagates once no stage is
        left (including from ``crank`` itself if there is no stage to run).
        """
        generator = self(*args, **kwargs)

        def turn(*args):
            self.send(*args)
            return next(generator)

        next(generator)
        return turn
def between(after, before):
    """Decorator factory that slots a method between two named stages.

    The ``blueprint`` dict is looked up (and created if absent) in the local
    namespace of the caller, which is the class body when used as a method
    decorator.  The decorated method is returned unchanged; only the links
    ``after -> method name`` and ``method name -> before`` are recorded.
    """
    namespace = inspect.currentframe().f_back.f_locals
    links = namespace.setdefault("blueprint", {})

    def setup(method):
        name = method.__name__
        links[after] = name
        links[name] = before
        return method

    return setup
def after(name):
    """Decorator factory that schedules a method right after stage ``name``.

    The ``blueprint`` dict is looked up (and created if absent) in the local
    namespace of the caller, which is the class body when used as a method
    decorator.  The link ``name -> method name`` is recorded and the method
    is returned unchanged.  Pass ``None`` to mark the first stage.
    """
    namespace = inspect.currentframe().f_back.f_locals
    links = namespace.setdefault("blueprint", {})

    def setup(method):
        links[name] = method.__name__
        return method

    return setup
def before(name):
    """Decorator factory that schedules a method right before stage ``name``.

    The ``blueprint`` dict is looked up (and created if absent) in the local
    namespace of the caller, which is the class body when used as a method
    decorator.  The link ``method name -> name`` is recorded and the method
    is returned unchanged.
    """
    namespace = inspect.currentframe().f_back.f_locals
    links = namespace.setdefault("blueprint", {})

    def setup(method):
        links[method.__name__] = name
        return method

    return setup
def _after(blueprint, name, method):
    """Record in ``blueprint`` that ``method`` follows the stage ``name``."""
    blueprint.update({name: method})
def _before(blueprint, name, method):
    """Record in ``blueprint`` that ``method`` precedes the stage ``name``."""
    blueprint.update({method: name})
class Action(Engine):
    """An Engine with three stages, linked in order: prep, working, done.

    Every stage is a no-op that accepts any arguments and returns ``None``.
    """

    @after(None)
    def prep(self, *args, **kwargs):
        """First stage (linked from the ``None`` entry point); does nothing."""

    @after("prep")
    def working(self, *args, **kwargs):
        """Second stage, linked after ``prep``; does nothing."""

    @after("working")
    def done(self, *args, **kwargs):
        """Third stage, linked after ``working``; does nothing."""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment