Created
January 28, 2018 00:28
-
-
Save philpennock/c4a24e701589267d73fee979b2a411d9 to your computer and use it in GitHub Desktop.
Passwords/admin to manage the passwords repo. You'll need to update `DEFAULT_IDENTITY` at the very least.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
admin: Passwords repo admin tool | |
Passwords repo has a bunch of actions which are common; copy/paste is | |
annoying. So manage the common actions. | |
""" | |
__author__ = 'phil.pennock@spodhuis.org (Phil Pennock)' | |
import argparse | |
import collections | |
import contextlib | |
import datetime | |
import fcntl | |
import fnmatch | |
import functools | |
import io | |
import json | |
import os | |
import re | |
import shlex | |
import shutil | |
import stat | |
import subprocess | |
import sys | |
import time | |
# Color support via colorama {{{ | |
try: | |
import colorama | |
colorama.init() | |
def color(msg, styling): | |
result = [] | |
for style in styling.split(','): | |
style = style.upper() | |
for m in (colorama.Fore, colorama.Style, colorama.Back): | |
if hasattr(m, style): | |
result.append(getattr(m, style)) | |
break | |
result.append(msg) | |
result.append(colorama.Style.RESET_ALL) | |
return ''.join(result) | |
except ImportError: | |
def color(msg, styling): | |
return msg | |
# Color support via colorama }}} | |
EncryptionIdentity = collections.namedtuple('EncryptionIdentity', ['sign', 'encrypt']) | |
# Globals {{{ | |
PROGNAME = 'admin' | |
_GIT_COMMAND = None | |
_GPG_COMMAND = None | |
_OATHTOOL_COMMAND = None | |
_NOT_REALLY = False | |
_DELETE_ALWAYS = False | |
_VERBOSE = 0 | |
# subprocess module ignores post-process-start modifications to | |
# os.environ['PATH'] so since we already have a command selector, we can shove | |
# path preferences in there. | |
# Note: we prefer any of _DESIRED_GPG_COMMANDS in locations within _DESIRED_GPG_PREFER_PATHS | |
# to any of _DESIRED_GPG_COMMANDS, even earlier entries, within the normal PATH. | |
# So it's "desired installation areas" over "desired command names" | |
_DESIRED_GPG_PREFER_PATHS = [ '/opt/gnupg/bin' ] | |
# | |
_DESIRED_GPG_COMMANDS = [ 'gpg2.3', 'gpg2.2', 'gpg2.1', 'gpg2', 'gpg' ] | |
_ASSUME_GPGAGENT_SHOWS_IDENTITY = True | |
DEFAULT_IDENTITY = EncryptionIdentity( | |
# gpg -v --fingerprint --fingerprint $pgp_key_main | |
sign = '4E5C179E7FFC4DBF8E6C132F51104E668DD04481!', # no expiration | |
encrypt = '897C93EC1E5BA64B2E8ED0D2FBC6848A6FA1B909!', # expires: 2020-02-27 | |
) | |
TOP_REMAP_IDENTITIES = { | |
} | |
# these relative to base: | |
SKIP_ENCRYPTION_FILES = set(( | |
'admin', | |
'NOTES.txt', | |
'OTP.list.txt', | |
'template.totp', | |
)) | |
# these apply only against basename | |
SKIP_ENCRYPTION_GLOB = set(( | |
'*.noenc', | |
'*.counter', | |
'README.txt', | |
)) | |
_DEFAULT_TOTP_TIMETHRESHOLD_SHOWNEXT = 8 # seconds | |
# Globals }}} | |
class Error(Exception): | |
"""Base class for exceptions from admin.""" | |
pass | |
class ExitError(Error): | |
"""Error exception which should force exit without stack-trace.""" | |
pass | |
class FindError(Error): | |
"""Problem finding an exact match for a given spec.""" | |
pass | |
# Utility functions {{{ | |
def verbose(msg, level=1): | |
if _VERBOSE >= level: | |
print(msg) | |
def show_error(msg): | |
print('{0}: {1}'.format(PROGNAME, msg), file=sys.stderr) | |
def encryption_command(identity): | |
sign_identity = identity | |
encrypt_identity = identity | |
if isinstance(identity, EncryptionIdentity): | |
sign_identity = identity.sign | |
encrypt_identity = identity.encrypt | |
# -aset : ascii sign encrypt text-mode | |
cmdline = [_GPG_COMMAND, '--for-your-eyes-only', '-aset'] | |
if identity is not None: | |
cmdline.extend(['--no-default-recipient', '--no-encrypt-to']) | |
cmdline.extend(['-u', sign_identity, '-r', encrypt_identity]) | |
return cmdline | |
def have_gpg_agent(): | |
if 'GPG_AGENT_INFO' in os.environ: | |
return True | |
# gnupg 2.1 does automatic rendezvous | |
p = os.path.expanduser('~/.gnupg/S.gpg-agent') | |
try: | |
return stat.S_ISSOCK(os.stat(p).st_mode) | |
except Exception as e: | |
return False | |
def matches_skip_encryption(rel_filename): | |
if rel_filename in SKIP_ENCRYPTION_FILES: | |
return True | |
base = os.path.basename(rel_filename) | |
for g in SKIP_ENCRYPTION_GLOB: | |
if fnmatch.fnmatch(base, g): | |
return True | |
return False | |
def prompt_yesno(message): | |
r = input('{0} [y/N]? '.format(message)) | |
if not r: | |
return False | |
if r == 'y' or r == 'Y': | |
return True | |
return False | |
def find_best_command(label, *, explicit, candidates, prefer_paths=None, _only_paths=None): | |
if prefer_paths: | |
try: | |
return find_best_command(label, explicit=explicit, candidates=candidates, _only_paths=prefer_paths) | |
except: | |
pass | |
with_paths = os.pathsep.join(_only_paths) if _only_paths is not None else os.environ['PATH'] | |
if explicit: | |
p = shutil.which(explicit, path=with_paths) | |
if p: | |
return p | |
raise ExitError('specified {0} command {1!r} not found'.format(label, explicit)) | |
for c in candidates: | |
p = shutil.which(c, path=with_paths) | |
if p: | |
return p | |
raise ExitError('unable to find a {0} command'.format(label,)) | |
# Utility functions }}} | |
# Serialization support {{{ | |
class ourjson_encoder(json.JSONEncoder): | |
def default(self, o): | |
if hasattr(o, 'serialize_attrs_to_keys'): | |
s = getattr(o, 'serialize_attrs_to_keys') | |
if callable(s): | |
s = s() | |
d = {} | |
for k in s: | |
if hasattr(o, k): | |
d[k] = getattr(o, k) | |
return d | |
return json.JSONEncoder.default(self, o) | |
def ourjson_dumps(*args, **kwargs): | |
return json.dumps(*args, cls=ourjson_encoder, **kwargs) | |
class SerializingObject(object): | |
def serialize_attrs_to_keys(self): | |
for k in dir(self): | |
if k.startswith('_'): | |
continue | |
if callable(getattr(self, k)): | |
continue | |
yield k | |
# Serialization support }}} | |
# Command registration {{{ | |
class _AllCommands(object): | |
def __init__(self): | |
self.commands = {} | |
self.repo = None | |
self.options = None | |
self.subparser = None | |
def __getitem__(self, item): | |
if item in self.commands: | |
return self.commands[item] | |
else: | |
raise Error('No such command: {0}'.format(item)) | |
def __contains__(self, item): | |
return item in self.commands | |
def set_subparser(self, subparser): | |
self.subparser = subparser | |
for cmd in sorted(self.commands.keys()): | |
self.commands[cmd].parser = subparser.add_parser(cmd, help=self.commands[cmd].help()) | |
self.commands[cmd].parser.set_defaults(action=self.commands[cmd]) | |
# Consider: register sub-commands from the command itself | |
self.commands[cmd].parser.add_argument('cmdline', nargs='*') | |
AllCommands = _AllCommands() | |
class Command(object): | |
def __init__(self, function, allcommands): | |
self.function = function | |
self.commands = allcommands | |
self.parser = None | |
for k in ('__name__', '__doc__', '__annotations__'): | |
try: | |
setattr(self, k, getattr(function, k)) | |
except AttributeError: | |
pass | |
def __call__(self, *args, **kwargs): | |
return self.function(*args, repo=self.commands.repo, options=self.commands.options, **kwargs) | |
def help(self): | |
return self.function.__doc__ | |
def command_register(wrapped): | |
AllCommands.commands[wrapped.__name__] = Command(wrapped, AllCommands) | |
# Command registration }}} | |
class PasswordRepoEntry(object): # {{{ | |
def __init__(self, repo, basedir, filename, identity=None): | |
self.repo = repo | |
self.fullpath = os.path.join(basedir, filename) | |
self.is_encrypted = False | |
self.identity = identity | |
self.relpath = self.fullpath[len(self.repo.basedir)+1:] | |
self.rel_to_top_repo = self.fullpath[len(self.repo.identity_basedir)+1:] | |
root, ext = os.path.splitext(self.fullpath) | |
if ext and ext == '.asc': | |
self.is_encrypted = True | |
self.pairpath = root | |
self.should_be_encrypted = True | |
else: | |
self.pairpath = self.fullpath + '.asc' | |
self.should_be_encrypted = not matches_skip_encryption(self.relpath) | |
verbose(self, level=2) | |
def __str__(self): | |
return self.rel_to_top_repo | |
def stat(self): | |
if not hasattr(self, '_stat_me'): | |
self._stat_me = os.stat(self.fullpath) | |
return self._stat_me | |
def need_encrypt(self): | |
if self.is_encrypted: | |
return False | |
if not self.should_be_encrypted: | |
return False | |
try: | |
sb_pair = os.stat(self.pairpath) | |
except FileNotFoundError: | |
return True | |
if sb_pair.st_size == 0: | |
return True | |
sb_me = self.stat() | |
if not sb_me: | |
raise Error('missing stat of self for {0}'.format(self.relpath)) | |
# go on basis that mtime equal is sufficiently unlikely that should re-encrypt for safety | |
if sb_pair.st_mtime > sb_me.st_mtime: | |
verbose('Have newer encrypted pair: {0}'.format(self.relpath)) | |
return False | |
return True | |
def encrypt(self): | |
if self.is_encrypted: | |
raise Error('Already encrypted: {0}'.format(self.relpath)) | |
if not self.should_be_encrypted: | |
raise Error('Should not be encrypted: {0}'.format(self.relpath)) | |
cmd = encryption_command(self.identity) | |
cmd.append(self.fullpath) | |
verbose('Run: {0}'.format(' '.join(cmd))) | |
if not _NOT_REALLY: | |
subprocess.check_call(cmd) | |
self.git_add_pair() | |
def should_delete(self): | |
if self.is_encrypted: | |
return False | |
if not self.should_be_encrypted: | |
return False | |
if self.need_encrypt(): | |
return False | |
return True | |
def Delete(self): | |
verbose('Delete: {0}'.format(self.relpath)) | |
if _NOT_REALLY: | |
return | |
subprocess.call(["ls", "-ld", self.fullpath, self.pairpath]) | |
if _DELETE_ALWAYS or prompt_yesno('delete {0}'.format(shlex.quote(self.relpath))): | |
os.unlink(self.fullpath) | |
else: | |
print('Not deleting {0}'.format(shlex.quote(self.relpath))) | |
def _pair_object(self): | |
return PasswordRepoEntry(self.repo, self.repo.basedir, os.path.relpath(self.pairpath, self.repo.basedir), identity=self.identity) | |
def git_add_pair(self): | |
self._pair_object().git_add() | |
def git_add(self): | |
# problem: we currently walk from the top-level repo always, so we don't have the correct git basedir | |
# we want to git -C repo add relpath-for-that-repo | |
self.repo.git("add", self.relpath) | |
def fix_permissions(self): | |
if matches_skip_encryption(self.relpath): | |
return | |
try: | |
s = os.stat(self.fullpath) | |
except OSError as e: | |
show_error(e) | |
return | |
if not ( stat.S_ISREG(s.st_mode) or stat.S_ISDIR(s.st_mode) ): | |
return | |
if s.st_mode & 0o077 != 0: | |
verbose('fixing permissions on: {0}'.format(self.relpath)) | |
try: | |
os.chmod(self.fullpath, s.st_mode & 0o700, follow_symlinks=False) | |
except NotImplementedError: | |
os.chmod(self.fullpath, s.st_mode & 0o700) | |
# }}} | |
class OTP(SerializingObject): # {{{ | |
def __init__(self, *, filename, directory, basedir, repo): | |
self.oathtype = 'HOTP' | |
self.name = filename[:len(filename)-len('.counter')] | |
self.filename = filename | |
self.directory = directory | |
self.basedir = basedir | |
self.repo = repo | |
self.fullpath = os.path.join(self.directory, self.filename) | |
self.otprelpath = self.fullpath[len(self.basedir)+1:] | |
self.reporelpath = self.fullpath[len(self.repo.basedir)+1:] | |
def find_related(self, *, filenames): | |
if self.name in filenames: | |
self.unencrypted = self.name | |
if (self.name + '.asc') in filenames: | |
self.encrypted = self.name + '.asc' | |
@contextlib.contextmanager | |
def locked(self): | |
with open(self.fullpath, 'r+') as fd: | |
fcntl.lockf(fd, fcntl.LOCK_EX) | |
self.counter_fd = fd | |
self.current_counter = int(fd.read().strip()) | |
fd.seek(0, 0) | |
yield | |
delattr(self, 'counter_fd') | |
def oath_cfg_reader(self): | |
if hasattr(self, 'unencrypted'): | |
return open(os.path.join(self.directory, self.unencrypted)) | |
if hasattr(self, 'encrypted'): | |
cmd = [_GPG_COMMAND, '-d', os.path.join(self.directory, self.encrypted)] | |
if _VERBOSE: | |
return io.StringIO(subprocess.check_output(cmd).decode('UTF-8')) | |
return io.StringIO(subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode('UTF-8')) | |
raise Error('bad {0.oathtype} setup for {0.name}'.format(self)) | |
def print_and_increment(self, *args, options, **kwargs): | |
with self.locked(): | |
oath_config = json.load(self.oath_cfg_reader()) | |
verbose('{0.oathtype} {0.name!r} at counter: {0.current_counter}'.format(self)) | |
if 'hotp' not in oath_config: | |
raise ExitError('{0.oathtype} {0.name!r} oath data missing key "hotp"'.format(self)) | |
cmd = [_OATHTOOL_COMMAND] | |
if _VERBOSE >= 7: | |
# WARNING: This will expose the secret to stdio! | |
cmd.append('-v') | |
cmd.append('--' + self.oathtype.lower()) # we will extend to TOTP later? | |
cmd.append('--counter={0:d}'.format(self.current_counter)) | |
if 'digits' in oath_config: | |
cmd.append('--digits={0:d}'.format(oath_config['digits'])) | |
cmd.append(oath_config['hotp']) | |
subprocess.check_call(cmd) | |
if not _NOT_REALLY: | |
print(self.current_counter + 1, file=self.counter_fd) | |
# Don't add/commit until unlocked and counter written to disk | |
self.repo.git('commit', '-m', 'HOTP token usage', '--', self.reporelpath) | |
# }}} | |
class OTPCollection(object): # {{{ | |
def __init__(self, repo): | |
self.repo = repo | |
self.otpdir = os.path.join(repo.basedir, 'otp') | |
if not os.path.isdir(self.otpdir): | |
raise ExitError('Missing OTP directory: {0}'.format(self.otpdir)) | |
self.known = None | |
if _OATHTOOL_COMMAND is None: | |
show_error('BEWARE: no oathtool command known') | |
def _force_populate(self): | |
self.known = [] | |
for (dirpath, dirnames, filenames) in os.walk(self.otpdir, topdown=True): | |
for f in filenames: | |
if not f.endswith('.counter'): | |
continue | |
hotp = OTP(filename=f, directory=dirpath, basedir=self.otpdir, repo=self.repo) | |
hotp.find_related(filenames=filenames) | |
self.known.append(hotp) | |
if not self.known: | |
self.known = None | |
return | |
self.known_names = set((i.name.lower() for i in self.known)) | |
self.known_by_name = dict((i.name.lower(), i) for i in self.known) | |
def _auto_populate(self): | |
if self.known is None: | |
self._force_populate() | |
if self.known is None: | |
raise Error('unable to find any HOTP counter files') | |
def cmd_list(self, *args, **kwargs): | |
self._auto_populate() | |
for item in self.known: | |
print(item.name) | |
def cmd_with(self, *args, options, **kwargs): | |
if _OATHTOOL_COMMAND is None: | |
raise Error('no oathtool command known') | |
try: | |
name = options.cmdline[1] | |
except IndexError: | |
raise ExitError('need a OTP name to look at (try "list" cmd)') from None | |
self._auto_populate() | |
lname = name.lower() | |
if lname not in self.known_names: | |
candidates = [] | |
for n in self.known_names: | |
if lname in n: | |
candidates.append(n) | |
if not candidates: | |
raise ExitError('unknown OTP token name {0!r}'.format(name,)) | |
if len(candidates) > 1: | |
raise ExitError('ambiguous OTP token name {0!r}, could be: {1!r}'.format(name, candidates)) | |
lname = candidates[0] | |
token = self.known_by_name[lname] | |
#print(ourjson_dumps(token)) | |
# TODO: just "count" or should we have more options? | |
token.print_and_increment(*args, options=options, **kwargs) | |
def has_named(self, name): | |
self._auto_populate() | |
return name.lower() in self.known_names | |
# }}} | |
class PasswordsRepo(object): # {{{ | |
def __init__(self, dir=None, identity_basedir=None): | |
if dir is None: | |
self.basedir = self._find_basedir() | |
else: | |
# don't check for candidacy, is a sub-dir, doesn't need its own 'admin' etc? | |
self.basedir = dir | |
self.identity_basedir = identity_basedir if identity_basedir is not None else self.basedir | |
self._sub_repos = {} | |
def _valid_dir_candidate(self, dirname): | |
return os.path.exists(os.path.join(dirname, 'admin')) and os.path.isdir(os.path.join(dirname, '.git')) | |
def _find_basedir(self): | |
for d_gen in ( | |
lambda : sys.argv[0].rsplit('/', 1)[0], | |
lambda : os.path.expanduser('~/Passwords'), | |
): | |
d = d_gen() | |
if d and self._valid_dir_candidate(d): | |
return os.path.abspath(d) | |
verbose('Skipping invalid repo basedir: {0}'.format(d)) | |
raise Error('no Passwords repo found') | |
def sub_repo(self, abspath): | |
if abspath not in self._sub_repos: | |
self._sub_repos[abspath] = PasswordsRepo(abspath, self.identity_basedir) | |
return self._sub_repos[abspath] | |
def iter_files(self, skip_alts=False): | |
alts = set() | |
altmap = {} | |
# This identity logic was written when we just had one PasswordsRepo logic, before | |
# I realized that for git, I should have multiple; it works still, thanks to the | |
# addition of identity_basedir to use instead of basedir but perhaps there's a cleaner | |
# approach to consider in future. | |
# Not sure if identity is orthogoal to repository, or if should enforce that. | |
if not skip_alts: | |
for k in TOP_REMAP_IDENTITIES: | |
d = os.path.join(self.identity_basedir, k) | |
alts.add(d) | |
altmap[d] = TOP_REMAP_IDENTITIES[k] | |
# these dirnames.remove() calls are inefficient but for the size dirs we're | |
# talking about, we should be able to just live with it. | |
for (dirpath, dirnames, filenames) in os.walk(self.basedir, topdown=True): | |
if '.git' in dirnames: | |
dirnames.remove('.git') | |
if dirpath == self.basedir: | |
verbose('Walk found our own git repo at: {0}'.format(dirpath)) | |
else: | |
verbose('Walk found DIFFERENT git repo at: {0}'.format(dirpath)) | |
r2 = self.sub_repo(dirpath) | |
for e in r2.iter_files(skip_alts): | |
yield e | |
dirnames.clear() | |
verbose('Walk back from sub-repo, now in: {0}'.format(self.basedir)) | |
continue | |
identity = DEFAULT_IDENTITY | |
if dirpath in alts: | |
identity = altmap[dirpath] | |
if skip_alts and dirpath == self.basedir: | |
for d in TOP_REMAP_IDENTITIES: | |
if d in dirnames: | |
dirnames.remove(d) | |
for f in filenames: | |
if f.startswith('.'): | |
continue | |
p = os.path.join(dirpath, f) | |
if os.path.islink(p): | |
try: | |
os.stat(p) | |
except: | |
verbose('warning: dead symlink chain starting at: {0!r}'.format(os.path.relpath(p, self.identity_basedir)), level=0) | |
continue | |
yield PasswordRepoEntry(self, dirpath, f, identity=identity) | |
def walk_repos(self): | |
"""depth-first repo walk, so that any submodules can be updated before the parent.""" | |
for r in self._sub_repos.values(): | |
for e in r.walk_repos(): | |
yield e | |
yield self | |
def git(self, *args, errexit_okay=False): | |
cmd = [_GIT_COMMAND, "-C", self.basedir] | |
cmd.extend(args) | |
verbose('Run: {0}'.format(' '.join(cmd))) | |
if _NOT_REALLY: | |
return | |
if errexit_okay: | |
rc = subprocess.call(cmd) | |
if rc != 0: | |
show_error('command failed: {}'.format(rc)) | |
else: | |
subprocess.check_call(cmd) | |
def decrypt_reader(self, *, find_spec, options): | |
candidates = [] | |
if find_spec.startswith(os.path.curdir + os.path.sep): # assume tab-completed filename | |
full = os.path.abspath(find_spec) | |
pf = self.entry_by_abspath(full) | |
if not pf: | |
raise FindError('no matches found for explicit path {}'.format(find_spec)) | |
candidates.append(pf) | |
else: | |
needle = find_spec.lower() | |
for pf in self.iter_files(skip_alts=options.skip_alts): | |
item = pf.relpath.lower() | |
if needle in item: | |
candidates.append(pf) | |
del pf | |
if len(candidates) == 0: | |
raise FindError('no matches found for {}'.format(find_spec)) | |
if len(candidates) > 1: | |
candidates = self.dedup_entries(candidates) | |
if len(candidates) == 0: | |
raise Error('lost all candidates in dedup') | |
if len(candidates) > 1: | |
e = FindError('multiple matches found for {}'.format(find_spec)) | |
setattr(e, 'candidates', candidates) | |
raise e | |
fn = candidates[0].rel_to_top_repo | |
if not fn.endswith('.asc'): | |
raise FindError('not a .asc file for {}: {}'.format(find_spec, fn)) | |
if _VERBOSE: | |
print('match: {} -> {}'.format(find_spec, fn), file=sys.stderr) | |
cmd = [_GPG_COMMAND, '-d', os.path.join(self.basedir, fn)] | |
if _VERBOSE: | |
return io.StringIO(subprocess.check_output(cmd).decode('UTF-8')), fn | |
return io.StringIO(subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode('UTF-8')), fn | |
def dedup_entries(self, entry_list): | |
seen = {} | |
results = [] | |
# no guarantee if we return the symlink or the original | |
for pf in entry_list: | |
key = str(pf.stat().st_dev) + ':' + str(pf.stat().st_ino) | |
if key in seen: | |
continue | |
seen[key] = pf | |
results.append(pf) | |
return results | |
def entry_by_abspath(self, fullpath): | |
# assumption: sep is root, which is Unix, but bad assumption for portability? | |
# what's _correct_ here? | |
common = os.path.commonprefix([fullpath, self.basedir]) | |
if common == os.path.sep: | |
return None | |
return self.entry_by_relpath(os.path.relpath(fullpath, self.basedir)) | |
def entry_by_relpath(self, relative): | |
for k in sorted(self._sub_repos.keys(), key=len, reverse=True): | |
rel_to_sub = os.path.relpath(relative, k) | |
if rel_to_sub.startswith(os.path.pardir + os.path.sep): | |
continue | |
return self._sub_repos[k].entry_by_relpath(rel_to_sub) | |
if not os.path.isfile(os.path.join(self.identity_basedir, relative)): | |
return None | |
# FIXME: remapped identities in here currently unused, not going to try and write/debug this without that, | |
# so this will fail for sub-repos with their own PGP identities. | |
return PasswordRepoEntry(self, os.path.join(self.identity_basedir, os.path.dirname(relative)), os.path.basename(relative), DEFAULT_IDENTITY) | |
# EWW what a _horrid_ API. | |
# }}} | |
# Top-level program sub-commands {{{ | |
@command_register | |
def encrypt(*args, repo, options, **kwargs): | |
'encrypt files, clean up and prep for commit' | |
verbose('Base: {0}'.format(repo.basedir)) | |
# We give the user a chance to know which id they might need a passphrase for. | |
# We rely heavily upon gpg-agent caching so that we only show this once per identity. | |
seen_ids = {} | |
def maybe_prompt_id(repofile): | |
if _ASSUME_GPGAGENT_SHOWS_IDENTITY: | |
return | |
if repofile.identity in seen_ids: | |
return | |
seen_ids[repofile.identity] = True | |
dname = shlex.quote(str(repofile)) | |
if repofile.identity is None: | |
input('If prompted for passphrase, is for default PGP key (file {0}) [enter]'.format(dname)) | |
else: | |
input('If prompted for passphrase, is for {0} PGP key (file {1}) [enter]'.format(shlex.quote(repofile.identity), dname)) | |
for pf in repo.iter_files(skip_alts=options.skip_alts): | |
#print('encrypted {0.is_encrypted}/{0.should_be_encrypted}\t{0.relpath}'.format(pf)) | |
pf.fix_permissions() | |
if pf.need_encrypt(): | |
maybe_prompt_id(pf) | |
pf.encrypt() | |
if pf.should_delete(): | |
pf.Delete() | |
# FIXME: handle being mutated, when updating git | |
return AllCommands['status'](*args, **kwargs) | |
@command_register | |
def clean(*args, repo, options, **kwargs): | |
'clean up non-encrypted files' | |
verbose('Base: {0}'.format(repo.basedir)) | |
for pf in repo.iter_files(skip_alts=options.skip_alts): | |
if pf.is_encrypted: | |
continue | |
if not pf.should_be_encrypted: | |
verbose('Skipping file which is expected to be unencrypted: {0.relpath}'.format(pf), level=2) | |
#print('{0} {1}'.format(color('Needs to be encrypted:', 'cyan,bright'), color(pf.relpath, 'cyan'))) | |
continue | |
if pf.need_encrypt(): | |
print('{0} {1}'.format(color('Needs to be encrypted:', 'cyan,bright'), color(pf.relpath, 'cyan'))) | |
continue | |
if not pf.should_delete(): | |
print('{0}: {1.relpath}'.format('Unknown file status', 'red,bright'), color(pf)) | |
continue | |
print('{0} {1}'.format(color('Candidate for deletion:', 'yellow,bright'), color(pf.relpath, 'yellow'))) | |
pf.Delete() | |
@command_register | |
def noop(*args, repo, options, **kwargs): | |
'do nothing except say nada' | |
print('nada') | |
return 0 | |
def _git_in_each(repo, options, cmd, errexit_okay=False): | |
# populate the sub-repos; FIXME: remember why this is needed | |
if not cmd: | |
raise Error('BUG: missing git\'s subcmd') | |
for pf in repo.iter_files(skip_alts=options.skip_alts): | |
pass | |
_NOT_REALLY = False | |
first = True | |
for r in repo.walk_repos(): | |
if first: | |
first = False | |
else: | |
print() | |
print('{0}: {1}'.format(color('In repo', 'cyan,bright'), color(r.basedir, 'cyan'))) | |
r.git(*cmd, errexit_okay=errexit_okay) | |
@command_register | |
def status(*args, repo, options, **kwargs): | |
'show status of repo and sub-repos' | |
_git_in_each(repo, options, 'status --no-short'.split()) | |
return AllCommands['need'](*args, | |
show_banner=color('\nFiles which need to be encrypted:', 'cyan,bright'), | |
**kwargs) | |
@command_register | |
def commit(*args, repo, options, **kwargs): | |
'git commit each repo and sub-repo' | |
_git_in_each(repo, options, ['commit']+options.cmdline, errexit_okay=True) | |
@command_register | |
def push(*args, repo, options, **kwargs): | |
'git push each repo and sub-repo' | |
_git_in_each(repo, options, ['push', 'all']+options.cmdline) | |
# The push/all hack leaves references for the others stale | |
_git_in_each(repo, options, ['fetch', '--all']) | |
@command_register | |
def need(*args, repo, options, show_banner='', **kwargs): | |
'show files which need to be encrypted' | |
verbose('Base: {0}'.format(repo.basedir)) | |
shown = False | |
def show(): | |
nonlocal shown | |
if shown or not show_banner: | |
return | |
shown=True | |
print(show_banner) | |
for pf in repo.iter_files(skip_alts=options.skip_alts): | |
if pf.need_encrypt(): | |
show() | |
print(pf) | |
return 0 | |
@command_register | |
def find(*args, repo, options, **kwargs): | |
'show files containing search key as substring' | |
seek = set(a.lower() for a in options.cmdline) | |
for pf in repo.iter_files(skip_alts=options.skip_alts): | |
item = pf.relpath.lower() | |
for s in seek: | |
if s in item: | |
print(pf) | |
break | |
return 0 | |
@command_register | |
def recent(*args, repo, options, **kwargs): | |
'list N most recent files' | |
time_based = [(pf.stat().st_mtime, str(pf)) for pf in repo.iter_files(skip_alts=options.skip_alts)] | |
time_based.sort(reverse=True) | |
limit = 10 | |
if options.cmdline: | |
try: | |
limit = int(options.cmdline[0]) | |
except: | |
pass | |
time_based = time_based[:limit] | |
time_based = [(t[0], t[1], datetime.datetime.fromtimestamp(t[0])) for t in time_based] | |
namewidth = len(max(time_based, default=('', ' '), key=lambda p: len(p[1]))[1]) | |
format_ = '{name:<{namewidth}s} : {dttime:%Y-%m-%d %H:%M:%S}' | |
for (mtime, name, dttime) in time_based: | |
print(format_.format(time=time, name=name, dttime=dttime, namewidth=namewidth)) | |
@command_register | |
def otp(*args, repo, options, **kwargs): | |
'perform OTP commands (HOTP generations)' | |
# We wrap the OTP collection, with named sub-commands (should redo this using argparse parsers on subparsers?) | |
# We handle the command being defined to exist by the OTPCollection class. | |
# No command => 'list' | |
# If command not known, but token named this does exist, then command => 'with' (implicit) | |
if not options.cmdline: | |
options.cmdline = ['list'] | |
cmd = 'cmd_' + options.cmdline[0].lower() | |
otp_collection = OTPCollection(repo) | |
if not hasattr(otp_collection, cmd): | |
if otp_collection.has_named(options.cmdline[0]): | |
cmd = 'cmd_with' | |
options.cmdline = ['with'] + options.cmdline | |
if hasattr(otp_collection, cmd): | |
return getattr(otp_collection, cmd)(options.cmdline[1:], *args, options=options, **kwargs) | |
raise ExitError('unknown OTP command {!r}'.format(options.cmdline[0],)) | |
def _each_file(args, repo, options, kwargs): | |
if not options.cmdline: | |
raise ExitError('need something to show') | |
errored = 0 | |
for a in options.cmdline: | |
try: | |
out, fn = repo.decrypt_reader(find_spec=a, options=options) | |
yield out, fn | |
except FindError as e: | |
print(e, file=sys.stderr) | |
if hasattr(e, 'candidates'): | |
for c in e.candidates: | |
print(' -> {}'.format(c), file=sys.stderr) | |
print('', file=sys.stderr, flush=True) | |
errored += 1 | |
if errored: | |
raise ExitError('failed: {}'.format(errored)) | |
@command_register | |
def show(*args, repo, options, **kwargs): | |
for out, fn in _each_file(args, repo, options, kwargs): | |
print('File: {}'.format(fn), file=sys.stderr) | |
while True: | |
chunk = out.read(8192) | |
if chunk: | |
print(chunk, end='') | |
else: | |
print('', flush=True) | |
out.close() | |
break | |
@command_register | |
def totp(*args, repo, options, **kwargs): | |
if _OATHTOOL_COMMAND is None: | |
raise ExitError('no oathtool command known') | |
errored = False | |
re_totp2fa = re.compile('^\s*TOTP2FA=[\'"]([^\'"]+)[\'"]') | |
re_url = re.compile(r'[?&]secret=([^&\s]+)') | |
re_nextline = re.compile(r'\bTOTP\b\s+.*\bsecret:\s*$') | |
for out, fn in _each_file(args, repo, options, kwargs): | |
totp2fa = None | |
otpauth_url = None | |
totp_whole = None | |
totp_whole_next = False | |
# next two currently constant, could extract from url? | |
digits = 6 | |
step_duration = 30 | |
token = None | |
for line in out: | |
if totp_whole_next: | |
totp_whole = line | |
totp_whole_next = False | |
continue | |
if 'TOTP2FA=' in line: | |
totp2fa = line | |
break | |
if 'otpauth:' in line and otpauth_url is None: | |
otpauth_url = line | |
if re_nextline.search(line): | |
totp_whole_next = True | |
out.close() | |
if totp2fa is not None: | |
m = re_totp2fa.match(totp2fa) | |
elif otpauth_url is not None: | |
m = re_url.search(otpauth_url) | |
elif totp_whole is not None: | |
token = totp_whole.strip() | |
else: | |
print('No token found in {}'.format(fn), file=sys.stderr) | |
errored = True | |
continue | |
if token is None: | |
if not m: | |
print('Failed to parse token line found in {}'.format(fn), file=sys.stderr) | |
errored = True | |
continue | |
token = m.group(1) | |
cmd = [_OATHTOOL_COMMAND, '--totp', '--base32', | |
'--digits={0:d}'.format(digits), | |
'--time-step-size={0:d}s'.format(step_duration)] | |
if _VERBOSE >= 7: | |
# WARNING: This will expose the secret to stdio! | |
cmd.append('-v') | |
cmd += ['--', token] | |
subprocess.check_call(cmd) | |
remaining = step_duration - (int(time.time()) % step_duration) | |
print('TOTP time remaining: {} seconds'.format(remaining), file=sys.stderr) | |
if remaining <= options.totp_timethreshold_shownext: | |
t = int(time.time()) + remaining + 1 | |
reftime = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(t)) | |
cmd = [_OATHTOOL_COMMAND, '--now=' + reftime] + cmd[1:] | |
print('Next TOTP value [at time: {}]'.format(reftime), file=sys.stderr) | |
subprocess.check_call(cmd, stdout=sys.stderr) # only the _current_ value to stdout, for $() capture | |
return 1 if errored else 0 | |
@command_register | |
def info(*args, repo, options, **kwargs): | |
'report general information' | |
print('Program {PROGNAME}, verbose {_VERBOSE}:\ngit: {_GIT_COMMAND}\ngpg: {_GPG_COMMAND}\noathtool: {_OATHTOOL_COMMAND}'.format( | |
**globals())) | |
# Top-level program sub-commands }}} | |
def _main(args, argv0): # {{{ | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--git', | |
type=str, default='git', | |
help='Override program for git') | |
parser.add_argument('--gpg', | |
type=str, default='', | |
help='Override program for gpg') | |
parser.add_argument('--oathtool', | |
type=str, default='', | |
help='Override program for oathtool') | |
parser.add_argument('--delete-always', | |
action='store_true', default=False, | |
help='Do not prompt before deleting unencrypted') | |
parser.add_argument('--totp-timethreshold-shownext', | |
type=int, default=_DEFAULT_TOTP_TIMETHRESHOLD_SHOWNEXT, | |
help='How long before time barrier we\'ll also show next TOTP [%(default)s]') | |
parser.add_argument('-s', '--skip-alts', | |
action='store_true', default=False, | |
help='Skip alternative sub-dirs') | |
parser.add_argument('-n', '--not-really', | |
action='store_true', default=False, | |
help='Do not encrypt or mutate git') | |
parser.add_argument('-v', '--verbose', | |
action='count', default=0, | |
help='Be more verbose') | |
#parser.add_argument('action', choices=AllCommands, help='action to take') | |
#parser.add_argument('action', choices=['encrypt'], help='action to take') | |
AllCommands.set_subparser(parser.add_subparsers()) | |
options = parser.parse_args(args=args) | |
if not hasattr(options, 'action'): | |
parser.print_help() | |
return 0 | |
global PROGNAME, _VERBOSE, _GIT_COMMAND, _GPG_COMMAND, _OATHTOOL_COMMAND | |
global _NOT_REALLY, _DELETE_ALWAYS | |
PROGNAME = argv0 | |
_VERBOSE = options.verbose | |
_GIT_COMMAND = options.git | |
_GPG_COMMAND = find_best_command('gpg', explicit=options.gpg, | |
candidates=_DESIRED_GPG_COMMANDS, prefer_paths=_DESIRED_GPG_PREFER_PATHS) | |
_OATHTOOL_COMMAND = find_best_command('oathtool', explicit=options.oathtool, | |
candidates=['oathtool',]) | |
_NOT_REALLY = options.not_really | |
_DELETE_ALWAYS = options.delete_always | |
os.umask(0o077) | |
if not have_gpg_agent(): | |
print('WARNING: missing a gpg-agent .', end='', flush=True) | |
SLEEP = 3 | |
for i in range(SLEEP): | |
time.sleep(1) | |
if i < (SLEEP - 1): | |
print('.', end='', flush=True) | |
else: | |
print(flush=True) | |
repo = PasswordsRepo() | |
AllCommands.repo = repo | |
AllCommands.options = options | |
return options.action() | |
# }}} | |
if __name__ == '__main__': | |
argv0 = sys.argv[0].rsplit('/')[-1] | |
try: | |
rv = _main(sys.argv[1:], argv0=argv0) | |
sys.exit(rv) | |
except ExitError as e: | |
print('{0}: {1}'.format(argv0, e), file=sys.stderr) | |
sys.exit(1) | |
# vim: set ft=python sw=2 expandtab foldmethod=marker : |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment