Skip to content

Instantly share code, notes, and snippets.

@Cimbali
Created April 9, 2020 15:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Cimbali/862a430a0f28ffe07f8ae618e8b73973 to your computer and use it in GitHub Desktop.
Save Cimbali/862a430a0f28ffe07f8ae618e8b73973 to your computer and use it in GitHub Desktop.
A python pinentry wrapper to query your keyring for GPG passphrases, inspired from kwalletcli.
#!/usr/bin/python3
#
# Copyright © 2020 Cimbali <me@cimba.li>
# Iniital concept from mirabilos <m@mirbsd.org> at https://www.mirbsd.org/kwalletcli.htm
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''This script wraps a pinentry executable to ask your keyring about the password before prompting you.
All further interactions are forwarded to the pinentry process running in the back.
'''
import os
import sys
import termios
import traceback
import subprocess
import keyring
# configured by command line
pinentry_bin = os.environ.get('PINENTRY', 'pinentry') # which backend pinentry to use
fulltty = True # whether to prompt unknown passwords via TTY rather than backend pinentry
# configured by pinentry interactions
params = {} # prompt, desc, error, keyinfo
options = {} # ttyname, etc.
# reset environment
os.environ.pop('LANGUAGE', None)
for env in ['ALL', 'NUMERIC ', 'TIME ', 'COLLATE ', 'MONETARY ', 'MESSAGES ', 'PAPER ', 'NAME ', 'ADDRESS ', 'TELEPHONE ', 'MEASUREMENT ', 'IDENTIFICATION']:
os.environ.pop(f'LC_{env}', None)
os.environ['LANG']='C'
os.environ['LC_CTYPE']='en_US.UTF-8'
# some utilities
logfiles = {
'error': os.path.join(os.environ.get('GPGHOME', os.path.expanduser('~/.gnupg')), 'pinentry-keyring.log'),
'debug': os.devnull
}
def log(dest, *args, **kwargs):
print(*args, **kwargs, file = logfiles[dest], flush=True)
class Coprocess:
''' Handles the backend pinentry process
'''
def __init__(self, args):
os.environ['PINENTRY_KEYRING'] = 'set'
os.environ.pop('PINENTRY_BINARY', None)
try:
self.process = subprocess.Popen([pinentry_bin, *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0, universal_newlines=True)
message, *greeting = self.get_line()
except:
message = None
finally:
if message != 'OK' and self.process:
self.process.kill()
self.process = None
def __del__(self):
if self.process:
self.process.kill()
self.process = None
def get_line(self):
pair = self.process.stdout.readline().rstrip('\n').split(None, 1)
log('debug', 'pinentry > ' + ' '.join(pair))
return pair if len(pair) == 2 else (pair[0], '')
def send(self, data):
if not self.process:
return ['ERR 14 no coproc']
log('debug', 'pinentry < ' + data)
print(data, file=self.process.stdin, flush=True)
word, rem = self.get_line()
reply = [(word, rem)]
while word not in {'ERR', 'OK'}:
word, rem = self.get_line()
reply.append((word, rem))
return reply
def get_pw(self, req):
reply = self.send('CONFIRM' if req == 'bool' else 'GETPIN')
pw = None
if reply[0][0] == 'D' and reply[1][0] == 'OK':
pw = reply[0][1]
return (pw, [' '.join(l) for l in reply])
class NoShowInput(object):
''' Context Manager that sets a TTY’s attributes (lfalgs) to not echo, and restores afterwards
'''
def __init__(self, fd):
self.fd = fd
self.attr = termios.tcgetattr(self.fd)
def __enter__(self):
attr = self.attr[:]
attr[3] = attr[3] & ~termios.ECHO
termios.tcsetattr(self.fd, termios.TCSADRAIN, attr)
return self
def __exit__(self, type, value, traceback):
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.attr)
class GPGTTY(object):
''' Context Manager that allows reading and writing to the TTY specified by GPG
'''
def __enter__():
self.ttyout = open(options['ttyname'], 'w')
self.ttyin = open(options['ttyname'], 'r')
return self
def __exit__(self, type, value, traceback):
self.ttyout.close()
self.ttyin.close()
def write(*args, **kwargs):
print(*args, **kwargs, file=self.ttyout, flush=True)
def read(*args, **kwargs):
read = self.ttyin.readline()
if not read: raise EOFError
assert read.endswith('\n')
return read[:-1]
def get_pw(self, req):
if 'error' in params:
self.write('\033[91m' + params['error'] + '\033[0m')
if 'desc' in params:
self.write(params['desc'].replace('%22', '"').replace('%0A', '\n'))
self.write(params.get('prompt', options.get('default-prompt', 'PIN:')), end=' ')
try:
with NoShowInput(self.ttyin.fileno()):
passphrase = self.read()
except EOFError:
return (False, ['ERR 83886179 Operation cancelled <Pinentry>'])
else:
return (passphrase, [f'D {passphrase}', 'OK'])
finally:
self.write()
class Keyring:
''' Wraps interactions with the keyring module to get and store the password
'''
@staticmethod
def get_pw(req):
try:
pw = keyring.get_password('pinentry-keyring', params['keyinfo'])
except keyring.errors.KeyringError:
log('error', 'ERROR Unhandled exception: ' + repr(e))
traceback.print_exc(file=logfiles['debug'])
with GPGTTY() as tty:
tty.write(f'{os.path.basename(sys.argv[0])}: error looking up password in keyring: {e}', end='\n\n')
return (None, [f'ERR 128 keyring error {e}'])
else:
if pw is None:
return (None, ['ERR 128 password not in keyring'])
elif req == 'pass':
return (pw, [f'D {pw}', 'OK'])
else:
return (True, ['OK'])
@staticmethod
def save_pw(pw):
saving = False
with GPGTTY() as tty:
tty.write(f'{os.path.basename(sys.argv[0])}: {options.get("default-pwmngr", "Save pin to keyring").replace("_", "")}? [y/N] ', end='')
reply = tty.read()
saving = reply.strip() and reply.strip()[0] in {'y', 'Y'}
tty.write(f'{os.path.basename(sys.argv[0])}: {"saving..." if saving else "OK, skipping"}', end = ' ' if saving else '\n')
if not saving:
return
try:
keyring.set_password('pinentry-keyring', params['keyinfo'], pw)
except keyring.errors.KeyringError as e:
log('error', 'ERROR Unhandled exception: ' + repr(e))
traceback.print_exc(file=logfiles['debug'])
with GPGTTY() as tty:
tty.write(f'error: {e}', file=tty)
else:
with GPGTTY() as tty:
tty.write('done.', file=tty)
def getpass(proc):
''' The crux of it all: try our options to get a password or phrase.
If it’s not in the keyring, save it to the keyring.
'''
if options.get('allow-external-password-cache', False):
pw, reply = Keyring.get_pw('pass')
else:
log('debug', 'skipping keyring: external cache disallowed')
with GPGTTY() as tty:
tty.write(f'{os.path.basename(sys.argv[0])}: skipping keyring: external cache disallowed')
pw, reply = (None, ['ERR 1 no external password cache'])
if not pw:
if not proc or fulltty:
with GPGTTY() as tty:
pw, reply = tty.get_pw('pass')
else:
pw, reply = proc.get_pw('pass', proc)
if pw and options.get('allow-external-password-cache', False):
Keyring.save_pw(pw)
del pw # no hanging refs − best security python can afford?
return reply
def handle_command(action, arg, proc):
''' Handle the commands for the subset of the Assuan that interests us
'''
if action.startswith('SET'):
params[action[3:].lower()] = arg
elif action == 'GETPIN':
return getpass(proc)
elif action == 'CONFIRM':
pw, reply = Keyring.get_pw('bool')
#if not pw and (not proc or fulltty):
#with GPGTTY() as tty:
# pw, reply = tty.get_pw('bool')
if not pw:
pw, reply = proc.get_pw('bool', proc)
del pw # no hanging refs − best security python can afford?
return reply
elif action == 'BYE':
proc.send(f'{action} {arg}')
return ['OK Closing connection']
elif action == 'OPTION':
try:
opt, val = arg.split('=', 1)
except ValueError:
opt, val = arg, True
if opt == 'ttyname': os.environ['GPG_TTY'] = val
if opt == 'ttytype': os.environ['GPG_TERM'] = val
if opt == 'lc-ctype': os.environ['LC_CTYPE'] = val
if opt == 'lc-messages': os.environ['LC_MESSAGES'] = val
options[opt] = val
elif action == 'GETINFO' and arg == 'pid':
return [f'D {os.getpid()}', 'OK']
elif action == 'GETINFO' and arg == 'ttyinfo':
return [f'D {os.environ["GPG_TTY"]} {os.environ.get("GPG_TERM", "")} {os.environ["DISPLAY"]}', 'OK']
elif action == 'GETINFO' and arg == 'flavor':
return [f'D keyring', 'OK']
elif action == 'GETINFO' and arg == 'version':
if proc:
return [' '.join(l) for l in proc.send(f'{action} {arg}')]
else:
return ['D 1.1.0', 'OK'] # the version we emulate (and thus implement?)
else:
log('error', f'warning: unknown line {action} {arg}')
return ['ERR 1 unrecognised command']
# In general, just pass it on and forget about the reply
proc.send(f'{action} {arg}')
return ['OK']
def handle_args(args):
''' Parse the command line arguments and remove those not destined for pinentry
'''
iter_args = iter(args[:])
for arg in iter_args:
if arg == '--display':
os.environ['DISPLAY'] = next(iter_args)
elif arg == '--ttyname':
os.environ['GPG_TTY'] = next(iter_args)
elif arg == '--ttytype':
os.environ['GPG_TERM'] = next(iter_args)
elif arg == '--lc-type':
os.environ['LC_CTYPE'] = next(iter_args)
elif arg == '--lc-messages':
os.environ['LC_MESSAGES'] = next(iter_args)
elif arg in {'-t', '--tty', '-n', '--no-tty'}:
fulltty = 'n' not in arg
args.remove(arg)
elif arg in {'-p', '--pinentry'}:
pinentry_bin = next(iter_args)
args.remove(arg)
args.remove(pinentry_bin)
elif arg in {'-v', '--verbose'}:
logfiles['debug'] = logfiles['error']
args.remove(arg)
elif arg in {'-h', '--help'}:
print(__doc__)
print(f'Usage: {os.path.basename(sys.argv[0])} [options]')
print('-p --pinentry BINARY Use BINARY as the backend pinentry program')
print('-t --tty -n --no-tty Prefer the TTY to prompt for the password')
print(f'-v --verbose Wrote more information to {logfiles["error"]}')
print('Further options are passed directly to pinentry')
exit(0)
elif arg in {'-d', '-e', '-g', '--debug', '--enhanced', '--no-global-grab'}:
pass
elif arg in {'-W', '--parent-wid'}:
next(iter_args)
else:
print(f'Warning: unknown argument {arg}')
return args
def main(args):
''' Main loop: while we receive commands on stdin reply to them as best we can.
'''
# open log files (after argument handling)
with open(logfiles['error'], 'w') as logfiles['error'], open(logfiles['debug'], 'w') as logfiles['debug']:
# Any reasons why we should not start?
if os.environ.get('PINENTRY_KEYRING', None):
log('error', 'recursive call')
print('ERR 7 trying to call me recursively', flush=True)
exit(7)
if not os.environ.get('DISPLAY', None):
log('error', f'since DISPLAY is not set, replacing with: {pinentry_bin}')
exit(subprocess.run([pinentry_bin, args], stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr).returncode)
# Go ahead: start the pinentry process and handle the commands
proc = Coprocess(args)
print('OK ready', flush=True)
for line in (l.rstrip('\n') for l in sys.stdin):
if not line or line.startswith('#'):
continue
pair = line.split(None, 1)
action, arg = (pair if len(pair) == 2 else (pair[0], ''))
try:
reply = '\n'.join(handle_command(action, arg, proc))
print(reply, flush=True)
except BrokenPipeError:
break
except Exception as e:
log('error', 'ERROR Unhandled exception: ' + repr(e))
traceback.print_exc(file=logfiles['debug'])
try:
print('ERR 8 unhandled exception', flush=True)
except BrokenPipeError:
break
finally:
if action == 'BYE':
break
if __name__ == '__main__'or True:
main(handle_args(sys.argv[1:]))
@LeoniePhiline
Copy link

LeoniePhiline commented Apr 3, 2024

git commit GPG output:

R git commit --amend                
error: gpg failed to sign the data:
[GNUPG:] KEY_CONSIDERED [REDACTED] 2
[GNUPG:] BEGIN_SIGNING H10
[GNUPG:] PINENTRY_LAUNCHED 14836 keyring 1.2.1 /dev/pts/2 xterm-256color :0
gpg: signing failed: Bad signature
[GNUPG:] FAILURE sign [REDACTED]
gpg: signing failed: Bad signature

fatal: failed to write commit object

pinentry-keyring.log:

ERROR Unhandled exception: TypeError('GPGTTY.__enter__() takes 0 positional arguments but 1 was given')
class GPGTTY(object):
	''' Context Manager that allows reading and writing to the TTY specified by GPG
	'''
-	def __enter__():
+	def __enter__(self):

UPDATE: There's more issues.

Here a working version:

#!/usr/bin/python3
#
# Copyright © 2020	Cimbali <me@cimba.li>
# Iniital concept from mirabilos <m@mirbsd.org> at https://www.mirbsd.org/kwalletcli.htm
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''This script wraps a pinentry executable to ask your keyring about the password before prompting you.

All further interactions are forwarded to the pinentry process running in the back.
'''

import os
import sys
import termios
import traceback
import subprocess

import keyring


# configured by command line
pinentry_bin = os.environ.get('PINENTRY', 'pinentry') # which backend pinentry to use
fulltty = True # whether to prompt unknown passwords via TTY rather than backend pinentry

# configured by pinentry interactions
params = {} # prompt, desc, error, keyinfo
options = {} # ttyname, etc.

# reset environment
os.environ.pop('LANGUAGE', None)
for env in ['ALL', 'NUMERIC ', 'TIME ', 'COLLATE ', 'MONETARY ', 'MESSAGES ', 'PAPER ', 'NAME ', 'ADDRESS ', 'TELEPHONE ', 'MEASUREMENT ', 'IDENTIFICATION']:
	os.environ.pop(f'LC_{env}', None)
os.environ['LANG']='C'
os.environ['LC_CTYPE']='en_US.UTF-8'


# some utilities
logfiles = {
	'error': os.path.join(os.environ.get('GPGHOME', os.path.expanduser('~/.gnupg')), 'pinentry-keyring.log'),
	'debug': os.devnull
}

def log(dest, *args, **kwargs):
	print(*args, **kwargs, file = logfiles[dest], flush=True)



class Coprocess:
	''' Handles the backend pinentry process
	'''
	def __init__(self, args):
		os.environ['PINENTRY_KEYRING'] = 'set'
		os.environ.pop('PINENTRY_BINARY', None)

		try:
			self.process = subprocess.Popen([pinentry_bin, *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0, universal_newlines=True)
			message, *greeting = self.get_line()
		except:
			message = None
		finally:
			if message != 'OK' and self.process:
				self.process.kill()
				self.process = None

	def __del__(self):
		if self.process:
			self.process.kill()
			self.process = None

	def get_line(self):
		pair = self.process.stdout.readline().rstrip('\n').split(None, 1)
		log('debug', 'pinentry > ' + ' '.join(pair))
		return pair if len(pair) == 2 else (pair[0], '')

	def send(self, data):
		if not self.process:
			return ['ERR 14 no coproc']

		log('debug', 'pinentry < ' + data)
		print(data, file=self.process.stdin, flush=True)

		word, rem = self.get_line()
		reply = [(word, rem)]
		while word not in {'ERR', 'OK'}:
			word, rem = self.get_line()
			reply.append((word, rem))

		return reply

	def get_pw(self, req):
		reply = self.send('CONFIRM' if req == 'bool' else 'GETPIN')
		pw = None

		if reply[0][0] == 'D' and reply[1][0] == 'OK':
			pw = reply[0][1]

		return (pw, [' '.join(l) for l in reply])




class NoShowInput(object):
	''' Context Manager that sets a TTY’s attributes (lfalgs) to not echo, and restores afterwards
	'''
	def __init__(self, fd):
		self.fd = fd
		self.attr = termios.tcgetattr(self.fd)

	def __enter__(self):
		attr = self.attr[:]
		attr[3] = attr[3] & ~termios.ECHO
		termios.tcsetattr(self.fd, termios.TCSADRAIN, attr)
		return self

	def __exit__(self, type, value, traceback):
		termios.tcsetattr(self.fd, termios.TCSADRAIN, self.attr)



class GPGTTY(object):
	''' Context Manager that allows reading and writing to the TTY specified by GPG
	'''
	def __enter__(self):
		self.ttyout = open(options['ttyname'], 'w')
		self.ttyin = open(options['ttyname'], 'r')
		return self

	def __exit__(self, type, value, traceback):
		self.ttyout.close()
		self.ttyin.close()

	def write(self, *args, **kwargs):
		print(*args, **kwargs, file=self.ttyout, flush=True)

	def read(self, *args, **kwargs):
		read = self.ttyin.readline()
		if not read: raise EOFError
		assert read.endswith('\n')
		return read[:-1]

	def get_pw(self, req):
		if 'error' in params:
			self.write('\033[91m' + params['error'] + '\033[0m')

		if 'desc' in params:
			self.write(params['desc'].replace('%22', '"').replace('%0A', '\n'))

		self.write(params.get('prompt', options.get('default-prompt', 'PIN:')), end=' ')

		try:
			with NoShowInput(self.ttyin.fileno()):
				passphrase = self.read()

		except EOFError:
			return (False, ['ERR 83886179 Operation cancelled <Pinentry>'])

		else:
			return (passphrase, [f'D {passphrase}', 'OK'])

		finally:
			self.write()



class Keyring:
	''' Wraps interactions with the keyring module to get and store the password
	'''
	@staticmethod
	def get_pw(req):
		try:
			pw = keyring.get_password('pinentry-keyring',  params['keyinfo'])

		except keyring.errors.KeyringError:
			log('error', 'ERROR Unhandled exception: ' + repr(e))
			traceback.print_exc(file=logfiles['debug'])

			with GPGTTY() as tty:
				tty.write(f'{os.path.basename(sys.argv[0])}: error looking up password in keyring: {e}', end='\n\n')

			return (None, [f'ERR 128 keyring error {e}'])

		else:
			if pw is None:
				return (None, ['ERR 128 password not in keyring'])

			elif req == 'pass':
				return (pw, [f'D {pw}', 'OK'])

			else:
				return (True, ['OK'])


	@staticmethod
	def save_pw(pw):
		saving = False

		with GPGTTY() as tty:
			tty.write(f'{os.path.basename(sys.argv[0])}: {options.get("default-pwmngr", "Save pin to keyring").replace("_", "")}? [y/N] ', end='')

			reply = tty.read()
			saving = reply.strip() and reply.strip()[0] in {'y', 'Y'}

			tty.write(f'{os.path.basename(sys.argv[0])}: {"saving..." if saving else "OK, skipping"}', end = ' ' if saving else '\n')

		if not saving:
			return

		try:
			keyring.set_password('pinentry-keyring',  params['keyinfo'], pw)

		except keyring.errors.KeyringError as e:
			log('error', 'ERROR Unhandled exception: ' + repr(e))
			traceback.print_exc(file=logfiles['debug'])

			with GPGTTY() as tty:
				tty.write(f'error: {e}')

		else:
			with GPGTTY() as tty:
				tty.write('done.')



def getpass(proc):
	''' The crux of it all: try our options to get a password or phrase.

	If it’s not in the keyring, save it to the keyring.
	'''
	if options.get('allow-external-password-cache', False):
		pw, reply = Keyring.get_pw('pass')

	else:
		log('debug', 'skipping keyring: external cache disallowed')
		with GPGTTY() as tty:
			tty.write(f'{os.path.basename(sys.argv[0])}: skipping keyring: external cache disallowed')

		pw, reply = (None, ['ERR 1 no external password cache'])

	if not pw:
		if not proc or fulltty:
			with GPGTTY() as tty:
				pw, reply = tty.get_pw('pass')
		else:
			pw, reply = proc.get_pw('pass', proc)

		if pw and options.get('allow-external-password-cache', False):
			Keyring.save_pw(pw)

	del pw # no hanging refs − best security python can afford?
	return reply


def handle_command(action, arg, proc):
	''' Handle the commands for the subset of the Assuan that interests us
	'''
	if action.startswith('SET'):
		params[action[3:].lower()] = arg

	elif action == 'GETPIN':
		return getpass(proc)

	elif action == 'CONFIRM':
		pw, reply = Keyring.get_pw('bool')
		#if not pw and (not proc or fulltty):
		#with GPGTTY() as tty:
		#	pw, reply = tty.get_pw('bool')
		if not pw:
			pw, reply = proc.get_pw('bool', proc)

		del pw # no hanging refs − best security python can afford?
		return reply

	elif action == 'BYE':
		proc.send(f'{action} {arg}')
		return ['OK Closing connection']

	elif action == 'OPTION':
		try:
			opt, val = arg.split('=', 1)
		except ValueError:
			opt, val = arg, True
		if opt == 'ttyname': os.environ['GPG_TTY'] = val
		if opt == 'ttytype': os.environ['GPG_TERM'] = val
		if opt == 'lc-ctype': os.environ['LC_CTYPE'] = val
		if opt == 'lc-messages': os.environ['LC_MESSAGES'] = val
		options[opt] = val

	elif action == 'GETINFO' and arg == 'pid':
		return [f'D {os.getpid()}', 'OK']

	elif action == 'GETINFO' and arg == 'ttyinfo':
		return [f'D {os.environ["GPG_TTY"]} {os.environ.get("GPG_TERM", "")} {os.environ["DISPLAY"]}', 'OK']

	elif action == 'GETINFO' and arg == 'flavor':
		return [f'D keyring', 'OK']

	elif action == 'GETINFO' and arg == 'version':
		if proc:
			return [' '.join(l) for l in proc.send(f'{action} {arg}')]
		else:
			return ['D 1.1.0', 'OK'] # the version we emulate (and thus implement?)

	else:
		log('error', f'warning: unknown line {action} {arg}')
		return ['ERR 1 unrecognised command']

	# In general, just pass it on and forget about the reply
	proc.send(f'{action} {arg}')
	return ['OK']


def handle_args(args):
	''' Parse the command line arguments and remove those not destined for pinentry
	'''
	iter_args = iter(args[:])

	for arg in iter_args:
		if arg == '--display':
			os.environ['DISPLAY'] = next(iter_args)

		elif arg == '--ttyname':
			os.environ['GPG_TTY'] = next(iter_args)

		elif arg == '--ttytype':
			os.environ['GPG_TERM'] = next(iter_args)

		elif arg == '--lc-type':
			os.environ['LC_CTYPE'] = next(iter_args)

		elif arg == '--lc-messages':
			os.environ['LC_MESSAGES'] = next(iter_args)

		elif arg in {'-t', '--tty', '-n', '--no-tty'}:
			fulltty = 'n' not in arg
			args.remove(arg)

		elif arg in {'-p', '--pinentry'}:
			pinentry_bin = next(iter_args)
			args.remove(arg)
			args.remove(pinentry_bin)

		elif arg in {'-v', '--verbose'}:
			logfiles['debug'] = logfiles['error']
			args.remove(arg)

		elif arg in {'-h', '--help'}:
			print(__doc__)
			print(f'Usage: {os.path.basename(sys.argv[0])} [options]')
			print('-p --pinentry BINARY    Use BINARY as the backend pinentry program')
			print('-t --tty -n --no-tty    Prefer the TTY to prompt for the password')
			print(f'-v --verbose            Wrote more information to {logfiles["error"]}')
			print('Further options are passed directly to pinentry')
			exit(0)

		elif arg in {'-d', '-e', '-g', '--debug', '--enhanced', '--no-global-grab'}:
			pass

		elif arg in {'-W', '--parent-wid'}:
			next(iter_args)

		else:
			print(f'Warning: unknown argument {arg}')

	return args



def main(args):
	''' Main loop: while we receive commands on stdin reply to them as best we can.
	'''

	# open log files (after argument handling)
	with open(logfiles['error'], 'w') as logfiles['error'], open(logfiles['debug'], 'w') as logfiles['debug']:

		# Any reasons why we should not start?

		if os.environ.get('PINENTRY_KEYRING', None):
			log('error', 'recursive call')
			print('ERR 7 trying to call me recursively', flush=True)
			exit(7)

		if not os.environ.get('DISPLAY', None):
			log('error', f'since DISPLAY is not set, replacing with: {pinentry_bin}')
			exit(subprocess.run([pinentry_bin, args], stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr).returncode)


		# Go ahead: start the pinentry process and handle the commands

		proc = Coprocess(args)
		print('OK ready', flush=True)

		for line in (l.rstrip('\n') for l in sys.stdin):
			if not line or line.startswith('#'):
				continue

			pair = line.split(None, 1)
			action, arg = (pair if len(pair) == 2 else (pair[0], ''))

			try:
				reply = '\n'.join(handle_command(action, arg, proc))
				print(reply, flush=True)

			except BrokenPipeError:
				break

			except Exception as e:
				log('error', 'ERROR Unhandled exception: ' + repr(e))
				traceback.print_exc(file=logfiles['debug'])

				try:
					print('ERR 8 unhandled exception', flush=True)
				except BrokenPipeError:
					break

			finally:
				if action == 'BYE':
					break


if __name__ == '__main__'or True:
	main(handle_args(sys.argv[1:]))

And the changes from the above original:

134c134
<       def __enter__():
---
>       def __enter__(self):
143c143
<       def write(*args, **kwargs):
---
>       def write(self, *args, **kwargs):
146c146
<       def read(*args, **kwargs):
---
>       def read(self, *args, **kwargs):
227c227
<                               tty.write(f'error: {e}', file=tty)
---
>                               tty.write(f'error: {e}')
231c231
<                               tty.write('done.', file=tty)
---
>                               tty.write('done.')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment