Skip to content

Instantly share code, notes, and snippets.

@mpkocher
Created October 1, 2012 05:03
Show Gist options
  • Save mpkocher/3809546 to your computer and use it in GitHub Desktop.
Save mpkocher/3809546 to your computer and use it in GitHub Desktop.
Sketch of a new decorator-based task style
import os
import sys
import types
import logging
# Module-level logger named after this module; _setup_log() attaches its
# stdout handler and sets its level.
log = logging.getLogger(__name__)
def _setup_log():
    """Route DEBUG-and-above records of the module logger to stdout."""
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    )
    log.addHandler(stdout_handler)
    log.setLevel(logging.DEBUG)
def task(**kw):
    """
    Simple deco to generate the task metadata.

    Only the ``inputs``, ``outputs``, ``errors`` and ``task_type`` keyword
    arguments are kept; any other keyword is silently dropped.

    :param kw: task metadata keywords
    :return: (function) decorator that replaces the decorated function with
        a dict holding the kept keywords plus ``task_func`` and
        ``task_func_name``
    """
    # Explicitly grab the necessary keys
    keys = ('inputs', 'outputs', 'errors', 'task_type')
    # items() works on both Python 2 and Python 3 (iteritems() is 2-only).
    base_meta_data = {k: v for k, v in kw.items() if k in keys}

    def f(task_func):
        """Return a dict of meta data"""
        # Build a fresh dict per decorated function so that reusing one
        # decorator object on several functions does not make them share
        # (and overwrite) a single metadata dict.
        task_meta_data = dict(base_meta_data)
        task_meta_data['task_func'] = task_func
        # Since funcs can be pickled, keeping track of the function name
        # could be useful for serialization. Then the func lookup
        # could be done via P*Module.
        # __name__ is available on Python 2 and 3 (func_name is 2-only).
        task_meta_data['task_func_name'] = task_func.__name__
        return task_meta_data

    return f
class MyModule(object):
    """Minimal P*module-like class exposing two decorated tasks."""

    def __init__(self):
        self.registered_tasks = ['my_task', 'new_style_task']
        self.name = type(self).__name__

    def validate_settings(self):
        """Always report the settings as valid."""
        return True

    def __str__(self):
        """Return a multi-line summary: class name, task count, task names."""
        header = [
            "Module Summary: {s}".format(s=self.__class__.__name__),
            "{n} tasks".format(n=len(self.registered_tasks)),
        ]
        task_lines = [
            "task {n}".format(n=task_name)
            for task_name in self.registered_tasks
        ]
        return "\n".join(header + task_lines)

    @task(
        inputs={'input_txt': 'input.txt'},
        outputs={'output_txt': 'output.txt'},
        errors={'AssertionError': 'Found ERROR'},
        task_type='SimpleLocalTask'
    )
    def my_task(self, files):
        """
        Current-style task: log the received files and return fixed commands.

        :param files: value that is only logged here
        :return: (list of str) shell commands
        """
        log.debug("my_task called with files={f}".format(f=files))
        return ["which grep", 'which ruby', 'perl --version']

    @task(
        inputs={'new_input_txt': 'new_input.txt'},
        outputs={'new_output_txt': 'new_output.txt'},
        errors={'AssertionError': 'Found ERROR'},
        task_type='SimpleLocalTask'
    )
    def new_style_task(self, new_input_txt=None, new_output_txt=None):
        """
        New-style task whose inputs and outputs arrive as keyword arguments.

        :return: (list of str) shell commands that timestamp the output file
            and append the input file to it
        """
        templates = (
            "echo \"Started_at `date`\" > {o}",
            "cat {i} >> {o}",
            "echo \"Completed_at `date`\" >> {o}",
        )
        return [
            template.format(i=new_input_txt, o=new_output_txt)
            for template in templates
        ]
class LocalTask(object):
    """Metadata container describing a task; it does not run the task."""

    def __init__(self, inputs, outputs, dir_name, task_func, bind=False, errors=None):
        """
        Task classes are metadata containers for Tasks.
        They are NOT responsible for running them. They are run by a Worker.

        :param inputs: (dict) {name: SmrtFile}
        :param outputs: (dict) {name: SmrtFile}
        :param dir_name: (str) Dir name to write to
        :param task_func: (function) function (bound to self, or not bound to self)
        :param bind: (bool) flag to bind the function to self
        :param errors: (dict) {str_error: display_message}
        """
        self.inputs = inputs
        self.outputs = outputs
        self.dir_name = dir_name
        self.task_func = task_func
        # To be similar to how the current code works
        self.errors = errors
        self.state = 'TaskInitialized'

        # To make things a bit easier, we can dynamically bind the func to self
        if bind:
            # This assumes that the func was defined with (self, arg1, kw='').
            # Now we can call self.to_cmds(arg1, kw='').
            # Furthermore, in the func definition you can use references to
            # self, e.g., self.inputs['input_txt'].path
            #
            # The two-argument form works on Python 2 and Python 3; the
            # three-argument form (with the class) raises TypeError on 3.
            self.to_cmds = types.MethodType(self.task_func, self)
            log.debug("Dynamically bound func {f} with self to to_cmds".format(f=self.task_func.__name__))
        else:
            # This assumes that the function was not bound to anything,
            # i.e., can be called my_func(arg1, kw='stuff')
            self.to_cmds = task_func
        log.debug("Created an instance of {c}".format(c=self.__class__.__name__))

    @property
    def name(self):
        """Name of the wrapped task function (the task name from the P_Module)."""
        # __name__ exists on Python 2 and 3; func_name is Python 2 only.
        return self.task_func.__name__

    def to_cmds(self):
        """
        Placeholder that __init__ shadows with an instance attribute.

        :raises NotImplementedError: if called on an instance whose
            __init__ did not run
        """
        raise NotImplementedError
def test_01():
    """
    Simple example of running a task by calling its function directly.

    Nothing is returned; the generated commands are only written to the log.
    """
    log.info("Starting test_01")
    my_module = MyModule()
    log.debug(my_module)
    # my_task is the metadata dict produced by the @task decorator
    t = my_module.my_task
    task_func = t['task_func']
    # __name__ works on Python 2 and 3 (func_name is Python 2 only)
    log.debug(task_func.__name__)
    # task_func is a plain function, so a value for `self` is passed by hand
    cmds = task_func('this is necessary because of self', 'hello')
    log.debug("\n".join(cmds))
    log.info("Completed test_01")
def test_02():
    """
    Basic example of using a P*Module to pass data to a LocalTask.

    Nothing is returned; the generated commands are only written to the log.
    """
    log.info("Starting test_02")
    log.debug("Running test {f}".format(f='test_02'))
    my_module = MyModule()
    log.debug(my_module)

    # Task metadata: my_task is the dict produced by the @task decorator
    task_dict = my_module.my_task
    task_func = task_dict['task_func']
    # __name__ works on Python 2 and 3 (func_name is Python 2 only)
    log.debug(task_func.__name__)

    # Direct call: task_func is a plain function, so a value for `self`
    # is passed by hand
    cmds = task_func('this is necessary because of self', 'hello')
    log.debug("printing cmds")
    log.debug("\n".join(cmds))

    # Same function again, this time bound to a LocalTask instance
    t = LocalTask(
        task_dict['inputs'],
        task_dict['outputs'],
        os.getcwd(),
        task_func,
        bind=True,
        errors=task_dict['errors'],
    )
    log.debug(t)
    log.debug("Running task {n} in {d}".format(n=t.name, d=t.dir_name))
    log.debug("Printing cmds")
    # With bind=True the LocalTask itself is passed as `self`
    log.debug(t.to_cmds('hello'))
    log.info("Completed test_02")
def main():
    """Configure logging, run the enabled examples, and return exit code 0."""
    _setup_log()
    # test_01 is currently disabled; only test_02 runs.
    enabled_tests = (test_02,)
    for run_test in enabled_tests:
        run_test()
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment