Last active
August 29, 2015 13:56
-
-
Save abg/8904431 to your computer and use it in GitHub Desktop.
simple command line helper - inspired by baker.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
cli | |
~~~ | |
Simple CLI API | |
@cli.command | |
def mycommand(required_arg, option='default', *varargs): | |
... | |
sys.exit(cli.run()) | |
""" | |
from __future__ import unicode_literals, print_function | |
import collections | |
import inspect | |
import itertools | |
import os | |
import re | |
import sys | |
import textwrap | |
# pylint: disable=R0903 | |
MISSING = object() | |
class CLIError(Exception): | |
"""Error occurred during CLI processing""" | |
def __init__(self, message="", cmd=None, status=os.EX_OK): | |
super(CLIError, self).__init__(message) | |
self.message = message | |
self.cmd = cmd | |
self.status = status | |
class CLIVersion(Exception): | |
"""Raised if --version option is specified""" | |
OPTION_RE = re.compile(r''' | |
^([ \t]*) | |
[:]option | |
\s+(?P<dest>[a-zA-Z_][a-zA-Z0-9_]*)[:] | |
\s*(?P<options>(?:.*?)) | |
(?:[ \t]+(?P<metavar>[^ \t]+))?\s*$ | |
''', | |
re.X | re.M) | |
PARAM_RE = re.compile(r''' | |
^([ \t]*) | |
[:]param | |
(?:\s+(?P<type>[a-zA-Z_][a-zA-Z0-9_]*))? | |
\s+(?P<dest>[a-zA-Z_][a-zA-Z0-9_]*)[:] | |
[ \t]*(?P<description>.*?(?:^(?:^\1\s+.*?^)*|\Z)) | |
''', | |
re.X | re.S | re.M) | |
def normalize_whitespace(value): | |
"""Replace consecutive whitespace in a string with a single space | |
:param value: string to normalize | |
:returns: normalized string | |
""" | |
return re.sub(r'\s+', ' ', value).strip() | |
def normalize_paragraphs(docstring): | |
"""Cleanup a description from a docstring | |
Remove :option: and :param: field lists from a docstring and ensures only a | |
single line break between paragraphs. | |
:param docstring: docstring to normalize | |
:returns: normalized string | |
""" | |
value = OPTION_RE.sub('', docstring) | |
value = PARAM_RE.sub('', value) | |
value = re.sub(r'(\S)\s(\S)', r'\1 \2', value) | |
value = "\n".join(line | |
for para in re.split(r'\n\s*\n', value) | |
for line in itertools.chain(wrap(para), [''])) | |
return value.strip() | |
def wrap(value, indent=0, width=79): | |
"""Wrap a string value via textwrap.wrap | |
:param value: string value to wrap | |
:param int indent: number of spaces to indent subsequent lines | |
:param int width: number of columns to wrap to | |
:returns: wrapped string value | |
""" | |
sindent = ' '*indent | |
for line in textwrap.wrap(value, subsequent_indent=sindent, width=width): | |
yield line | |
class Param(collections.namedtuple('Param', 'dest type description metavar')): | |
"""Represents a function parameter | |
:attr dest: parameter destination name | |
:attr type: parameter type object | |
:attr description: description of the parameter | |
""" | |
@property | |
def requires_arg(self): | |
"""Determine if this parameter requires an argument | |
:returns: True if the type is not a boolean, False otherwise | |
""" | |
return self.type is not bool | |
class Option(collections.namedtuple('Option', 'param shortopt longopt')): | |
"""Represent a single command option | |
:attr param: ``Param`` instance the option maps to | |
:attr shortopt: short -o option for this option | |
:attr longopt: long --option for this option | |
""" | |
def format(self): | |
"""Format this instance as a human readable string""" | |
tag = '[+]' if self.param.type is list else '' | |
shortopt = self.shortopt or '' | |
longopt = self.longopt or '' | |
metavar = self.param.metavar or '' | |
return '%-3s %-2s %-s%s' % (tag, shortopt, longopt, metavar) | |
def discover_params(docstring, argspec): | |
"""Discover :param: descriptors from a docstring | |
:yields: arg, Param tuples | |
""" | |
seen = set() | |
for match in PARAM_RE.finditer(docstring): | |
_type, dest, description = match.group('type', 'dest', 'description') | |
_type = _type and __builtins__[_type] or str | |
if _type is None and dest in argspec.defaults: | |
_type = type(argspec.defaults[dest]) | |
yield dest, Param(dest=dest, | |
type=_type, | |
description=normalize_whitespace(description), | |
metavar=_type is not bool and ('<%s>' % dest) or '') | |
seen.add(dest) | |
# also ensure we have a param for each default value | |
for key, value in argspec.defaults.items(): | |
if key not in seen: | |
yield key, Param(type=type(value), | |
dest=key, | |
description='', | |
metavar=(type(value) is not bool and | |
('<%s>' % key) or '')) | |
def discover_options(docstring, params): | |
"""Discover :option: descriptors from a docstring | |
:yields: option string, Option tuples | |
""" | |
for match in OPTION_RE.finditer(docstring): | |
dest, options, metavar = match.group('dest', 'options', 'metavar') | |
if dest not in params: | |
raise ValueError("invalid arg '%s'" % dest) | |
options = tuple(opt.strip() | |
for opt in options.split(',') | |
if opt.strip()) | |
if not metavar or params[dest].type is bool: | |
metavar = '' | |
shortopt = None | |
longopt = None | |
for opt in options: | |
if opt.startswith('--'): | |
longopt = opt | |
elif opt.startswith('-'): | |
shortopt = opt | |
# pylint: disable=W0212 | |
option = Option(params[dest]._replace(metavar=metavar), | |
shortopt, | |
longopt) | |
if shortopt: | |
yield shortopt, option | |
if longopt: | |
yield longopt, option | |
class Cmd(collections.namedtuple('Cmd', ['name', | |
'aliases', | |
'summary', | |
'description', | |
'options', | |
'argspec', | |
'function'])): | |
"""Represents a single CLI command | |
:attr name: name of the command | |
:attr aliases: iterable of alias names for the command | |
:attr summary: string oneline summary of the command | |
:attr description: string description of the command | |
:attr options: mapping of option to function parameter | |
:attr argspec: cached argspec for the function | |
:attr function: target function that should be called | |
""" | |
def print_usage(self, stream=sys.stdout): | |
"""Format a usage line for this command""" | |
required = ['<%s>' % name | |
for name in self.argspec.args | |
if name not in self.argspec.defaults] | |
if self.argspec.varargs: | |
required.append('[%s...]' % self.argspec.varargs) | |
print("Usage: %s [options] %s" % (self.name, ' '.join(required)), | |
file=stream) | |
if self.aliases: | |
print(file=stream) | |
print("Aliases: %s" % ', '.join(self.aliases)) | |
def print_options(self, stream=sys.stdout): | |
"""Print options for this command""" | |
options = dict((opt.param.dest, opt) for opt in self.options.values()) | |
maxwidth = max(len(opt.format()) for opt in options.values()) | |
has_multi = any(opt.param.type is list for opt in options.values()) | |
for arg in self.argspec.args: | |
if arg not in options: | |
continue | |
opt = options[arg] | |
value = '%-*s %s' % (maxwidth, opt.format(), opt.param.description) | |
for line in wrap(value, maxwidth + 1): | |
print(line, file=stream) | |
if has_multi: | |
print(file=stream) | |
print("[+] marked option can be specified multiple times") | |
def help(self, stream=sys.stdout): | |
"""Format a help string for this command""" | |
self.print_usage(stream) | |
if self.description: | |
print(file=stream) | |
print(self.description, file=stream) | |
if self.options: | |
print(file=stream) | |
self.print_options(stream) | |
@classmethod | |
def from_function(cls, function, name=None, aliases=()): | |
"""Create a Cmd instance from a function signature | |
:param function: target function for this command | |
:param name: name for this command; default: function.__name__ | |
:param aliases: list of aliases for this command; default: no aliases | |
:returns: new Cmd instance | |
""" | |
docstring = inspect.getdoc(function) or '' | |
argspec = inspect.getargspec(function) | |
defaults = {} | |
# pylint: disable=W0212,E1101 | |
if argspec.defaults: | |
defaults = dict(zip(argspec.args[-len(argspec.defaults):], | |
argspec.defaults)) | |
argspec = argspec._replace(defaults=defaults) | |
params = dict(discover_params(docstring, argspec)) | |
options = {} | |
for opt, option in discover_options(docstring, params): | |
options[opt] = option | |
# add keyword arguments as default options | |
mapped_dests = set(opt.param.dest for opt in options.values()) | |
for arg in argspec.args: | |
if arg not in mapped_dests and arg in defaults: | |
opt = '--' + arg.replace('_', '-') | |
options[opt] = Option(params[arg], None, opt) | |
return cls(name=name or function.__name__, | |
aliases=tuple(aliases), | |
summary=(docstring.strip() or '\n').splitlines()[0], | |
description=normalize_paragraphs(docstring), | |
options=options, | |
argspec=argspec, | |
function=function) | |
class CLI(object): | |
"""Manage a simple CLI interface""" | |
def __init__(self, scriptname=None, version=None): | |
self.scriptname = scriptname or os.path.basename(sys.argv[0]) | |
self.version = version | |
self.commands = {} | |
def usage(self, name=None, stream=sys.stdout): | |
"""Print usage for this CLI manager""" | |
if not name: | |
print("%s [--help] [--version] <command> [options]" % | |
self.scriptname, file=stream) | |
else: | |
self.commands[name].print_usage(stream) | |
def help(self, stream=sys.stdout): | |
"""Output help for this CLI instance""" | |
self.usage(stream=stream) | |
if self.commands: | |
print(file=stream) | |
names = set(cmd.name for cmd in self.commands.values()) | |
maxwidth = len(max(names, key=len)) | |
print(file=stream) | |
print("available commands:", file=stream) | |
print(file=stream) | |
for name in sorted(names): | |
summary = self.commands[name].summary | |
entry = '%+*s %s' % (maxwidth, name, summary) | |
for line in wrap(entry, maxwidth + 1): | |
print(line, file=stream) | |
def command(self, function=MISSING, name=None, aliases=()): | |
"""Command decorator to register new command functions | |
cli = CLI() | |
@cli.command | |
def myfunction(opt1=False, opt2='foo'): | |
... | |
""" | |
if function is not MISSING: | |
return self.command()(function) | |
def wrapper(function): | |
"""Register function with this CLI instance""" | |
cmd = Cmd.from_function(function, name=name, aliases=aliases) | |
self.commands[cmd.name] = cmd | |
for alias in aliases: | |
self.commands[alias] = self.commands[cmd.name] | |
return function | |
return wrapper | |
@staticmethod | |
def _process_option(cmd, argv, kwargs, option, arg=MISSING): | |
"""Process a single option""" | |
if cmd is None or option not in cmd.options: | |
raise CLIError("unrecognized option '%s'" % (option,), | |
cmd=cmd, | |
status=os.EX_USAGE) | |
param = cmd.options[option].param | |
consumes_argv = False | |
if param.requires_arg: | |
if arg is MISSING and not argv: | |
raise CLIError("option '%s' requires an argument" % option, | |
cmd=cmd, | |
status=os.EX_USAGE) | |
arg = arg if arg is not MISSING else argv.pop(0) | |
consumes_argv = True | |
else: | |
if param.dest in cmd.argspec.defaults: | |
arg = not cmd.argspec.defaults[param.dest] | |
else: | |
arg = True | |
if param.type is list: | |
kwargs.setdefault(param.dest, []).append(arg) | |
else: | |
if param.type is not bool: | |
try: | |
arg = param.type(arg) | |
except ValueError: | |
raise CLIError("%s expects %s, but '%s' was provided" % | |
(option, param.type.__name__, arg), | |
cmd=cmd, | |
status=os.EX_USAGE) | |
kwargs[param.dest] = arg | |
return consumes_argv | |
@staticmethod | |
def _check_command_args(cmd, args, kwargs): | |
"""Validate parsed command args/kwargs""" | |
if not cmd: | |
raise CLIError("No command specified", status=os.EX_USAGE) | |
# check for required arguments | |
# if varargs is present, we only need to check for a minimum | |
# number of arguments | |
provided_argcount = len(args) | |
required_argcount = len(cmd.argspec.args) - len(cmd.argspec.defaults) | |
if provided_argcount < required_argcount or \ | |
(provided_argcount != required_argcount and | |
not cmd.argspec.varargs): | |
raise CLIError(("'%s' requires %s argument%s, but %s provided") % | |
(cmd.name, | |
required_argcount or 'no', | |
required_argcount != 1 and 's' or '', | |
provided_argcount), | |
cmd=cmd, status=os.EX_USAGE) | |
argspec = cmd.argspec | |
if argspec.varargs: | |
req_args = args[0:required_argcount] | |
var_argcount = provided_argcount - required_argcount | |
varargs = args[-var_argcount:] if var_argcount else [] | |
args[:] = req_args + [kwargs.pop(key, argspec.defaults[key]) | |
for key in argspec.args[required_argcount:] | |
] + varargs | |
def parse_args(self, argv): | |
"""Parse an argument list for a command | |
:returns: (args, kwargs) | |
""" | |
cmd = None | |
args = [] | |
kwargs = {} | |
while argv: | |
arg = argv.pop(0) | |
if arg in ('-V', '--version') and cmd is None: | |
raise CLIVersion() | |
if arg == '--': | |
if not cmd: | |
raise CLIError("unrecognized option '--'", | |
status=os.EX_USAGE) | |
break | |
elif arg in ('-h', '--help'): | |
raise CLIError(cmd=cmd) | |
elif arg.startswith('--'): | |
opt, _, arg = arg.partition('=') | |
optarg = _ and arg or MISSING | |
self._process_option(cmd, argv, kwargs, opt, optarg) | |
elif arg.startswith("-") and len(arg) > 1: | |
opts = arg[1:] | |
while opts: | |
sopt = '-' + opts[0] | |
optarg = opts[1:] or MISSING | |
if self._process_option(cmd, argv, kwargs, sopt, optarg): | |
break | |
opts = opts[1:] | |
else: | |
if not cmd: | |
try: | |
cmd = self.commands[arg] | |
except KeyError: | |
raise CLIError("'%s' is not a valid command." % arg, | |
status=os.EX_USAGE) | |
else: | |
args.append(arg) | |
args.extend(argv) | |
self._check_command_args(cmd, args, kwargs) | |
return cmd, args, kwargs | |
def run(self, argv=None): | |
"""Execute a command given an argument list""" | |
argv = argv or sys.argv[1:] | |
try: | |
cmd, args, kwargs = self.parse_args(argv) | |
except CLIVersion: | |
print("%s v%s" % (self.scriptname, self.version)) | |
return os.EX_OK | |
except CLIError as exc: | |
stream = exc.status != os.EX_OK and sys.stderr or sys.stdout | |
if exc.message: | |
print("%s" % exc.message, file=stream) | |
if exc.cmd: | |
exc.cmd.help(stream) | |
else: | |
self.help(stream) | |
return exc.status | |
# pylint: disable=W0142 | |
ret = cmd.function(*args, **kwargs) | |
try: | |
return int(ret) | |
except (ValueError, TypeError): | |
return os.EX_SOFTWARE | |
# pylint: disable=C0103 | |
cli = CLI() | |
command = cli.command | |
run = cli.run |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment