Skip to content

Instantly share code, notes, and snippets.

@aeris
Created September 29, 2017 22:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save aeris/7e226a2d26bd02d07bfab9762265d760 to your computer and use it in GitHub Desktop.
Save aeris/7e226a2d26bd02d07bfab9762265d760 to your computer and use it in GitHub Desktop.
pinentry proxy between kwallet and pinentry-qt
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# AGPLv3 license
import dbus, logging, os, re, subprocess, sys, threading
# Module-wide logger used by every class in this script.
LOGGER = logging.getLogger("kwallet")

# Optional file handler for debugging.  delay=True postpones opening the
# file until the first record is emitted, so nothing is created in /tmp
# while the handler stays detached (the default, see below).
hdlr = logging.FileHandler("/tmp/pinentry-kwallet.log", delay=True)
formatter = logging.Formatter("%(levelname)s %(message)s")
hdlr.setFormatter(formatter)

# Debugging aid, disabled by default: uncomment to write the trace to the
# file above.  WARNING: the proxy logs every protocol line at DEBUG level,
# which includes the "D <passphrase>" answers.
#LOGGER.addHandler(hdlr)
LOGGER.setLevel(logging.DEBUG)
class KWallet:
    """Password store backed by the local KDE wallet, reached over D-Bus.

    All entries live in a wallet folder named after the application ID.
    Usable as a context manager: leaving the ``with`` block disconnects
    the application from the wallet.
    """

    __APP_ID = "pinentry-kwallet"
    __APP_FOLDER = __APP_ID

    def __init__(self):
        """Open the local wallet and make sure the application folder exists."""
        LOGGER.info("Open kwallet")
        proxy = dbus.SessionBus().get_object(
            "org.kde.kwalletd5", "/modules/kwalletd5"
        )
        self.__kwallet = dbus.Interface(proxy, dbus_interface="org.kde.KWallet")
        self.__wallet = proxy.localWallet()
        # Second argument is the window ID passed to kwalletd (0 here).
        self.__handle = proxy.open(self.__wallet, dbus.Int64(0), self.__APP_ID)
        self.__init()

    def __init(self):
        """Create the application folder in the wallet if it is missing."""
        folder, app_id = self.__APP_FOLDER, self.__APP_ID
        if self.__kwallet.hasFolder(self.__handle, folder, app_id):
            return
        LOGGER.info("Create folder %s", folder)
        self.__kwallet.createFolder(self.__handle, folder, app_id)

    def read(self, name):
        """Return the password stored under *name*, or None if there is no entry."""
        LOGGER.info("Read password %s", name)
        lookup = (self.__handle, self.__APP_FOLDER, name, self.__APP_ID)
        if self.__kwallet.hasEntry(*lookup):
            return self.__kwallet.readPassword(*lookup)
        return None

    def write(self, name, password):
        """Store *password* under *name* in the application folder."""
        LOGGER.info("Write password %s", name)
        self.__kwallet.writePassword(
            self.__handle, self.__APP_FOLDER, name, password, self.__APP_ID
        )

    def __enter__(self):
        return self

    def __exit__(self, *_):
        """Disconnect this application from the wallet."""
        LOGGER.info("Close kwallet")
        self.__kwallet.disconnectApplication(self.__wallet, self.__APP_ID)
class Response:
    """One-shot hand-off of a single value from one thread to another."""

    def __init__(self):
        self.__cv = threading.Condition()
        self.__response = None
        # Remembers that set() already ran, so a get() arriving later does
        # not wait for a notification that has already been sent.
        self.__ready = False

    def set(self, value):
        """Store *value* and wake up any thread blocked in get()."""
        with self.__cv:
            self.__response = value
            self.__ready = True
            self.__cv.notify_all()

    def get(self):
        """Block until set() has been called, then return the stored value."""
        with self.__cv:
            self.__cv.wait_for(lambda: self.__ready)
            return self.__response


class PinentryProxy(threading.Thread):
    """Proxy between the pinentry caller (stdin/stdout) and ``pinentry-qt``.

    process() runs in the calling thread and relays commands read from
    stdin to the real pinentry; run() is the thread body and relays the
    pinentry's output to stdout.  A ``GETPIN`` for a key ID recognised in
    the last ``SETDESC`` is answered from the wallet when a password is
    stored there; otherwise the real pinentry is asked and its ``D`` answer
    is saved in the wallet before being passed on.
    """

    __PINENTRY_CLIENT = "pinentry-qt"
    __REGEX_ID = r"(?:0x)?(?P<id>[0-9A-F]+)"
    # Wordings of the key ID inside a SETDESC text (English and French).
    __REGEX_KEYID = [
        re.compile(r"\(main key ID %s\)" % (__REGEX_ID)),
        re.compile(r"ID %s," % (__REGEX_ID)),
        re.compile(r"\(identifiant de clef principale %s\)" % (__REGEX_ID)),
        re.compile(r"identifiant %s," % (__REGEX_ID)),
    ]
    __REGEX_PASSWORD = re.compile(r"^D (?P<password>.*)$")

    def __init__(self, wallet):
        """Spawn the real pinentry, forwarding this script's own arguments.

        *wallet* must provide ``read(name)`` and ``write(name, password)``.
        """
        super().__init__()
        self.__intercept = None
        self.__wallet = wallet
        self.__id = None
        cmd = [self.__PINENTRY_CLIENT]
        cmd.extend(sys.argv[1:])
        self.__process = subprocess.Popen(
            cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE
        )

    def __enter__(self):
        return self

    def __exit__(self, *_):
        """Kill the real pinentry and wait for the reader thread to finish."""
        LOGGER.info("Stop pinentry")
        self.__process.kill()
        self.__process.wait()
        LOGGER.info("Stop proxy")
        self.join()

    def run(self):
        """Relay pinentry output to stdout until the pinentry closes its pipe.

        When an interception is pending, the next line is handed to it
        instead of being written to stdout.
        """
        while True:
            LOGGER.info("Waiting output...")
            line = self.__process.stdout.readline()
            # Consume a pending interception exactly once, so that the lines
            # following the intercepted one (e.g. the trailing OK) reach the
            # client again.
            pending, self.__intercept = self.__intercept, None
            if line == b"":
                # Pinentry is gone: release a waiting __get_pin() with an
                # empty answer rather than leaving it blocked forever.
                if pending is not None:
                    pending.set("")
                return
            line = line.decode()
            LOGGER.debug("<= %s", line.strip())
            if pending is not None:
                pending.set(line)
            else:
                self.__send([line])

    def __write(self, line):
        """Send one protocol line to the real pinentry."""
        LOGGER.debug("=> %s", line.strip())
        self.__process.stdin.write(line.encode())
        self.__process.stdin.flush()

    def __extract_key_id(self, desc):
        """Return the key ID found in *desc*, or None when no wording matches."""
        for regex in self.__REGEX_KEYID:
            match = regex.search(desc)
            if match:
                return match.group("id")
        return None

    def __get_pin(self, line):
        """Answer a GETPIN for the current key ID.

        Uses the wallet when it holds a password; otherwise forwards *line*
        to the real pinentry, stores a ``D`` answer in the wallet and passes
        the answer on to the client.
        """
        password = self.__wallet.read(self.__id)
        if password:
            LOGGER.info("Password found, using wallet")
            self.__send(["D %s\n" % password, "OK\n"])
            return

        LOGGER.info("No password found, using pinentry")
        # Register the interception before asking, so the reader thread
        # cannot see the answer first and forward it unnoticed.
        pending = Response()
        self.__intercept = pending
        self.__write(line)
        response = pending.get()

        match = self.__REGEX_PASSWORD.match(response)
        if match:
            LOGGER.info("Password found, save in wallet")
            self.__wallet.write(self.__id, match.group("password"))
        # The client is still waiting for this answer: pass it on.
        self.__send([response])

    def __command(self, line):
        """Handle one command line received from the client."""
        if line.startswith("SETDESC"):
            self.__id = self.__extract_key_id(line)
            LOGGER.info("Key ID detected: %s", self.__id)
        if line.startswith("GETPIN") and self.__id:
            self.__get_pin(line)
            return
        self.__write(line)

    def __receive(self):
        """Read the next command line from stdin ('' at end of input)."""
        LOGGER.debug("Waiting command...")
        cmd = sys.stdin.readline()
        LOGGER.debug("-> %s", cmd.strip())
        return cmd

    def __send(self, lines):
        """Write protocol *lines* to stdout, flushing after each one."""
        for line in lines:
            LOGGER.debug("<- %s", line.strip())
            sys.stdout.write(line)
            sys.stdout.flush()

    def process(self):
        """Handle client commands until stdin reaches end of input."""
        while True:
            cmd = self.__receive()
            if not cmd:
                return
            self.__command(cmd)
# Script entry point: open the wallet, start the proxy thread that relays
# pinentry output, then serve client commands until stdin is exhausted.
try:
    with KWallet() as kwallet, PinentryProxy(kwallet) as pinentry:
        pinentry.start()
        pinentry.process()
except (KeyboardInterrupt, BrokenPipeError):
    # The client went away or the user interrupted: exit quietly.
    pass
except Exception:
    LOGGER.exception("Exception occurs...")
@wilbowma
Copy link

FYI, I had to patch this to get it to work. After the patch, it seems to work better than kwalletcli.

--- a    2020-08-28 12:48:53.658490212 -0700
+++ b    2020-08-28 12:49:45.801980631 -0700
@@ -60,6 +60,7 @@
                self.__cv.acquire()
                self.__cv.wait()
                self.__cv.release()
+               return self.__response
 
 class PinentryProxy(threading.Thread):
        __PINENTRY_CLIENT = "pinentry-qt"
@@ -136,7 +137,7 @@
                                        self.__write(line)
                                        response = self.__intercept.get()
 
-                                       match = self.__REGEX_PASSWORD.match(result)
+                                       match = self.__REGEX_PASSWORD.match(response)
                                        if match:
                                                LOGGER.info("Password found, save in wallet")
                                                password = match.group("password")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment