Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A python pinentry wrapper to query your keyring for GPG passphrases, inspired from kwalletcli.
#!/usr/bin/python3
#
# Copyright © 2020 Cimbali <me@cimba.li>
# Iniital concept from mirabilos <m@mirbsd.org> at https://www.mirbsd.org/kwalletcli.htm
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''This script wraps a pinentry executable to ask your keyring about the password before prompting you.
All further interactions are forwarded to the pinentry process running in the back.
'''
import os
import sys
import termios
import traceback
import subprocess
import keyring
# configured by command line
pinentry_bin = os.environ.get('PINENTRY', 'pinentry') # which backend pinentry to use
fulltty = True # whether to prompt unknown passwords via TTY rather than backend pinentry
# configured by pinentry interactions
params = {} # prompt, desc, error, keyinfo
options = {} # ttyname, etc.
# reset environment
os.environ.pop('LANGUAGE', None)
for env in ['ALL', 'NUMERIC ', 'TIME ', 'COLLATE ', 'MONETARY ', 'MESSAGES ', 'PAPER ', 'NAME ', 'ADDRESS ', 'TELEPHONE ', 'MEASUREMENT ', 'IDENTIFICATION']:
os.environ.pop(f'LC_{env}', None)
os.environ['LANG']='C'
os.environ['LC_CTYPE']='en_US.UTF-8'
# some utilities
logfiles = {
'error': os.path.join(os.environ.get('GPGHOME', os.path.expanduser('~/.gnupg')), 'pinentry-keyring.log'),
'debug': os.devnull
}
def log(dest, *args, **kwargs):
print(*args, **kwargs, file = logfiles[dest], flush=True)
class Coprocess:
''' Handles the backend pinentry process
'''
def __init__(self, args):
os.environ['PINENTRY_KEYRING'] = 'set'
os.environ.pop('PINENTRY_BINARY', None)
try:
self.process = subprocess.Popen([pinentry_bin, *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0, universal_newlines=True)
message, *greeting = self.get_line()
except:
message = None
finally:
if message != 'OK' and self.process:
self.process.kill()
self.process = None
def __del__(self):
if self.process:
self.process.kill()
self.process = None
def get_line(self):
pair = self.process.stdout.readline().rstrip('\n').split(None, 1)
log('debug', 'pinentry > ' + ' '.join(pair))
return pair if len(pair) == 2 else (pair[0], '')
def send(self, data):
if not self.process:
return ['ERR 14 no coproc']
log('debug', 'pinentry < ' + data)
print(data, file=self.process.stdin, flush=True)
word, rem = self.get_line()
reply = [(word, rem)]
while word not in {'ERR', 'OK'}:
word, rem = self.get_line()
reply.append((word, rem))
return reply
def get_pw(self, req):
reply = self.send('CONFIRM' if req == 'bool' else 'GETPIN')
pw = None
if reply[0][0] == 'D' and reply[1][0] == 'OK':
pw = reply[0][1]
return (pw, [' '.join(l) for l in reply])
class NoShowInput(object):
''' Context Manager that sets a TTY’s attributes (lfalgs) to not echo, and restores afterwards
'''
def __init__(self, fd):
self.fd = fd
self.attr = termios.tcgetattr(self.fd)
def __enter__(self):
attr = self.attr[:]
attr[3] = attr[3] & ~termios.ECHO
termios.tcsetattr(self.fd, termios.TCSADRAIN, attr)
return self
def __exit__(self, type, value, traceback):
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.attr)
class GPGTTY(object):
''' Context Manager that allows reading and writing to the TTY specified by GPG
'''
def __enter__():
self.ttyout = open(options['ttyname'], 'w')
self.ttyin = open(options['ttyname'], 'r')
return self
def __exit__(self, type, value, traceback):
self.ttyout.close()
self.ttyin.close()
def write(*args, **kwargs):
print(*args, **kwargs, file=self.ttyout, flush=True)
def read(*args, **kwargs):
read = self.ttyin.readline()
if not read: raise EOFError
assert read.endswith('\n')
return read[:-1]
def get_pw(self, req):
if 'error' in params:
self.write('\033[91m' + params['error'] + '\033[0m')
if 'desc' in params:
self.write(params['desc'].replace('%22', '"').replace('%0A', '\n'))
self.write(params.get('prompt', options.get('default-prompt', 'PIN:')), end=' ')
try:
with NoShowInput(self.ttyin.fileno()):
passphrase = self.read()
except EOFError:
return (False, ['ERR 83886179 Operation cancelled <Pinentry>'])
else:
return (passphrase, [f'D {passphrase}', 'OK'])
finally:
self.write()
class Keyring:
''' Wraps interactions with the keyring module to get and store the password
'''
@staticmethod
def get_pw(req):
try:
pw = keyring.get_password('pinentry-keyring', params['keyinfo'])
except keyring.errors.KeyringError:
log('error', 'ERROR Unhandled exception: ' + repr(e))
traceback.print_exc(file=logfiles['debug'])
with GPGTTY() as tty:
tty.write(f'{os.path.basename(sys.argv[0])}: error looking up password in keyring: {e}', end='\n\n')
return (None, [f'ERR 128 keyring error {e}'])
else:
if pw is None:
return (None, ['ERR 128 password not in keyring'])
elif req == 'pass':
return (pw, [f'D {pw}', 'OK'])
else:
return (True, ['OK'])
@staticmethod
def save_pw(pw):
saving = False
with GPGTTY() as tty:
tty.write(f'{os.path.basename(sys.argv[0])}: {options.get("default-pwmngr", "Save pin to keyring").replace("_", "")}? [y/N] ', end='')
reply = tty.read()
saving = reply.strip() and reply.strip()[0] in {'y', 'Y'}
tty.write(f'{os.path.basename(sys.argv[0])}: {"saving..." if saving else "OK, skipping"}', end = ' ' if saving else '\n')
if not saving:
return
try:
keyring.set_password('pinentry-keyring', params['keyinfo'], pw)
except keyring.errors.KeyringError as e:
log('error', 'ERROR Unhandled exception: ' + repr(e))
traceback.print_exc(file=logfiles['debug'])
with GPGTTY() as tty:
tty.write(f'error: {e}', file=tty)
else:
with GPGTTY() as tty:
tty.write('done.', file=tty)
def getpass(proc):
''' The crux of it all: try our options to get a password or phrase.
If it’s not in the keyring, save it to the keyring.
'''
if options.get('allow-external-password-cache', False):
pw, reply = Keyring.get_pw('pass')
else:
log('debug', 'skipping keyring: external cache disallowed')
with GPGTTY() as tty:
tty.write(f'{os.path.basename(sys.argv[0])}: skipping keyring: external cache disallowed')
pw, reply = (None, ['ERR 1 no external password cache'])
if not pw:
if not proc or fulltty:
with GPGTTY() as tty:
pw, reply = tty.get_pw('pass')
else:
pw, reply = proc.get_pw('pass', proc)
if pw and options.get('allow-external-password-cache', False):
Keyring.save_pw(pw)
del pw # no hanging refs − best security python can afford?
return reply
def handle_command(action, arg, proc):
''' Handle the commands for the subset of the Assuan that interests us
'''
if action.startswith('SET'):
params[action[3:].lower()] = arg
elif action == 'GETPIN':
return getpass(proc)
elif action == 'CONFIRM':
pw, reply = Keyring.get_pw('bool')
#if not pw and (not proc or fulltty):
#with GPGTTY() as tty:
# pw, reply = tty.get_pw('bool')
if not pw:
pw, reply = proc.get_pw('bool', proc)
del pw # no hanging refs − best security python can afford?
return reply
elif action == 'BYE':
proc.send(f'{action} {arg}')
return ['OK Closing connection']
elif action == 'OPTION':
try:
opt, val = arg.split('=', 1)
except ValueError:
opt, val = arg, True
if opt == 'ttyname': os.environ['GPG_TTY'] = val
if opt == 'ttytype': os.environ['GPG_TERM'] = val
if opt == 'lc-ctype': os.environ['LC_CTYPE'] = val
if opt == 'lc-messages': os.environ['LC_MESSAGES'] = val
options[opt] = val
elif action == 'GETINFO' and arg == 'pid':
return [f'D {os.getpid()}', 'OK']
elif action == 'GETINFO' and arg == 'ttyinfo':
return [f'D {os.environ["GPG_TTY"]} {os.environ.get("GPG_TERM", "")} {os.environ["DISPLAY"]}', 'OK']
elif action == 'GETINFO' and arg == 'flavor':
return [f'D keyring', 'OK']
elif action == 'GETINFO' and arg == 'version':
if proc:
return [' '.join(l) for l in proc.send(f'{action} {arg}')]
else:
return ['D 1.1.0', 'OK'] # the version we emulate (and thus implement?)
else:
log('error', f'warning: unknown line {action} {arg}')
return ['ERR 1 unrecognised command']
# In general, just pass it on and forget about the reply
proc.send(f'{action} {arg}')
return ['OK']
def handle_args(args):
''' Parse the command line arguments and remove those not destined for pinentry
'''
iter_args = iter(args[:])
for arg in iter_args:
if arg == '--display':
os.environ['DISPLAY'] = next(iter_args)
elif arg == '--ttyname':
os.environ['GPG_TTY'] = next(iter_args)
elif arg == '--ttytype':
os.environ['GPG_TERM'] = next(iter_args)
elif arg == '--lc-type':
os.environ['LC_CTYPE'] = next(iter_args)
elif arg == '--lc-messages':
os.environ['LC_MESSAGES'] = next(iter_args)
elif arg in {'-t', '--tty', '-n', '--no-tty'}:
fulltty = 'n' not in arg
args.remove(arg)
elif arg in {'-p', '--pinentry'}:
pinentry_bin = next(iter_args)
args.remove(arg)
args.remove(pinentry_bin)
elif arg in {'-v', '--verbose'}:
logfiles['debug'] = logfiles['error']
args.remove(arg)
elif arg in {'-h', '--help'}:
print(__doc__)
print(f'Usage: {os.path.basename(sys.argv[0])} [options]')
print('-p --pinentry BINARY Use BINARY as the backend pinentry program')
print('-t --tty -n --no-tty Prefer the TTY to prompt for the password')
print(f'-v --verbose Wrote more information to {logfiles["error"]}')
print('Further options are passed directly to pinentry')
exit(0)
elif arg in {'-d', '-e', '-g', '--debug', '--enhanced', '--no-global-grab'}:
pass
elif arg in {'-W', '--parent-wid'}:
next(iter_args)
else:
print(f'Warning: unknown argument {arg}')
return args
def main(args):
''' Main loop: while we receive commands on stdin reply to them as best we can.
'''
# open log files (after argument handling)
with open(logfiles['error'], 'w') as logfiles['error'], open(logfiles['debug'], 'w') as logfiles['debug']:
# Any reasons why we should not start?
if os.environ.get('PINENTRY_KEYRING', None):
log('error', 'recursive call')
print('ERR 7 trying to call me recursively', flush=True)
exit(7)
if not os.environ.get('DISPLAY', None):
log('error', f'since DISPLAY is not set, replacing with: {pinentry_bin}')
exit(subprocess.run([pinentry_bin, args], stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr).returncode)
# Go ahead: start the pinentry process and handle the commands
proc = Coprocess(args)
print('OK ready', flush=True)
for line in (l.rstrip('\n') for l in sys.stdin):
if not line or line.startswith('#'):
continue
pair = line.split(None, 1)
action, arg = (pair if len(pair) == 2 else (pair[0], ''))
try:
reply = '\n'.join(handle_command(action, arg, proc))
print(reply, flush=True)
except BrokenPipeError:
break
except Exception as e:
log('error', 'ERROR Unhandled exception: ' + repr(e))
traceback.print_exc(file=logfiles['debug'])
try:
print('ERR 8 unhandled exception', flush=True)
except BrokenPipeError:
break
finally:
if action == 'BYE':
break
if __name__ == '__main__'or True:
main(handle_args(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment