Created September 27, 2015 18:35
-
-
Save gdugas/a1d894c317496e37eddd to your computer and use it in GitHub Desktop.
Task Aggregator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse, copy, json, logging, os, sys | |
class TaggModule:
    """A task module loaded from a directory on disk.

    The directory is read as follows:

    * ``tagg.json`` -- module metadata; its ``version`` key is required.
    * ``command.json`` (optional) -- argument specs of the module's root
      command.
    * ``subcommands/`` (optional) -- one JSON file of argument specs per
      subcommand. Dots in the file name nest subcommands, and the last
      dot-separated part is dropped: ``db.migrate.json`` describes
      ``db`` -> ``migrate``.
    * ``hooks/`` (optional) -- files indexed by their file name.
    """

    def __init__(self, path):
        """Load the module stored in directory *path*.

        Raises:
            OSError: if ``tagg.json`` (or another file being read) cannot
                be opened.
            ValueError: if a JSON file is malformed.
            KeyError: if ``tagg.json`` has no ``version`` key.
        """
        self.path = path
        self.name = None
        self.version = None
        self.command = {}
        self.hooks = {}
        self._init_module()

    @staticmethod
    def _load_json(path):
        """Parse the JSON file at *path*, closing the file afterwards."""
        with open(path, encoding="utf-8") as handle:
            return json.load(handle)

    @staticmethod
    def _new_command():
        """Return a fresh, empty command description."""
        return {'arguments': [], 'subcommands': {}, 'description': ""}

    def _init_module(self):
        """Read the metadata, then the command tree and the hooks."""
        metas = self._load_json(os.path.join(self.path, "tagg.json"))
        # normpath drops a trailing separator, so "mods/foo/" is still "foo"
        # instead of the empty string.
        self.name = os.path.basename(os.path.normpath(self.path)).lower()
        self.version = metas.pop('version')
        self._parse_commands()
        self._init_hooks()

    def _parse_commands(self):
        """Build ``self.command`` from command.json and subcommands/."""
        command = self._new_command()

        command_file = os.path.join(self.path, "command.json")
        if os.path.exists(command_file):
            command['arguments'] = self._load_json(command_file)

        scanpath = os.path.join(self.path, "subcommands")
        if os.path.isdir(scanpath):
            # Sorted so the resulting command tree does not depend on the
            # arbitrary order returned by os.listdir().
            for filename in sorted(os.listdir(scanpath)):
                if filename.startswith('.'):
                    continue
                subs = filename.split('.')[:-1]
                if not subs:
                    # Without an extension there is no subcommand name left;
                    # assigning anyway would overwrite the root arguments.
                    logging.warning(
                        "ignoring subcommand file without extension: %s",
                        filename)
                    continue
                args = self._load_json(os.path.join(scanpath, filename))
                cmd = command
                for sub in subs:
                    cmd = cmd['subcommands'].setdefault(
                        sub, self._new_command())
                cmd['arguments'] = args

        self.command = command

    def _init_hooks(self):
        """Index every non-hidden file of hooks/ by its file name."""
        scanpath = os.path.join(self.path, "hooks")
        if os.path.isdir(scanpath):
            for filename in os.listdir(scanpath):
                if not filename.startswith('.'):
                    self.hooks[filename] = os.path.join(scanpath, filename)

    def provide_command(self):
        """Return True if the module has a command.json or any subcommand."""
        has_root = os.path.exists(os.path.join(self.path, "command.json"))
        return has_root or bool(self.command['subcommands'])

    def validate(self):
        """Check that every registered hook file still exists.

        Raises:
            Exception: naming the first hook whose file is missing.
        """
        for hook, hook_path in self.hooks.items():
            if not os.path.exists(hook_path):
                raise Exception(
                    "hook %s of module %s not found: %s"
                    % (hook, self.name, hook_path))

    def execute(self, namespace):
        """Placeholder; currently does nothing."""
        pass
class TaggApp:
    """Command-line application aggregating the commands of tagg modules.

    Modules are discovered in ``module_paths``; every module that provides a
    command becomes a subcommand of the program, and running a command
    executes the matching hook files of all discovered modules.
    """

    # Maps the "type" string of an argument spec to the callable handed to
    # argparse. NOTE: bool() is True for any non-empty string, including
    # "False".
    ARG_TYPES = {
        'str': str,
        'int': int,
        'float': float,
        'bool': bool,
    }

    # Only these keys of an argument spec are forwarded to add_argument().
    ALLOWED_ARGS = ['nargs', 'default', 'type', 'choices', 'required',
                    'help', 'metavar', 'action']

    def __init__(self, module_paths=None):
        """Create the application.

        Args:
            module_paths: directories to scan for modules; defaults to
                ``['./tagg_modules/']``.
        """
        # A fresh list per instance: a list literal in the signature would be
        # shared by every TaggApp created without arguments.
        if module_paths is None:
            module_paths = ['./tagg_modules/']
        self.module_paths = module_paths
        self.modules = []

    def _runnable_hooks(self, hook_name):
        """Yield, in module order, each existing executable *hook_name* file."""
        for module in self.modules:
            path = module.hooks.get(hook_name)
            if path and os.path.exists(path) and os.access(path, os.X_OK):
                yield path

    def execute(self, args):
        """Run the hooks of the launched command.

        For each of the ``pre_exec``, ``exec`` and ``post_exec`` events, the
        hook named ``<args.launched>.<event>`` is run for every module that
        has it as an executable file. As soon as one hook reports a non-zero
        status the remaining hooks are skipped, the failure is logged and the
        ``<args.launched>.on_error`` hooks are run instead. The failure is
        not re-raised.
        """
        launched = args.launched
        try:
            for evt in ('pre_exec', 'exec', 'post_exec'):
                # Built once per event: rebuilding it from its own previous
                # value for each module would corrupt the name after the
                # first module.
                hook_name = "%s.%s" % (launched, evt)
                for path in self._runnable_hooks(hook_name):
                    # NOTE: os.system hands the path to the shell unquoted.
                    if os.system(path) != 0:
                        raise Exception(
                            "%s error on hook %s" % (hook_name, path))
        except Exception as exc:
            logging.error("%s", exc)
            for path in self._runnable_hooks("%s.on_error" % launched):
                os.system(path)

    def execute_no_action(self, args):
        """Fallback callback: report a usage error through the parser."""
        args.parser.error("missing argument")

    def configure_parser(self, parser, command, stack=None):
        """Recursively declare *command* on *parser*.

        Args:
            parser: the argparse parser to configure.
            command: dict with ``arguments`` (list of argument specs),
                ``subcommands`` (name -> command dict) and ``description``.
                An argument spec is a list of names/flags, optionally ending
                with a dict of add_argument() keyword arguments. The dict is
                left unmodified.
            stack: names of the parent commands, outermost first.

        Raises:
            KeyError: if a spec names a ``type`` missing from ARG_TYPES.
        """
        stack = [] if stack is None else list(stack)
        parser.set_defaults(parser=parser)
        parser.set_defaults(launched='.'.join(stack))

        for spec in command['arguments']:
            if not spec:
                continue
            # Work on copies so the caller's command definition is untouched.
            names = list(spec)
            kwargs = {}
            if isinstance(names[-1], dict):
                options = names.pop()
                kwargs = {key: value for key, value in options.items()
                          if key in self.ALLOWED_ARGS}
            if 'type' in kwargs:
                kwargs['type'] = self.ARG_TYPES[kwargs['type']]
            parser.add_argument(*names, **kwargs)

        subcommands = command['subcommands']
        if not subcommands:
            # Leaf command: parsing it selects the hook runner.
            parser.set_defaults(callback=self.execute)
        else:
            subparsers = parser.add_subparsers(help=command['description'])
            for cmd_name, sub_command in subcommands.items():
                cmd_parser = subparsers.add_parser(
                    cmd_name, help=sub_command['description'])
                self.configure_parser(
                    cmd_parser, sub_command, stack=stack + [cmd_name])

    def discover_modules(self):
        """Scan ``module_paths`` and store the modules found in ``modules``.

        Paths are made absolute and de-duplicated; missing paths, hidden
        entries and entries that are not directories are skipped.
        """
        tagg_paths = []
        for path in self.module_paths:
            path = os.path.abspath(path)
            if path not in tagg_paths:
                tagg_paths.append(path)

        logging.debug("discovering modules ...")
        modules = []
        for path in tagg_paths:
            logging.debug("scanning modules in %s", path)
            if not os.path.isdir(path):
                continue
            # Sorted so module (and therefore hook) order is deterministic.
            for entry in sorted(os.listdir(path)):
                module_path = "%s/%s" % (path, entry)
                if entry.startswith('.') or not os.path.isdir(module_path):
                    continue
                logging.debug("module discovered : %s", entry)
                modules.append(TaggModule(module_path))
        self.modules = modules

    def run(self):
        """Build the parser from the discovered modules and dispatch."""
        parser = argparse.ArgumentParser()
        parser.set_defaults(callback=self.execute_no_action)

        # Checked on the raw argv so that discovery, which happens before
        # the arguments are parsed, is logged too.
        if '-d' in sys.argv or '--debug' in sys.argv:
            logging.basicConfig(level=logging.DEBUG)

        command = {
            'arguments': [
                ['-d', '--debug', {'action': 'store_true'}],
            ],
            'description': "",
            'subcommands': {},
        }

        self.discover_modules()
        for module in self.modules:
            if module.provide_command():
                command['subcommands'][module.name] = module.command

        self.configure_parser(parser, command)
        args = parser.parse_args()
        args.callback(args)
if __name__ == '__main__':
    # run() returns nothing, so keep the application object itself in `app`
    # rather than the result of the call.
    app = TaggApp()
    app.run()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.