Created
October 1, 2012 05:03
-
-
Save mpkocher/3809546 to your computer and use it in GitHub Desktop.
Sketch of the new task decorator style
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import types | |
import logging | |
log = logging.getLogger(__name__) | |
def _setup_log():
    """
    Send the module logger's output to stdout at DEBUG level.

    A new stream handler is attached on every call, so calling this more
    than once results in duplicated log lines.
    """
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    log.addHandler(stream_handler)
    log.setLevel(logging.DEBUG)
def task(**kw):
    """
    Simple decorator factory to generate the task metadata.

    Only the keyword arguments named ``inputs``, ``outputs``, ``errors`` and
    ``task_type`` are kept; any other keyword is silently ignored.

    Note that the decorated name is rebound to the returned metadata dict,
    not to a callable. The original function is stored under ``'task_func'``
    and its name under ``'task_func_name'``.

    :param kw: task metadata keyword arguments
    :return: (function) decorator mapping a function to its metadata dict
    """
    # Explicitly grab the necessary keys
    keys = ('inputs', 'outputs', 'errors', 'task_type')
    # items() (rather than iteritems()) works on both Python 2 and Python 3.
    selected = {k: v for k, v in kw.items() if k in keys}

    def f(task_func):
        """Return a dict of meta data"""
        # Build a fresh dict per decorated function so that reusing one
        # decorator instance for several functions never aliases (and
        # overwrites) a previously returned metadata dict.
        task_meta_data = dict(selected)
        task_meta_data['task_func'] = task_func
        # Since funcs can be pickled, keeping track of the function name
        # could be useful for serialization. Then the func lookup
        # could be done via P*Module.
        # __name__ is available on Python 2 and 3 (func_name is 2-only).
        task_meta_data['task_func_name'] = task_func.__name__
        return task_meta_data

    return f
class MyModule(object):
    """
    Minimal P*module-like class holding two example task definitions.

    Because of the ``@task`` decorator, ``my_task`` and ``new_style_task``
    are class attributes holding metadata dicts rather than bound methods.
    """

    def __init__(self):
        self.registered_tasks = ['my_task', 'new_style_task']
        self.name = self.__class__.__name__

    def validate_settings(self):
        """Report the settings as valid (always True)."""
        return True

    def __str__(self):
        """Return a multi-line summary: class name, task count, task names."""
        summary = [
            "Module Summary: {s}".format(s=self.__class__.__name__),
            "{n} tasks".format(n=len(self.registered_tasks)),
        ]
        summary.extend("task {n}".format(n=task_name)
                       for task_name in self.registered_tasks)
        return "\n".join(summary)

    @task(inputs={'input_txt': 'input.txt'},
          outputs={'output_txt': 'output.txt'},
          errors={'AssertionError': 'Found ERROR'},
          task_type='SimpleLocalTask')
    def my_task(self, files):
        """
        Example task written in the current (positional ``files``) style.

        :param files: only logged here; not used to build the commands
        :return: (list of str) shell commands
        """
        log.debug("my_task called with files={f}".format(f=files))
        return ["which grep", 'which ruby', 'perl --version']

    @task(inputs={'new_input_txt': 'new_input.txt'},
          outputs={'new_output_txt': 'new_output.txt'},
          errors={'AssertionError': 'Found ERROR'},
          task_type='SimpleLocalTask')
    def new_style_task(self, new_input_txt=None, new_output_txt=None):
        """
        Example task defined in the new, kwargs-based style.

        The file names are interpolated into the commands as-is (no shell
        quoting); an argument left as None appears as the text ``None``.

        :param new_input_txt: file whose content is appended to the output
        :param new_output_txt: file the commands write to
        :return: (list of str) shell commands
        """
        write_start = "echo \"Started_at `date`\" > {o}".format(o=new_output_txt)
        append_input = "cat {i} >> {o}".format(i=new_input_txt, o=new_output_txt)
        write_end = "echo \"Completed_at `date`\" >> {o}".format(o=new_output_txt)
        return [write_start, append_input, write_end]
class LocalTask(object):
    """
    Metadata container for a task.

    Task classes are NOT responsible for running tasks; they are run by a
    Worker.
    """

    def __init__(self, inputs, outputs, dir_name, task_func, bind=False, errors=None):
        """
        Store the task metadata and install ``to_cmds`` on the instance.

        :param inputs (dict) {name: file}; the original notes describe the
            values as SmrtFile objects, the bundled example passes plain
            file-name strings
        :param outputs (dict) {name: file}, same shape as ``inputs``
        :param dir_name (str) Dir name to write to
        :param task_func (function) function (bound to self, or not bound to self)
        :param bind (bool) flag to bind the function to self
        :param errors (dict) {str_error: display_message}
        """
        self.inputs = inputs
        self.outputs = outputs
        self.dir_name = dir_name
        self.task_func = task_func
        # To be similar to how the current code works
        self.errors = errors
        self.state = 'TaskInitialized'

        # To make things a bit easier, we can dynamically bind the func to self
        if bind:
            # This assumes that the func was defined with (self, arg1, kw='').
            # Now we can call self.to_cmds(arg1, kw=''). Furthermore, in the
            # func definition you can use references to self,
            # e.g., self.inputs['input_txt'].
            # The two-argument form of MethodType is accepted by both
            # Python 2 and Python 3; the three-argument form is Python 2 only.
            self.to_cmds = types.MethodType(self.task_func, self)
            log.debug("Dynamically bound func {f} with self to to_cmds".format(f=self.name))
        else:
            # This assumes that the function was not bound to anything,
            # i.e., it can be called as my_func(arg1, kw='stuff')
            self.to_cmds = task_func

        log.debug("Created an instance of {c}".format(c=self.__class__.__name__))

    @property
    def name(self):
        """Name of the wrapped task function (the task name from the P_Module)."""
        # __name__ exists on Python 2 and 3; func_name is Python 2 only.
        return self.task_func.__name__

    def to_cmds(self):
        """
        Placeholder; ``__init__`` always shadows it with an instance attribute.

        :raises NotImplementedError: when reached on an instance that did not
            go through ``__init__``
        """
        raise NotImplementedError
def test_01():
    """
    Simple example of running a task function taken directly from a module.

    ``MyModule.my_task`` is a metadata dict (produced by the ``task``
    decorator), so the underlying function is fetched from its
    ``'task_func'`` entry and called with an explicit stand-in for ``self``.

    :return: None
    """
    log.info("Starting test_01")

    my_module = MyModule()
    log.debug(my_module)

    task_meta = my_module.my_task
    task_func = task_meta['task_func']
    # __name__ works on Python 2 and 3; func_name is Python 2 only.
    log.debug(task_func.__name__)

    # The function is unbound, so the first positional argument fills `self`.
    cmds = task_func('this is necessary because of self', 'hello')
    log.debug("\n".join(cmds))

    log.info("Completed test_01")
def test_02():
    """
    Basic example of using a P*Module to pass data to a LocalTask.

    First calls the raw task function with a stand-in for ``self``, then
    wraps the same function in a ``LocalTask`` with ``bind=True`` so that
    ``to_cmds`` can be called without supplying ``self``.

    :return: None
    """
    log.info("Starting test_02")
    log.debug("Running test {f}".format(f='test_02'))

    my_module = MyModule()
    log.debug(my_module)

    # Task metadata
    task_dict = my_module.my_task
    task_func = task_dict['task_func']
    # __name__ works on Python 2 and 3; func_name is Python 2 only.
    log.debug(task_func.__name__)

    # The function is unbound, so the first positional argument fills `self`.
    cmds = task_func('this is necessary because of self', 'hello')
    log.debug("printing cmds")
    log.debug("\n".join(cmds))

    t = LocalTask(
        task_dict['inputs'],
        task_dict['outputs'],
        os.getcwd(),
        task_func,
        bind=True,
        errors=task_dict['errors'],
    )
    log.debug(t)

    log.debug("Running task {n} in {d}".format(n=t.name, d=t.dir_name))
    log.debug("Printing cmds")
    # With bind=True the task instance is passed as `self` automatically.
    log.debug(t.to_cmds('hello'))

    log.info("Completed test_02")
def main():
    """
    Configure logging and run the LocalTask example.

    :return: (int) process exit status, always 0
    """
    _setup_log()
    # test_01() is intentionally left disabled; only test_02() runs.
    test_02()
    exit_status = 0
    return exit_status
# Script entry point: exit the interpreter with the status returned by main().
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment