Skip to content

Instantly share code, notes, and snippets.

@gchiam
Last active September 30, 2015 01:47
Show Gist options
  • Save gchiam/9a8e25cc0679d1ad8e8c to your computer and use it in GitHub Desktop.
Save gchiam/9a8e25cc0679d1ad8e8c to your computer and use it in GitHub Desktop.
Simple Workflow Engine
import functools
import logging
class SimpleWorkflowEngine(object):
"""A simple workflow engine
Usage:
from functools import partial
def initialize(number):
return range(number)
def square(numbers):
return [x * x for x in numbers]
def multiply(numbers, multiplier=1):
return [x * multiplier for x in numbers]
workflow = [
initialize,
square,
partial(multiply, multiplier=2),
sum
]
engine = SimpleWorkflowEngine(workflow, initial=6)
return engine.run()
"""
def __init__(self, workflow, initial=None):
self.initial = initial
self.workflow = workflow
self.logger = logging.getLogger(__name__)
def run(self):
"""execute the workflow"""
return reduce(
lambda data, func: self._task_wrapper(func)(data),
[self.initial] + self.workflow
)
def _task_wrapper(self, func):
def get_func_name():
func_name = str(func)
try:
func_name = func.func_name
except AttributeError:
try:
func_name = func.__name__
except AttributeError:
if isinstance(func, functools.partial):
func_name = func.func.func_name
return str(func_name)
def format_for_display(arg, max_len=80):
display = str(arg)
if len(display) > max_len:
display = display[:max_len] + '...'
return display
def func_wrapper(*args, **kwargs):
func_name = get_func_name()
self.logger.debug(
'executing [%s], args[%s], kwargs[%s]',
func_name,
format_for_display(args),
format_for_display(kwargs),
)
results = func(*args, **kwargs)
self.logger.debug(
'executed [%s], results[%s]',
func_name,
format_for_display(results),
)
return results
return func_wrapper
def test():
from functools import partial
def initialize(number):
return range(number)
def square(numbers):
return [x * x for x in numbers]
def multiply(numbers, multiplier=1):
return [x * multiplier for x in numbers]
workflow = [
initialize,
square,
partial(multiply, multiplier=2),
sum
]
engine = SimpleWorkflowEngine(workflow, initial=6)
print engine.run()
if __name__ == '__main__':
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment