Skip to content

Instantly share code, notes, and snippets.

@philpennock philpennock/admin.py
Created Jan 28, 2018

Embed
What would you like to do?
Passwords/admin to manage the passwords repo. You'll need to update `DEFAULT_IDENTITY` at the very least.
#!/usr/bin/env python3
"""
admin: Passwords repo admin tool
Passwords repo has a bunch of actions which are common; copy/paste is
annoying. So manage the common actions.
"""
__author__ = 'phil.pennock@spodhuis.org (Phil Pennock)'
import argparse
import collections
import contextlib
import datetime
import fcntl
import fnmatch
import functools
import io
import json
import os
import re
import shlex
import shutil
import stat
import subprocess
import sys
import time
# Color support via colorama {{{
try:
  import colorama
  colorama.init()

  def color(msg, styling):
    """Wrap msg in the ANSI codes named by the comma-separated styling list.

    Each name is upper-cased and looked up on colorama's Fore, Style and Back
    tables, in that order, with the first hit winning.  Names found on none
    of the tables are silently ignored.  A reset-all code is always appended.
    """
    palettes = (colorama.Fore, colorama.Style, colorama.Back)
    pieces = []
    for wanted in styling.upper().split(','):
      hits = [getattr(p, wanted) for p in palettes if hasattr(p, wanted)]
      pieces.extend(hits[:1])
    pieces += [msg, colorama.Style.RESET_ALL]
    return ''.join(pieces)
except ImportError:
  def color(msg, styling):
    """Fallback used when colorama is not installed: return msg unchanged."""
    return msg
# Color support via colorama }}}
# A PGP identity split into the key used for signing and the key used as the
# encryption recipient.
EncryptionIdentity = collections.namedtuple('EncryptionIdentity', ['sign', 'encrypt'])

# Globals {{{
PROGNAME = 'admin'

# Filled in by _main() from the command-line options.
_GIT_COMMAND = None
_GPG_COMMAND = None
_OATHTOOL_COMMAND = None
_NOT_REALLY = False
_DELETE_ALWAYS = False
_VERBOSE = 0

# The subprocess module ignores modifications made to os.environ['PATH'] after
# process start, so path preferences live in our own command selector instead.
# A match for ANY name in _DESIRED_GPG_COMMANDS found under
# _DESIRED_GPG_PREFER_PATHS beats every match found via the normal PATH, even
# for names listed earlier: "desired installation areas" outrank "desired
# command names".
_DESIRED_GPG_PREFER_PATHS = ['/opt/gnupg/bin']
_DESIRED_GPG_COMMANDS = ['gpg2.3', 'gpg2.2', 'gpg2.1', 'gpg2', 'gpg']

_ASSUME_GPGAGENT_SHOWS_IDENTITY = True

# Key fingerprints come from:
#   gpg -v --fingerprint --fingerprint $pgp_key_main
DEFAULT_IDENTITY = EncryptionIdentity(
  sign='4E5C179E7FFC4DBF8E6C132F51104E668DD04481!',     # no expiration
  encrypt='897C93EC1E5BA64B2E8ED0D2FBC6848A6FA1B909!',  # expires: 2020-02-27
)

# Maps a top-level directory name to the identity used for files directly
# inside it (see PasswordsRepo.iter_files).
TOP_REMAP_IDENTITIES = {}

# Paths, relative to the repo base, which are never encrypted.
SKIP_ENCRYPTION_FILES = {
  'admin',
  'NOTES.txt',
  'OTP.list.txt',
  'template.totp',
}

# Glob patterns, matched against the basename only, which are never encrypted.
SKIP_ENCRYPTION_GLOB = {
  '*.noenc',
  '*.counter',
  'README.txt',
}

_DEFAULT_TOTP_TIMETHRESHOLD_SHOWNEXT = 8  # seconds
# Globals }}}
class Error(Exception):
  """Base class for exceptions from admin."""
class ExitError(Error):
  """Error exception which should force exit without stack-trace."""
class FindError(Error):
  """Problem finding an exact match for a given spec."""
# Utility functions {{{
def verbose(msg, level=1):
  """Print msg on stdout when the global verbosity is at least level."""
  if level > _VERBOSE:
    return
  print(msg)
def show_error(msg):
  """Write msg to stderr, prefixed with the program name."""
  line = '{0}: {1}'.format(PROGNAME, msg)
  print(line, file=sys.stderr)
def encryption_command(identity):
  """Build the gpg argv prefix for sign+encrypt; the caller appends the file.

  identity may be an EncryptionIdentity (separate signing and recipient keys),
  any other value used for both roles, or None to leave key selection to gpg's
  own defaults.
  """
  if isinstance(identity, EncryptionIdentity):
    signer, recipient = identity.sign, identity.encrypt
  else:
    signer = recipient = identity
  # -aset : ascii sign encrypt text-mode
  cmdline = [_GPG_COMMAND, '--for-your-eyes-only', '-aset']
  if identity is not None:
    cmdline += ['--no-default-recipient', '--no-encrypt-to',
                '-u', signer, '-r', recipient]
  return cmdline
def have_gpg_agent():
  """Return True when a gpg-agent appears to be reachable.

  Checks, in order:
    * the GPG_AGENT_INFO environment variable (pre-2.1 agents);
    * an S.gpg-agent socket in the GnuPG home directory, honouring GNUPGHOME
      and defaulting to ~/.gnupg;
    * when GNUPGHOME is unset, the per-user runtime socket directory
      (/run/user/<uid>/gnupg or /var/run/user/<uid>/gnupg) which newer GnuPG
      releases use for the default home directory.

  A non-default GNUPGHOME combined with a runtime socket directory is not
  probed (GnuPG uses a hashed sub-directory there), so False may then be a
  false negative; the caller only uses the result to print a warning.
  """
  if 'GPG_AGENT_INFO' in os.environ:
    return True
  gnupg_home = os.environ.get('GNUPGHOME')
  candidates = [os.path.join(gnupg_home or os.path.expanduser('~/.gnupg'), 'S.gpg-agent')]
  if not gnupg_home and hasattr(os, 'getuid'):
    uid = os.getuid()
    for rundir in ('/run/user', '/var/run/user'):
      candidates.append(os.path.join(rundir, str(uid), 'gnupg', 'S.gpg-agent'))
  for p in candidates:
    try:
      if stat.S_ISSOCK(os.stat(p).st_mode):
        return True
    except OSError:
      # Missing or unreadable candidate: just try the next one.
      continue
  return False
def matches_skip_encryption(rel_filename):
  """Return True when rel_filename is exempt from encryption.

  A file is exempt when its repo-relative path is listed in
  SKIP_ENCRYPTION_FILES or its basename matches a SKIP_ENCRYPTION_GLOB pattern.
  """
  if rel_filename in SKIP_ENCRYPTION_FILES:
    return True
  leaf = os.path.basename(rel_filename)
  return any(fnmatch.fnmatch(leaf, pattern) for pattern in SKIP_ENCRYPTION_GLOB)
def prompt_yesno(message):
  """Ask a yes/no question on the terminal; only 'y' or 'Y' counts as yes."""
  answer = input('{0} [y/N]? '.format(message))
  return answer in ('y', 'Y')
def find_best_command(label, *, explicit, candidates, prefer_paths=None, _only_paths=None):
  """Locate the executable to use for a tool and return its path.

  label:        human-readable tool name, used only in error messages.
  explicit:     command requested by the user; when non-empty it is the only
                name searched for.
  candidates:   command names to try, in preference order, when explicit is
                empty.
  prefer_paths: directories searched first; any hit there beats every hit on
                the normal PATH.
  _only_paths:  internal; restricts the search to these directories.

  Raises ExitError when nothing suitable is found.
  """
  if prefer_paths:
    try:
      return find_best_command(label, explicit=explicit, candidates=candidates, _only_paths=prefer_paths)
    except ExitError:
      # Nothing in the preferred directories: fall back to the normal PATH.
      pass
  if _only_paths is not None:
    with_paths = os.pathsep.join(_only_paths)
  else:
    # Tolerate an unset PATH the same way shutil.which() itself would.
    with_paths = os.environ.get('PATH', os.defpath)
  if explicit:
    p = shutil.which(explicit, path=with_paths)
    if p:
      return p
    raise ExitError('specified {0} command {1!r} not found'.format(label, explicit))
  for c in candidates:
    p = shutil.which(c, path=with_paths)
    if p:
      return p
  raise ExitError('unable to find a {0} command'.format(label,))
# Utility functions }}}
# Serialization support {{{
class ourjson_encoder(json.JSONEncoder):
  """JSON encoder which understands objects exposing serialize_attrs_to_keys.

  That attribute is either an iterable of attribute names or a callable
  returning one; the object is encoded as a dict of those attributes which
  actually exist on it.
  """

  def default(self, o):
    if not hasattr(o, 'serialize_attrs_to_keys'):
      return json.JSONEncoder.default(self, o)
    keys = o.serialize_attrs_to_keys
    if callable(keys):
      keys = keys()
    return {k: getattr(o, k) for k in keys if hasattr(o, k)}
def ourjson_dumps(*args, **kwargs):
  """json.dumps() with ourjson_encoder installed as the encoder class."""
  encoder_cls = ourjson_encoder
  return json.dumps(*args, cls=encoder_cls, **kwargs)
class SerializingObject:
  """Mixin which makes an object serializable through ourjson_encoder."""

  def serialize_attrs_to_keys(self):
    """Yield the names of all public, non-callable attributes of self."""
    for attr in dir(self):
      if attr.startswith('_') or callable(getattr(self, attr)):
        continue
      yield attr
# Serialization support }}}
# Command registration {{{
class _AllCommands(object):
  """Registry of sub-commands, plus the repo/options shared between them."""

  def __init__(self):
    self.commands = {}
    self.repo = None
    self.options = None
    self.subparser = None

  def __getitem__(self, item):
    if item not in self.commands:
      raise Error('No such command: {0}'.format(item))
    return self.commands[item]

  def __contains__(self, item):
    return item in self.commands

  def set_subparser(self, subparser):
    """Create one argparse sub-parser per registered command, in name order.

    Each sub-parser stores the command object as the parsed 'action' and
    collects all remaining words into 'cmdline'.
    """
    self.subparser = subparser
    for name in sorted(self.commands):
      cmd = self.commands[name]
      cmd.parser = subparser.add_parser(name, help=cmd.help())
      cmd.parser.set_defaults(action=cmd)
      # Consider: register sub-commands from the command itself
      cmd.parser.add_argument('cmdline', nargs='*')


AllCommands = _AllCommands()
class Command(object):
  """Callable wrapper binding a sub-command function to the command registry.

  Calling the wrapper invokes the function with the registry's current repo
  and options injected as keyword arguments.
  """

  def __init__(self, function, allcommands):
    self.function = function
    self.commands = allcommands
    self.parser = None
    # Mirror the identifying metadata of the wrapped function, where present.
    for attr in ('__name__', '__doc__', '__annotations__'):
      if hasattr(function, attr):
        setattr(self, attr, getattr(function, attr))

  def __call__(self, *args, **kwargs):
    registry = self.commands
    return self.function(*args, repo=registry.repo, options=registry.options, **kwargs)

  def help(self):
    """Return the wrapped function's docstring, used as the help text."""
    return self.function.__doc__
def command_register(wrapped):
  """Decorator: register wrapped as a sub-command under its own name.

  The function is stored in AllCommands wrapped in a Command object.  The
  original function is returned so that the decorated module-level name stays
  bound to it instead of being rebound to None.
  """
  AllCommands.commands[wrapped.__name__] = Command(wrapped, AllCommands)
  return wrapped
# Command registration }}}
class PasswordRepoEntry(object): # {{{
  """One file inside a passwords repo: a plaintext file or a '.asc' file.

  Every entry knows the path of its "pair": for plaintext 'x' that is 'x.asc',
  for 'x.asc' it is 'x'.
  """

  def __init__(self, repo, basedir, filename, identity=None):
    self.repo = repo
    self.identity = identity
    self.fullpath = os.path.join(basedir, filename)
    # Both relative paths are derived by slicing off a directory prefix plus
    # one separator character.
    self.relpath = self.fullpath[len(self.repo.basedir)+1:]
    self.rel_to_top_repo = self.fullpath[len(self.repo.identity_basedir)+1:]
    stem, suffix = os.path.splitext(self.fullpath)
    self.is_encrypted = (suffix == '.asc')
    if self.is_encrypted:
      self.pairpath = stem
      self.should_be_encrypted = True
    else:
      self.pairpath = self.fullpath + '.asc'
      self.should_be_encrypted = not matches_skip_encryption(self.relpath)
    verbose(self, level=2)

  def __str__(self):
    return self.rel_to_top_repo

  def stat(self):
    """Return os.stat() of this file; the first result is cached."""
    cached = getattr(self, '_stat_me', None)
    if cached is None:
      cached = self._stat_me = os.stat(self.fullpath)
    return cached

  def need_encrypt(self):
    """Return True when this plaintext file lacks an up-to-date '.asc' pair."""
    if self.is_encrypted or not self.should_be_encrypted:
      return False
    try:
      pair_info = os.stat(self.pairpath)
    except FileNotFoundError:
      return True
    if pair_info.st_size == 0:
      return True
    own_info = self.stat()
    if not own_info:
      raise Error('missing stat of self for {0}'.format(self.relpath))
    # go on basis that mtime equal is sufficiently unlikely that should re-encrypt for safety
    if pair_info.st_mtime > own_info.st_mtime:
      verbose('Have newer encrypted pair: {0}'.format(self.relpath))
      return False
    return True

  def encrypt(self):
    """Run gpg to produce the pair file, then 'git add' that pair."""
    if self.is_encrypted:
      raise Error('Already encrypted: {0}'.format(self.relpath))
    if not self.should_be_encrypted:
      raise Error('Should not be encrypted: {0}'.format(self.relpath))
    cmd = encryption_command(self.identity) + [self.fullpath]
    verbose('Run: {0}'.format(' '.join(cmd)))
    if not _NOT_REALLY:
      subprocess.check_call(cmd)
    self.git_add_pair()

  def should_delete(self):
    """Return True for a plaintext file whose encrypted pair is current."""
    return not (self.is_encrypted
                or not self.should_be_encrypted
                or self.need_encrypt())

  def Delete(self):
    """Remove the plaintext file, prompting first unless told not to."""
    verbose('Delete: {0}'.format(self.relpath))
    if _NOT_REALLY:
      return
    # Show the user both files before asking.
    subprocess.call(["ls", "-ld", self.fullpath, self.pairpath])
    quoted = shlex.quote(self.relpath)
    if not (_DELETE_ALWAYS or prompt_yesno('delete {0}'.format(quoted))):
      print('Not deleting {0}'.format(shlex.quote(self.relpath)))
      return
    os.unlink(self.fullpath)

  def _pair_object(self):
    pair_rel = os.path.relpath(self.pairpath, self.repo.basedir)
    return PasswordRepoEntry(self.repo, self.repo.basedir, pair_rel, identity=self.identity)

  def git_add_pair(self):
    self._pair_object().git_add()

  def git_add(self):
    # problem: we currently walk from the top-level repo always, so we don't have the correct git basedir
    # we want to git -C repo add relpath-for-that-repo
    self.repo.git("add", self.relpath)

  def fix_permissions(self):
    """Strip group/other permission bits from files subject to encryption."""
    if matches_skip_encryption(self.relpath):
      return
    try:
      info = os.stat(self.fullpath)
    except OSError as e:
      show_error(e)
      return
    if not (stat.S_ISREG(info.st_mode) or stat.S_ISDIR(info.st_mode)):
      return
    if info.st_mode & 0o077 == 0:
      return
    verbose('fixing permissions on: {0}'.format(self.relpath))
    owner_only = info.st_mode & 0o700
    try:
      os.chmod(self.fullpath, owner_only, follow_symlinks=False)
    except NotImplementedError:
      # Platform cannot chmod without following symlinks.
      os.chmod(self.fullpath, owner_only)
# }}}
class OTP(SerializingObject): # {{{
  """One HOTP token: a '<name>.counter' file plus its oath config file.

  The config file is '<name>' (plaintext) or '<name>.asc' (gpg-encrypted) in
  the same directory as the counter; it is located by find_related().
  """

  def __init__(self, *, filename, directory, basedir, repo):
    self.oathtype = 'HOTP'
    self.name = filename[:len(filename)-len('.counter')]
    self.filename = filename
    self.directory = directory
    self.basedir = basedir
    self.repo = repo
    self.fullpath = os.path.join(self.directory, self.filename)
    self.otprelpath = self.fullpath[len(self.basedir)+1:]
    self.reporelpath = self.fullpath[len(self.repo.basedir)+1:]

  def find_related(self, *, filenames):
    """Record which config variants for this token exist among filenames."""
    if self.name in filenames:
      self.unencrypted = self.name
    if (self.name + '.asc') in filenames:
      self.encrypted = self.name + '.asc'

  @contextlib.contextmanager
  def locked(self):
    """Hold an exclusive lock on the counter file for the duration of the block.

    While active, self.counter_fd is the open counter file, positioned at
    offset 0, and self.current_counter is the integer read from it.
    counter_fd is removed again on exit, including when the block raises.
    """
    with open(self.fullpath, 'r+') as fd:
      fcntl.lockf(fd, fcntl.LOCK_EX)
      self.counter_fd = fd
      try:
        self.current_counter = int(fd.read().strip())
        fd.seek(0, 0)
        yield
      finally:
        delattr(self, 'counter_fd')

  def oath_cfg_reader(self):
    """Return a readable text stream of the token's JSON config.

    The plaintext file is preferred; otherwise the '.asc' file is decrypted
    with gpg.  The caller is responsible for closing the returned stream.
    Raises Error when neither variant was found by find_related().
    """
    if hasattr(self, 'unencrypted'):
      return open(os.path.join(self.directory, self.unencrypted))
    if hasattr(self, 'encrypted'):
      cmd = [_GPG_COMMAND, '-d', os.path.join(self.directory, self.encrypted)]
      # gpg's stderr chatter is only let through in verbose mode.
      if _VERBOSE:
        return io.StringIO(subprocess.check_output(cmd).decode('UTF-8'))
      return io.StringIO(subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode('UTF-8'))
    raise Error('bad {0.oathtype} setup for {0.name}'.format(self))

  def print_and_increment(self, *args, options, **kwargs):
    """Print the HOTP value for the current counter, then bump and commit it.

    oathtool writes the value to stdout.  Unless running in not-really mode
    the counter file is rewritten with counter+1 while still locked; the git
    commit of the counter file happens only after the lock is released.
    """
    with self.locked():
      with self.oath_cfg_reader() as reader:
        oath_config = json.load(reader)
      verbose('{0.oathtype} {0.name!r} at counter: {0.current_counter}'.format(self))
      if 'hotp' not in oath_config:
        raise ExitError('{0.oathtype} {0.name!r} oath data missing key "hotp"'.format(self))
      cmd = [_OATHTOOL_COMMAND]
      if _VERBOSE >= 7:
        # WARNING: This will expose the secret to stdio!
        cmd.append('-v')
      cmd.append('--' + self.oathtype.lower()) # we will extend to TOTP later?
      cmd.append('--counter={0:d}'.format(self.current_counter))
      if 'digits' in oath_config:
        cmd.append('--digits={0:d}'.format(oath_config['digits']))
      # NOTE(review): the secret is handed to oathtool as a command-line
      # argument, so it may be visible in the process list while oathtool runs.
      cmd.append(oath_config['hotp'])
      subprocess.check_call(cmd)
      if not _NOT_REALLY:
        print(self.current_counter + 1, file=self.counter_fd)
    # Don't add/commit until unlocked and counter written to disk
    self.repo.git('commit', '-m', 'HOTP token usage', '--', self.reporelpath)
# }}}
class OTPCollection(object): # {{{
  """All HOTP tokens found under the repo's 'otp' directory.

  Public methods named cmd_<word> are the OTP sub-commands dispatched by the
  top-level 'otp' command.
  """

  def __init__(self, repo):
    self.repo = repo
    self.otpdir = os.path.join(repo.basedir, 'otp')
    if not os.path.isdir(self.otpdir):
      raise ExitError('Missing OTP directory: {0}'.format(self.otpdir))
    self.known = None
    if _OATHTOOL_COMMAND is None:
      show_error('BEWARE: no oathtool command known')

  def _force_populate(self):
    """Scan otpdir for '*.counter' files and rebuild the token indexes."""
    self.known = tokens = []
    for (dirpath, dirnames, filenames) in os.walk(self.otpdir, topdown=True):
      for counter_file in filenames:
        if not counter_file.endswith('.counter'):
          continue
        token = OTP(filename=counter_file, directory=dirpath, basedir=self.otpdir, repo=self.repo)
        token.find_related(filenames=filenames)
        tokens.append(token)
    if not tokens:
      # None (rather than an empty list) marks "nothing known".
      self.known = None
      return
    self.known_names = {t.name.lower() for t in tokens}
    self.known_by_name = {t.name.lower(): t for t in tokens}

  def _auto_populate(self):
    """Populate on first use; raise Error when no counter files exist."""
    if self.known is None:
      self._force_populate()
    if self.known is None:
      raise Error('unable to find any HOTP counter files')

  def cmd_list(self, *args, **kwargs):
    """Print the name of every known token, one per line."""
    self._auto_populate()
    for token in self.known:
      print(token.name)

  def cmd_with(self, *args, options, **kwargs):
    """Generate the next value for the token named by options.cmdline[1].

    The name is matched case-insensitively; when there is no exact match, a
    unique substring match is accepted.
    """
    if _OATHTOOL_COMMAND is None:
      raise Error('no oathtool command known')
    try:
      name = options.cmdline[1]
    except IndexError:
      raise ExitError('need a OTP name to look at (try "list" cmd)') from None
    self._auto_populate()
    lname = name.lower()
    if lname not in self.known_names:
      candidates = [n for n in self.known_names if lname in n]
      if not candidates:
        raise ExitError('unknown OTP token name {0!r}'.format(name,))
      if len(candidates) > 1:
        raise ExitError('ambiguous OTP token name {0!r}, could be: {1!r}'.format(name, candidates))
      lname = candidates[0]
    token = self.known_by_name[lname]
    # TODO: just "count" or should we have more options?
    token.print_and_increment(*args, options=options, **kwargs)

  def has_named(self, name):
    """Return True when a token with exactly this (case-folded) name exists."""
    self._auto_populate()
    return name.lower() in self.known_names
# }}}
class PasswordsRepo(object): # {{{
  """A git working tree of password files, plus any git repos nested in it.

  basedir is this repo's own top directory.  identity_basedir is the top
  directory of the outermost repo: nested repos share it, and it is the base
  for TOP_REMAP_IDENTITIES lookups and for PasswordRepoEntry.rel_to_top_repo.
  Nested repos are only discovered (and recorded in _sub_repos) while
  iter_files() walks the tree.
  """

  def __init__(self, dir=None, identity_basedir=None):
    if dir is None:
      self.basedir = self._find_basedir()
    else:
      # don't check for candidacy, is a sub-dir, doesn't need its own 'admin' etc?
      self.basedir = dir
    self.identity_basedir = identity_basedir if identity_basedir is not None else self.basedir
    self._sub_repos = {}

  def _valid_dir_candidate(self, dirname):
    """A candidate base directory must hold an 'admin' entry and a '.git' directory."""
    return os.path.exists(os.path.join(dirname, 'admin')) and os.path.isdir(os.path.join(dirname, '.git'))

  def _find_basedir(self):
    """Return the absolute repo base: next to the script, else ~/Passwords.

    Raises Error when neither location looks like a passwords repo.
    """
    for d_gen in (
        lambda : sys.argv[0].rsplit('/', 1)[0],
        lambda : os.path.expanduser('~/Passwords'),
        ):
      d = d_gen()
      if d and self._valid_dir_candidate(d):
        return os.path.abspath(d)
      verbose('Skipping invalid repo basedir: {0}'.format(d))
    raise Error('no Passwords repo found')

  def sub_repo(self, abspath):
    """Return the (cached) PasswordsRepo object for a nested repo directory."""
    if abspath not in self._sub_repos:
      self._sub_repos[abspath] = PasswordsRepo(abspath, self.identity_basedir)
    return self._sub_repos[abspath]

  def iter_files(self, skip_alts=False):
    """Yield a PasswordRepoEntry for every non-hidden file in this repo.

    Nested git repos are delegated to their own PasswordsRepo object, which is
    registered via sub_repo() as a side effect.  Dangling symlinks are reported
    and skipped.  With skip_alts set, the top-level directories named in
    TOP_REMAP_IDENTITIES are not descended into.
    """
    # This identity logic was written when we just had one PasswordsRepo logic, before
    # I realized that for git, I should have multiple; it works still, thanks to the
    # addition of identity_basedir to use instead of basedir but perhaps there's a cleaner
    # approach to consider in future.
    # Not sure if identity is orthogonal to repository, or if should enforce that.
    altmap = {}
    if not skip_alts:
      for k in TOP_REMAP_IDENTITIES:
        altmap[os.path.join(self.identity_basedir, k)] = TOP_REMAP_IDENTITIES[k]
    # these dirnames.remove() calls are inefficient but for the size dirs we're
    # talking about, we should be able to just live with it.
    for (dirpath, dirnames, filenames) in os.walk(self.basedir, topdown=True):
      if '.git' in dirnames:
        dirnames.remove('.git')
        if dirpath == self.basedir:
          verbose('Walk found our own git repo at: {0}'.format(dirpath))
        else:
          verbose('Walk found DIFFERENT git repo at: {0}'.format(dirpath))
          r2 = self.sub_repo(dirpath)
          for e in r2.iter_files(skip_alts):
            yield e
          # The sub-repo walked its own tree; do not descend into it again.
          dirnames.clear()
          verbose('Walk back from sub-repo, now in: {0}'.format(self.basedir))
          continue
      identity = altmap.get(dirpath, DEFAULT_IDENTITY)
      if skip_alts and dirpath == self.basedir:
        for d in TOP_REMAP_IDENTITIES:
          if d in dirnames:
            dirnames.remove(d)
      for f in filenames:
        if f.startswith('.'):
          continue
        p = os.path.join(dirpath, f)
        if os.path.islink(p):
          try:
            os.stat(p)
          except OSError:
            verbose('warning: dead symlink chain starting at: {0!r}'.format(os.path.relpath(p, self.identity_basedir)), level=0)
            continue
        yield PasswordRepoEntry(self, dirpath, f, identity=identity)

  def walk_repos(self):
    """depth-first repo walk, so that any submodules can be updated before the parent."""
    for r in self._sub_repos.values():
      for e in r.walk_repos():
        yield e
    yield self

  def git(self, *args, errexit_okay=False):
    """Run 'git -C <basedir> <args...>'; does nothing in not-really mode.

    With errexit_okay a non-zero exit is only reported; otherwise it raises
    subprocess.CalledProcessError.
    """
    cmd = [_GIT_COMMAND, "-C", self.basedir]
    cmd.extend(args)
    verbose('Run: {0}'.format(' '.join(cmd)))
    if _NOT_REALLY:
      return
    if errexit_okay:
      rc = subprocess.call(cmd)
      if rc != 0:
        show_error('command failed: {}'.format(rc))
    else:
      subprocess.check_call(cmd)

  def decrypt_reader(self, *, find_spec, options):
    """Decrypt the single '.asc' file matching find_spec.

    A find_spec starting with './' is taken as an explicit path; anything else
    is a case-insensitive substring matched against repo-relative paths.
    Returns (text stream of the plaintext, path relative to the top repo).
    Raises FindError when there is no match, more than one distinct match (the
    exception then carries a 'candidates' attribute), or the match is not a
    '.asc' file.
    """
    if find_spec.startswith(os.path.curdir + os.path.sep): # assume tab-completed filename
      full = os.path.abspath(find_spec)
      entry = self.entry_by_abspath(full)
      if not entry:
        raise FindError('no matches found for explicit path {}'.format(find_spec))
      candidates = [entry]
    else:
      needle = find_spec.lower()
      candidates = [pf for pf in self.iter_files(skip_alts=options.skip_alts)
                    if needle in pf.relpath.lower()]
    if len(candidates) == 0:
      raise FindError('no matches found for {}'.format(find_spec))
    if len(candidates) > 1:
      candidates = self.dedup_entries(candidates)
    if len(candidates) == 0:
      raise Error('lost all candidates in dedup')
    if len(candidates) > 1:
      e = FindError('multiple matches found for {}'.format(find_spec))
      setattr(e, 'candidates', candidates)
      raise e
    chosen = candidates[0]
    fn = chosen.rel_to_top_repo
    if not fn.endswith('.asc'):
      raise FindError('not a .asc file for {}: {}'.format(find_spec, fn))
    if _VERBOSE:
      print('match: {} -> {}'.format(find_spec, fn), file=sys.stderr)
    # Use the entry's own path so the lookup is right for nested repos too.
    cmd = [_GPG_COMMAND, '-d', chosen.fullpath]
    # gpg's stderr chatter is only let through in verbose mode.
    stderr = None if _VERBOSE else subprocess.DEVNULL
    plaintext = subprocess.check_output(cmd, stderr=stderr).decode('UTF-8')
    return io.StringIO(plaintext), fn

  def dedup_entries(self, entry_list):
    """Drop entries referring to an already-seen file (same device and inode)."""
    seen = set()
    results = []
    # no guarantee if we return the symlink or the original
    for pf in entry_list:
      sb = pf.stat()
      key = (sb.st_dev, sb.st_ino)
      if key in seen:
        continue
      seen.add(key)
      results.append(pf)
    return results

  def entry_by_abspath(self, fullpath):
    """Return the entry for an absolute path, or None if it is not a file in this repo."""
    rel = os.path.relpath(fullpath, self.basedir)
    if rel == os.path.pardir or rel.startswith(os.path.pardir + os.path.sep):
      # Outside our tree.
      return None
    return self.entry_by_relpath(rel)

  def entry_by_relpath(self, relative):
    """Return the entry for a path relative to basedir, or None if not a file.

    Paths inside an already-discovered nested repo are delegated to it,
    longest repo path first.
    """
    absolute = os.path.join(self.basedir, relative)
    for k in sorted(self._sub_repos.keys(), key=len, reverse=True):
      rel_to_sub = os.path.relpath(absolute, k)
      if rel_to_sub == os.path.pardir or rel_to_sub.startswith(os.path.pardir + os.path.sep):
        continue
      return self._sub_repos[k].entry_by_relpath(rel_to_sub)
    if not os.path.isfile(absolute):
      return None
    # FIXME: remapped identities in here currently unused, not going to try and write/debug this without that,
    # so this will fail for sub-repos with their own PGP identities.
    return PasswordRepoEntry(self, os.path.dirname(absolute), os.path.basename(absolute), DEFAULT_IDENTITY)
    # EWW what a _horrid_ API.
# }}}
# Top-level program sub-commands {{{
@command_register
def encrypt(*args, repo, options, **kwargs):
  'encrypt files, clean up and prep for commit'
  # For every file in the repo: tighten permissions, encrypt it if its '.asc'
  # pair is missing or stale, then offer to delete plaintext whose pair is
  # current.  Finishes by running the 'status' command and returning its result.
  verbose('Base: {0}'.format(repo.basedir))
  # We give the user a chance to know which id they might need a passphrase for.
  # We rely heavily upon gpg-agent caching so that we only show this once per identity.
  seen_ids = set()

  def maybe_prompt_id(repofile):
    if _ASSUME_GPGAGENT_SHOWS_IDENTITY:
      return
    ident = repofile.identity
    if ident in seen_ids:
      return
    seen_ids.add(ident)
    dname = shlex.quote(str(repofile))
    if ident is None:
      input('If prompted for passphrase, is for default PGP key (file {0}) [enter]'.format(dname))
      return
    # The passphrase unlocks the signing key; an EncryptionIdentity is a
    # tuple, which shlex.quote() cannot take directly.
    key = ident.sign if isinstance(ident, EncryptionIdentity) else ident
    input('If prompted for passphrase, is for {0} PGP key (file {1}) [enter]'.format(shlex.quote(key), dname))

  for pf in repo.iter_files(skip_alts=options.skip_alts):
    pf.fix_permissions()
    if pf.need_encrypt():
      maybe_prompt_id(pf)
      pf.encrypt()
    if pf.should_delete():
      pf.Delete()
  # FIXME: handle being mutated, when updating git
  return AllCommands['status'](*args, **kwargs)
@command_register
def clean(*args, repo, options, **kwargs):
  'clean up non-encrypted files'
  # Walk the plaintext files which are meant to be encrypted: report the ones
  # still needing encryption, and offer to delete the ones whose '.asc' pair
  # is up to date.
  verbose('Base: {0}'.format(repo.basedir))
  for pf in repo.iter_files(skip_alts=options.skip_alts):
    if pf.is_encrypted:
      continue
    if not pf.should_be_encrypted:
      verbose('Skipping file which is expected to be unencrypted: {0.relpath}'.format(pf), level=2)
      continue
    if pf.need_encrypt():
      print('{0} {1}'.format(color('Needs to be encrypted:', 'cyan,bright'), color(pf.relpath, 'cyan')))
      continue
    if not pf.should_delete():
      # Not expected given the checks above; kept as a safety net.
      print('{0}: {1.relpath}'.format(color('Unknown file status', 'red,bright'), pf))
      continue
    print('{0} {1}'.format(color('Candidate for deletion:', 'yellow,bright'), color(pf.relpath, 'yellow')))
    pf.Delete()
@command_register
def noop(*args, repo, options, **kwargs):
  'do nothing except say nada'
  # Useful for checking that option parsing and repo discovery work.
  message = 'nada'
  print(message)
  return 0
def _git_in_each(repo, options, cmd, errexit_okay=False):
  """Run the git sub-command cmd (an argv list) in every sub-repo, then in repo.

  Raises Error when cmd is empty.  A banner naming the repo is printed before
  each run, with a blank line between repos.
  """
  if not cmd:
    raise Error('BUG: missing git\'s subcmd')
  # Populate the sub-repos: nested repos are only registered as a side effect
  # of walking the tree with iter_files(), and walk_repos() relies on that
  # registry.
  for pf in repo.iter_files(skip_alts=options.skip_alts):
    pass
  for index, r in enumerate(repo.walk_repos()):
    if index:
      print()
    print('{0}: {1}'.format(color('In repo', 'cyan,bright'), color(r.basedir, 'cyan')))
    r.git(*cmd, errexit_okay=errexit_okay)
@command_register
def status(*args, repo, options, **kwargs):
  'show status of repo and sub-repos'
  # git status everywhere first, then the list of files still to encrypt.
  _git_in_each(repo, options, ['status', '--no-short'])
  banner = color('\nFiles which need to be encrypted:', 'cyan,bright')
  return AllCommands['need'](*args, show_banner=banner, **kwargs)
@command_register
def commit(*args, repo, options, **kwargs):
  'git commit each repo and sub-repo'
  # Extra command-line words are passed through to git; a failing commit in
  # one repo is reported without stopping the others.
  git_args = ['commit'] + options.cmdline
  _git_in_each(repo, options, git_args, errexit_okay=True)
@command_register
def push(*args, repo, options, **kwargs):
  'git push each repo and sub-repo'
  # Pushes to the remote named 'all', passing extra command-line words through.
  push_args = ['push', 'all'] + options.cmdline
  _git_in_each(repo, options, push_args)
  # The push/all hack leaves references for the others stale
  _git_in_each(repo, options, ['fetch', '--all'])
@command_register
def need(*args, repo, options, show_banner='', **kwargs):
  'show files which need to be encrypted'
  # show_banner, when non-empty, is printed once, just before the first file.
  verbose('Base: {0}'.format(repo.basedir))
  banner_pending = bool(show_banner)
  for pf in repo.iter_files(skip_alts=options.skip_alts):
    if not pf.need_encrypt():
      continue
    if banner_pending:
      print(show_banner)
      banner_pending = False
    print(pf)
  return 0
@command_register
def find(*args, repo, options, **kwargs):
  'show files containing search key as substring'
  # Case-insensitive; a file is printed once if ANY search key matches its
  # repo-relative path.
  needles = {word.lower() for word in options.cmdline}
  for pf in repo.iter_files(skip_alts=options.skip_alts):
    haystack = pf.relpath.lower()
    if any(needle in haystack for needle in needles):
      print(pf)
  return 0
@command_register
def recent(*args, repo, options, **kwargs):
  'list N most recent files'
  # N is the first command-line word and defaults to 10; a word which is not
  # an integer is ignored.  Output is newest first, with local-time mtimes.
  limit = 10
  if options.cmdline:
    try:
      limit = int(options.cmdline[0])
    except ValueError:
      pass
  time_based = sorted(
      ((pf.stat().st_mtime, str(pf)) for pf in repo.iter_files(skip_alts=options.skip_alts)),
      reverse=True)[:limit]
  # Pad names to the longest one shown (at least one column).
  namewidth = max((len(name) for (_, name) in time_based), default=1)
  format_ = '{name:<{namewidth}s} : {dttime:%Y-%m-%d %H:%M:%S}'
  for (mtime, name) in time_based:
    dttime = datetime.datetime.fromtimestamp(mtime)
    print(format_.format(name=name, dttime=dttime, namewidth=namewidth))
@command_register
def otp(*args, repo, options, **kwargs):
  'perform OTP commands (HOTP generations)'
  # We wrap the OTP collection, with named sub-commands (should redo this using argparse parsers on subparsers?)
  # A sub-command exists when OTPCollection has a matching cmd_<word> method.
  #   No command => 'list'
  #   Unknown command which is the name of a token => implicit 'with'
  if not options.cmdline:
    options.cmdline = ['list']
  first_word = options.cmdline[0]
  otp_collection = OTPCollection(repo)
  handler = getattr(otp_collection, 'cmd_' + first_word.lower(), None)
  if handler is None and otp_collection.has_named(first_word):
    options.cmdline = ['with'] + options.cmdline
    handler = otp_collection.cmd_with
  if handler is None:
    raise ExitError('unknown OTP command {!r}'.format(options.cmdline[0],))
  return handler(options.cmdline[1:], *args, options=options, **kwargs)
def _each_file(args, repo, options, kwargs):
  """Yield (plaintext stream, filename) for every spec in options.cmdline.

  Specs which cannot be resolved are reported on stderr, together with the
  candidates when the match was ambiguous, and skipped.  After all specs have
  been handled, ExitError is raised if any of them failed.  An empty cmdline
  raises ExitError immediately.
  """
  if not options.cmdline:
    raise ExitError('need something to show')
  failures = 0
  for spec in options.cmdline:
    try:
      reader, name = repo.decrypt_reader(find_spec=spec, options=options)
      yield reader, name
    except FindError as e:
      print(e, file=sys.stderr)
      for candidate in getattr(e, 'candidates', ()):
        print('  -> {}'.format(candidate), file=sys.stderr)
      print('', file=sys.stderr, flush=True)
      failures += 1
  if failures:
    raise ExitError('failed: {}'.format(failures))
@command_register
def show(*args, repo, options, **kwargs):
  'decrypt matching files and print their contents'
  # The file name goes to stderr and the plaintext to stdout, followed by one
  # extra newline.
  for out, fn in _each_file(args, repo, options, kwargs):
    print('File: {}'.format(fn), file=sys.stderr)
    # Close the reader even if writing to stdout fails part-way.
    with out:
      for chunk in iter(lambda: out.read(8192), ''):
        print(chunk, end='')
    print('', flush=True)
@command_register
def totp(*args, repo, options, **kwargs):
  'print current TOTP code for matching files'
  # For every matching file, the decrypted text is scanned for the secret in
  # one of three forms, in decreasing priority:
  #   1. a line holding  TOTP2FA='<secret>'  (scanning stops at the first one);
  #   2. the first line mentioning 'otpauth:', secret taken from its
  #      'secret=' query parameter;
  #   3. the line following one which matches  TOTP ... secret:
  # The current code goes to stdout; everything else goes to stderr.
  # Returns 1 if any file yielded no usable token, else 0.
  if _OATHTOOL_COMMAND is None:
    raise ExitError('no oathtool command known')
  errored = False
  re_totp2fa = re.compile(r'''^\s*TOTP2FA=['"]([^'"]+)['"]''')
  re_url = re.compile(r'[?&]secret=([^&\s]+)')
  re_nextline = re.compile(r'\bTOTP\b\s+.*\bsecret:\s*$')
  for out, fn in _each_file(args, repo, options, kwargs):
    totp2fa = None
    otpauth_url = None
    totp_whole = None
    totp_whole_next = False
    # next two currently constant, could extract from url?
    digits = 6
    step_duration = 30
    token = None
    m = None
    with out:
      for line in out:
        if totp_whole_next:
          totp_whole = line
          totp_whole_next = False
          continue
        if 'TOTP2FA=' in line:
          totp2fa = line
          break
        if 'otpauth:' in line and otpauth_url is None:
          otpauth_url = line
        if re_nextline.search(line):
          totp_whole_next = True
    if totp2fa is not None:
      m = re_totp2fa.match(totp2fa)
    elif otpauth_url is not None:
      m = re_url.search(otpauth_url)
    elif totp_whole is not None:
      token = totp_whole.strip()
    else:
      print('No token found in {}'.format(fn), file=sys.stderr)
      errored = True
      continue
    if token is None:
      if not m:
        print('Failed to parse token line found in {}'.format(fn), file=sys.stderr)
        errored = True
        continue
      token = m.group(1)
    cmd = [_OATHTOOL_COMMAND, '--totp', '--base32',
           '--digits={0:d}'.format(digits),
           '--time-step-size={0:d}s'.format(step_duration)]
    if _VERBOSE >= 7:
      # WARNING: This will expose the secret to stdio!
      cmd.append('-v')
    # NOTE(review): the secret is handed to oathtool as a command-line
    # argument, so it may be visible in the process list while oathtool runs.
    cmd += ['--', token]
    subprocess.check_call(cmd)
    remaining = step_duration - (int(time.time()) % step_duration)
    print('TOTP time remaining: {} seconds'.format(remaining), file=sys.stderr)
    if remaining <= options.totp_timethreshold_shownext:
      # Close to the step boundary: also show the value for the next step.
      t = int(time.time()) + remaining + 1
      reftime = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(t))
      cmd = [_OATHTOOL_COMMAND, '--now=' + reftime] + cmd[1:]
      print('Next TOTP value [at time: {}]'.format(reftime), file=sys.stderr)
      subprocess.check_call(cmd, stdout=sys.stderr) # only the _current_ value to stdout, for $() capture
  return 1 if errored else 0
@command_register
def info(*args, repo, options, **kwargs):
  'report general information'
  # Shows the program name, verbosity and the external commands in use.
  template = 'Program {PROGNAME}, verbose {_VERBOSE}:\ngit: {_GIT_COMMAND}\ngpg: {_GPG_COMMAND}\noathtool: {_OATHTOOL_COMMAND}'
  print(template.format(
      PROGNAME=PROGNAME,
      _VERBOSE=_VERBOSE,
      _GIT_COMMAND=_GIT_COMMAND,
      _GPG_COMMAND=_GPG_COMMAND,
      _OATHTOOL_COMMAND=_OATHTOOL_COMMAND))
# Top-level program sub-commands }}}
def _main(args, argv0): # {{{
  """Parse args, set up the module globals and run the chosen sub-command.

  args is the argument list without the program name; argv0 is the name used
  in messages.  Returns the sub-command's return value, or 0 after printing
  the help when no sub-command was given.  Raises ExitError when no gpg
  command can be found, or when an explicitly requested oathtool is missing.
  """
  global PROGNAME, _VERBOSE, _GIT_COMMAND, _GPG_COMMAND, _OATHTOOL_COMMAND
  global _NOT_REALLY, _DELETE_ALWAYS

  parser = argparse.ArgumentParser()
  parser.add_argument('--git',
                      type=str, default='git',
                      help='Override program for git')
  parser.add_argument('--gpg',
                      type=str, default='',
                      help='Override program for gpg')
  parser.add_argument('--oathtool',
                      type=str, default='',
                      help='Override program for oathtool')
  parser.add_argument('--delete-always',
                      action='store_true', default=False,
                      help='Do not prompt before deleting unencrypted')
  parser.add_argument('--totp-timethreshold-shownext',
                      type=int, default=_DEFAULT_TOTP_TIMETHRESHOLD_SHOWNEXT,
                      help='How long before time barrier we\'ll also show next TOTP [%(default)s]')
  parser.add_argument('-s', '--skip-alts',
                      action='store_true', default=False,
                      help='Skip alternative sub-dirs')
  parser.add_argument('-n', '--not-really',
                      action='store_true', default=False,
                      help='Do not encrypt or mutate git')
  parser.add_argument('-v', '--verbose',
                      action='count', default=0,
                      help='Be more verbose')
  AllCommands.set_subparser(parser.add_subparsers())
  options = parser.parse_args(args=args)
  if not hasattr(options, 'action'):
    parser.print_help()
    return 0

  PROGNAME = argv0
  _VERBOSE = options.verbose
  _GIT_COMMAND = options.git
  _GPG_COMMAND = find_best_command('gpg', explicit=options.gpg,
                                   candidates=_DESIRED_GPG_COMMANDS, prefer_paths=_DESIRED_GPG_PREFER_PATHS)
  try:
    _OATHTOOL_COMMAND = find_best_command('oathtool', explicit=options.oathtool,
                                          candidates=['oathtool',])
  except ExitError:
    # oathtool is only needed by the OTP commands, which check for None
    # themselves; only an explicitly requested but missing tool is fatal.
    if options.oathtool:
      raise
    _OATHTOOL_COMMAND = None
  _NOT_REALLY = options.not_really
  _DELETE_ALWAYS = options.delete_always

  os.umask(0o077)

  if not have_gpg_agent():
    # Give the user a few seconds to notice the warning (and interrupt).
    print('WARNING: missing a gpg-agent .', end='', flush=True)
    SLEEP = 3
    for i in range(SLEEP):
      time.sleep(1)
      if i < (SLEEP - 1):
        print('.', end='', flush=True)
      else:
        print(flush=True)

  repo = PasswordsRepo()
  AllCommands.repo = repo
  AllCommands.options = options
  return options.action()
# }}}
if __name__ == '__main__':
  # Program name for messages: whatever follows the last '/' in argv[0].
  argv0 = sys.argv[0].rpartition('/')[2]
  try:
    sys.exit(_main(sys.argv[1:], argv0=argv0))
  except ExitError as e:
    # ExitError means "report and exit"; other exceptions keep their traceback.
    print('{0}: {1}'.format(argv0, e), file=sys.stderr)
    sys.exit(1)
# vim: set ft=python sw=2 expandtab foldmethod=marker :
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.