Skip to content

Instantly share code, notes, and snippets.

@abg
Last active August 29, 2015 13:56
Show Gist options
  • Save abg/8904431 to your computer and use it in GitHub Desktop.
Save abg/8904431 to your computer and use it in GitHub Desktop.
simple command line helper - inspired by baker.py
"""
cli
~~~
Simple CLI API
@cli.command
def mycommand(required_arg, option='default', *varargs):
...
sys.exit(cli.run())
"""
from __future__ import unicode_literals, print_function
import collections
import inspect
import itertools
import os
import re
import sys
import textwrap
# pylint: disable=R0903
MISSING = object()
class CLIError(Exception):
"""Error occurred during CLI processing"""
def __init__(self, message="", cmd=None, status=os.EX_OK):
super(CLIError, self).__init__(message)
self.message = message
self.cmd = cmd
self.status = status
class CLIVersion(Exception):
"""Raised if --version option is specified"""
OPTION_RE = re.compile(r'''
^([ \t]*)
[:]option
\s+(?P<dest>[a-zA-Z_][a-zA-Z0-9_]*)[:]
\s*(?P<options>(?:.*?))
(?:[ \t]+(?P<metavar>[^ \t]+))?\s*$
''',
re.X | re.M)
PARAM_RE = re.compile(r'''
^([ \t]*)
[:]param
(?:\s+(?P<type>[a-zA-Z_][a-zA-Z0-9_]*))?
\s+(?P<dest>[a-zA-Z_][a-zA-Z0-9_]*)[:]
[ \t]*(?P<description>.*?(?:^(?:^\1\s+.*?^)*|\Z))
''',
re.X | re.S | re.M)
def normalize_whitespace(value):
"""Replace consecutive whitespace in a string with a single space
:param value: string to normalize
:returns: normalized string
"""
return re.sub(r'\s+', ' ', value).strip()
def normalize_paragraphs(docstring):
"""Cleanup a description from a docstring
Remove :option: and :param: field lists from a docstring and ensures only a
single line break between paragraphs.
:param docstring: docstring to normalize
:returns: normalized string
"""
value = OPTION_RE.sub('', docstring)
value = PARAM_RE.sub('', value)
value = re.sub(r'(\S)\s(\S)', r'\1 \2', value)
value = "\n".join(line
for para in re.split(r'\n\s*\n', value)
for line in itertools.chain(wrap(para), ['']))
return value.strip()
def wrap(value, indent=0, width=79):
"""Wrap a string value via textwrap.wrap
:param value: string value to wrap
:param int indent: number of spaces to indent subsequent lines
:param int width: number of columns to wrap to
:returns: wrapped string value
"""
sindent = ' '*indent
for line in textwrap.wrap(value, subsequent_indent=sindent, width=width):
yield line
class Param(collections.namedtuple('Param', 'dest type description metavar')):
"""Represents a function parameter
:attr dest: parameter destination name
:attr type: parameter type object
:attr description: description of the parameter
"""
@property
def requires_arg(self):
"""Determine if this parameter requires an argument
:returns: True if the type is not a boolean, False otherwise
"""
return self.type is not bool
class Option(collections.namedtuple('Option', 'param shortopt longopt')):
"""Represent a single command option
:attr param: ``Param`` instance the option maps to
:attr shortopt: short -o option for this option
:attr longopt: long --option for this option
"""
def format(self):
"""Format this instance as a human readable string"""
tag = '[+]' if self.param.type is list else ''
shortopt = self.shortopt or ''
longopt = self.longopt or ''
metavar = self.param.metavar or ''
return '%-3s %-2s %-s%s' % (tag, shortopt, longopt, metavar)
def discover_params(docstring, argspec):
"""Discover :param: descriptors from a docstring
:yields: arg, Param tuples
"""
seen = set()
for match in PARAM_RE.finditer(docstring):
_type, dest, description = match.group('type', 'dest', 'description')
_type = _type and __builtins__[_type] or str
if _type is None and dest in argspec.defaults:
_type = type(argspec.defaults[dest])
yield dest, Param(dest=dest,
type=_type,
description=normalize_whitespace(description),
metavar=_type is not bool and ('<%s>' % dest) or '')
seen.add(dest)
# also ensure we have a param for each default value
for key, value in argspec.defaults.items():
if key not in seen:
yield key, Param(type=type(value),
dest=key,
description='',
metavar=(type(value) is not bool and
('<%s>' % key) or ''))
def discover_options(docstring, params):
"""Discover :option: descriptors from a docstring
:yields: option string, Option tuples
"""
for match in OPTION_RE.finditer(docstring):
dest, options, metavar = match.group('dest', 'options', 'metavar')
if dest not in params:
raise ValueError("invalid arg '%s'" % dest)
options = tuple(opt.strip()
for opt in options.split(',')
if opt.strip())
if not metavar or params[dest].type is bool:
metavar = ''
shortopt = None
longopt = None
for opt in options:
if opt.startswith('--'):
longopt = opt
elif opt.startswith('-'):
shortopt = opt
# pylint: disable=W0212
option = Option(params[dest]._replace(metavar=metavar),
shortopt,
longopt)
if shortopt:
yield shortopt, option
if longopt:
yield longopt, option
class Cmd(collections.namedtuple('Cmd', ['name',
'aliases',
'summary',
'description',
'options',
'argspec',
'function'])):
"""Represents a single CLI command
:attr name: name of the command
:attr aliases: iterable of alias names for the command
:attr summary: string oneline summary of the command
:attr description: string description of the command
:attr options: mapping of option to function parameter
:attr argspec: cached argspec for the function
:attr function: target function that should be called
"""
def print_usage(self, stream=sys.stdout):
"""Format a usage line for this command"""
required = ['<%s>' % name
for name in self.argspec.args
if name not in self.argspec.defaults]
if self.argspec.varargs:
required.append('[%s...]' % self.argspec.varargs)
print("Usage: %s [options] %s" % (self.name, ' '.join(required)),
file=stream)
if self.aliases:
print(file=stream)
print("Aliases: %s" % ', '.join(self.aliases))
def print_options(self, stream=sys.stdout):
"""Print options for this command"""
options = dict((opt.param.dest, opt) for opt in self.options.values())
maxwidth = max(len(opt.format()) for opt in options.values())
has_multi = any(opt.param.type is list for opt in options.values())
for arg in self.argspec.args:
if arg not in options:
continue
opt = options[arg]
value = '%-*s %s' % (maxwidth, opt.format(), opt.param.description)
for line in wrap(value, maxwidth + 1):
print(line, file=stream)
if has_multi:
print(file=stream)
print("[+] marked option can be specified multiple times")
def help(self, stream=sys.stdout):
"""Format a help string for this command"""
self.print_usage(stream)
if self.description:
print(file=stream)
print(self.description, file=stream)
if self.options:
print(file=stream)
self.print_options(stream)
@classmethod
def from_function(cls, function, name=None, aliases=()):
"""Create a Cmd instance from a function signature
:param function: target function for this command
:param name: name for this command; default: function.__name__
:param aliases: list of aliases for this command; default: no aliases
:returns: new Cmd instance
"""
docstring = inspect.getdoc(function) or ''
argspec = inspect.getargspec(function)
defaults = {}
# pylint: disable=W0212,E1101
if argspec.defaults:
defaults = dict(zip(argspec.args[-len(argspec.defaults):],
argspec.defaults))
argspec = argspec._replace(defaults=defaults)
params = dict(discover_params(docstring, argspec))
options = {}
for opt, option in discover_options(docstring, params):
options[opt] = option
# add keyword arguments as default options
mapped_dests = set(opt.param.dest for opt in options.values())
for arg in argspec.args:
if arg not in mapped_dests and arg in defaults:
opt = '--' + arg.replace('_', '-')
options[opt] = Option(params[arg], None, opt)
return cls(name=name or function.__name__,
aliases=tuple(aliases),
summary=(docstring.strip() or '\n').splitlines()[0],
description=normalize_paragraphs(docstring),
options=options,
argspec=argspec,
function=function)
class CLI(object):
"""Manage a simple CLI interface"""
def __init__(self, scriptname=None, version=None):
self.scriptname = scriptname or os.path.basename(sys.argv[0])
self.version = version
self.commands = {}
def usage(self, name=None, stream=sys.stdout):
"""Print usage for this CLI manager"""
if not name:
print("%s [--help] [--version] <command> [options]" %
self.scriptname, file=stream)
else:
self.commands[name].print_usage(stream)
def help(self, stream=sys.stdout):
"""Output help for this CLI instance"""
self.usage(stream=stream)
if self.commands:
print(file=stream)
names = set(cmd.name for cmd in self.commands.values())
maxwidth = len(max(names, key=len))
print(file=stream)
print("available commands:", file=stream)
print(file=stream)
for name in sorted(names):
summary = self.commands[name].summary
entry = '%+*s %s' % (maxwidth, name, summary)
for line in wrap(entry, maxwidth + 1):
print(line, file=stream)
def command(self, function=MISSING, name=None, aliases=()):
"""Command decorator to register new command functions
cli = CLI()
@cli.command
def myfunction(opt1=False, opt2='foo'):
...
"""
if function is not MISSING:
return self.command()(function)
def wrapper(function):
"""Register function with this CLI instance"""
cmd = Cmd.from_function(function, name=name, aliases=aliases)
self.commands[cmd.name] = cmd
for alias in aliases:
self.commands[alias] = self.commands[cmd.name]
return function
return wrapper
@staticmethod
def _process_option(cmd, argv, kwargs, option, arg=MISSING):
"""Process a single option"""
if cmd is None or option not in cmd.options:
raise CLIError("unrecognized option '%s'" % (option,),
cmd=cmd,
status=os.EX_USAGE)
param = cmd.options[option].param
consumes_argv = False
if param.requires_arg:
if arg is MISSING and not argv:
raise CLIError("option '%s' requires an argument" % option,
cmd=cmd,
status=os.EX_USAGE)
arg = arg if arg is not MISSING else argv.pop(0)
consumes_argv = True
else:
if param.dest in cmd.argspec.defaults:
arg = not cmd.argspec.defaults[param.dest]
else:
arg = True
if param.type is list:
kwargs.setdefault(param.dest, []).append(arg)
else:
if param.type is not bool:
try:
arg = param.type(arg)
except ValueError:
raise CLIError("%s expects %s, but '%s' was provided" %
(option, param.type.__name__, arg),
cmd=cmd,
status=os.EX_USAGE)
kwargs[param.dest] = arg
return consumes_argv
@staticmethod
def _check_command_args(cmd, args, kwargs):
"""Validate parsed command args/kwargs"""
if not cmd:
raise CLIError("No command specified", status=os.EX_USAGE)
# check for required arguments
# if varargs is present, we only need to check for a minimum
# number of arguments
provided_argcount = len(args)
required_argcount = len(cmd.argspec.args) - len(cmd.argspec.defaults)
if provided_argcount < required_argcount or \
(provided_argcount != required_argcount and
not cmd.argspec.varargs):
raise CLIError(("'%s' requires %s argument%s, but %s provided") %
(cmd.name,
required_argcount or 'no',
required_argcount != 1 and 's' or '',
provided_argcount),
cmd=cmd, status=os.EX_USAGE)
argspec = cmd.argspec
if argspec.varargs:
req_args = args[0:required_argcount]
var_argcount = provided_argcount - required_argcount
varargs = args[-var_argcount:] if var_argcount else []
args[:] = req_args + [kwargs.pop(key, argspec.defaults[key])
for key in argspec.args[required_argcount:]
] + varargs
def parse_args(self, argv):
"""Parse an argument list for a command
:returns: (args, kwargs)
"""
cmd = None
args = []
kwargs = {}
while argv:
arg = argv.pop(0)
if arg in ('-V', '--version') and cmd is None:
raise CLIVersion()
if arg == '--':
if not cmd:
raise CLIError("unrecognized option '--'",
status=os.EX_USAGE)
break
elif arg in ('-h', '--help'):
raise CLIError(cmd=cmd)
elif arg.startswith('--'):
opt, _, arg = arg.partition('=')
optarg = _ and arg or MISSING
self._process_option(cmd, argv, kwargs, opt, optarg)
elif arg.startswith("-") and len(arg) > 1:
opts = arg[1:]
while opts:
sopt = '-' + opts[0]
optarg = opts[1:] or MISSING
if self._process_option(cmd, argv, kwargs, sopt, optarg):
break
opts = opts[1:]
else:
if not cmd:
try:
cmd = self.commands[arg]
except KeyError:
raise CLIError("'%s' is not a valid command." % arg,
status=os.EX_USAGE)
else:
args.append(arg)
args.extend(argv)
self._check_command_args(cmd, args, kwargs)
return cmd, args, kwargs
def run(self, argv=None):
"""Execute a command given an argument list"""
argv = argv or sys.argv[1:]
try:
cmd, args, kwargs = self.parse_args(argv)
except CLIVersion:
print("%s v%s" % (self.scriptname, self.version))
return os.EX_OK
except CLIError as exc:
stream = exc.status != os.EX_OK and sys.stderr or sys.stdout
if exc.message:
print("%s" % exc.message, file=stream)
if exc.cmd:
exc.cmd.help(stream)
else:
self.help(stream)
return exc.status
# pylint: disable=W0142
ret = cmd.function(*args, **kwargs)
try:
return int(ret)
except (ValueError, TypeError):
return os.EX_SOFTWARE
# pylint: disable=C0103
cli = CLI()
command = cli.command
run = cli.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment