Skip to content

Instantly share code, notes, and snippets.

@neko-neko-nyan
Created March 18, 2023 10:51
Show Gist options
  • Save neko-neko-nyan/0ddf9563f89a6b9952a459c491069ec3 to your computer and use it in GitHub Desktop.
Save neko-neko-nyan/0ddf9563f89a6b9952a459c491069ec3 to your computer and use it in GitHub Desktop.
Simple API client / demo for KeePassXC (simulates the browser plugin) in Python
import base64
import json
import random
import secrets

import nacl.public
import win32file
def _check_nonce_length(nonce: str) -> bool:
    """Return True when *nonce* is base64 text that decodes to a Box-sized nonce.

    Args:
        nonce: Base64-encoded nonce as received from the peer.

    Returns:
        True if the decoded byte length equals ``nacl.public.Box.NONCE_SIZE``;
        False for a non-string value, undecodable base64, or any other length.
    """
    # Peer-supplied data may be missing or malformed; a validation predicate
    # should answer False for that rather than raise.
    if not isinstance(nonce, str):
        return False
    try:
        raw = base64.decodebytes(nonce.encode())
    except ValueError:  # binascii.Error and UnicodeEncodeError both derive from it
        return False
    return len(raw) == nacl.public.Box.NONCE_SIZE
def _get_nonce() -> str:
    """Return a fresh random 24-byte nonce, base64-encoded as text.

    The bytes come from the operating system's CSPRNG (``secrets``) because
    the nonce is fed to an encryption primitive; the ``random`` module is a
    predictable generator that is not meant for cryptographic use.
    """
    return base64.b64encode(secrets.token_bytes(24)).decode()
def _incremented_nonce(nonce: str) -> str:
    """Return *nonce* plus one, treating its bytes as a little-endian integer.

    Args:
        nonce: Base64-encoded nonce.

    Returns:
        Base64 text of the same byte length whose value is one greater than
        the input. The carry propagates from the first byte upward and an
        all-0xFF nonce wraps around to all zeros.
    """
    raw = base64.decodebytes(nonce.encode())
    width = len(raw)
    # Reduce modulo 2**(8 * width) so the result always fits in `width` bytes.
    # Storing an unmasked carry (256) into a bytearray slot raises ValueError,
    # which is what happened whenever the first byte was 0xFF.
    bumped = (int.from_bytes(raw, 'little') + 1) % (1 << (8 * width))
    return base64.b64encode(bumped.to_bytes(width, 'little')).decode()
def _get_nonces() -> tuple[str, str]:
    """Create a new nonce and return it together with its incremented form.

    Returns:
        A ``(nonce, incremented_nonce)`` pair of base64 strings; the second
        element is ``_incremented_nonce`` applied to the first.
    """
    request_nonce = _get_nonce()
    return request_nonce, _incremented_nonce(request_nonce)
def _is_valid_response(response: dict, nonce: str) -> bool:
    """Check that a decoded response reports success and carries the expected nonce.

    Args:
        response: Parsed response dictionary.
        nonce: The base64 nonce the response is expected to contain.

    Returns:
        True only when ``response['success']`` is the string ``'true'``, the
        response nonce equals *nonce*, and that nonce has the correct decoded
        length. Any missing or mismatching field yields False.
    """
    if response.get('success') != 'true':
        return False
    received = response.get('nonce')
    # Compare before decoding: a missing or non-string nonce must be rejected
    # here instead of being handed to the length check, which would raise.
    if not isinstance(received, str) or received != nonce:
        return False
    return _check_nonce_length(received)
class KeepassError(Exception):
    """Root of the exception hierarchy raised by this client."""
class KeepassApiError(KeepassError):
    """Error reported by the server in an ``error`` / ``errorCode`` response.

    Attributes are plain copies of the constructor arguments: ``code``,
    ``message``, ``action`` and ``request_id`` (``None`` when not supplied).
    The message is also passed to ``Exception`` so ``str(exc)`` shows it.
    """

    def __init__(self, code, message, action, request_id=None):
        super().__init__(message)
        self.code, self.message = code, message
        self.action, self.request_id = action, request_id
class InvalidResponseError(KeepassError):
    """Raised when a response is malformed or fails validation."""

    def __init__(self):
        description = "Invalid response"
        super().__init__(description)
class KeepassApi:
def __init__(self):
self._client_id = None
self.generate_client_id()
self._handle = win32file.CreateFile(
r'\\.\pipe\org.keepassxc.KeePassXC.BrowserServer_neko',
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0,
None,
win32file.OPEN_EXISTING,
0,
None
)
self._private_key = nacl.public.PrivateKey.generate()
self._server_public_key = None
self._server_version = None
def generate_client_id(self):
self._client_id = base64.b64encode(random.randbytes(24)).decode()
def _send_native_message(self, action: str, req: dict, *, enable_timeout=False, trigger_unlock=False) -> dict:
req['action'] = action
req["clientID"] = self._client_id
if trigger_unlock:
req['triggerUnlock'] = 'true'
req_str = json.dumps(req).encode()
win32file.WriteFile(self._handle, req_str)
code, resp_str = win32file.ReadFile(self._handle, 64*1024)
resp = json.loads(resp_str.decode())
if resp.get('action') != action:
raise InvalidResponseError()
return resp
def _encrypt(self, data: dict, nonce: str) -> str:
message_data = json.dumps(data).encode()
message_nonce = base64.decodebytes(nonce.encode())
if self._server_public_key is not None:
message = nacl.public.Box(self._private_key, self._server_public_key)\
.encrypt(message_data, message_nonce).ciphertext
if message:
return base64.b64encode(message).decode()
return ''
def _decrypt(self, data: str, nonce: str) -> str:
data = base64.decodebytes(data.encode())
nonce = base64.decodebytes(nonce.encode())
res = nacl.public.Box(self._private_key, self._server_public_key).decrypt(data, nonce)
return res.decode()
def send_message(self, action: str, *,
enable_timeout=False, trigger_unlock=False, **data):
data['action'] = action
nonce, incremented_nonce = _get_nonces()
request = {
"nonce": nonce,
"message": self._encrypt(data, nonce)
}
if "requestID" in data:
request['requestID'] = data['requestID']
response = self._send_native_message(action, request, enable_timeout=enable_timeout,
trigger_unlock=trigger_unlock)
return self._process_raw_message(response, incremented_nonce)
def _process_raw_message(self, response: dict, nonce: str):
if "message" not in response or "nonce" not in response:
if "error" in response and "errorCode" in response:
raise KeepassApiError(response['errorCode'], response['error'], response['action'],
response.get('requestID'))
raise InvalidResponseError()
res = self._decrypt(response["message"], response["nonce"])
if not res:
raise InvalidResponseError()
parsed = json.loads(res)
return self._process_encrypted_message(parsed, nonce)
def _process_encrypted_message(self, parsed: dict, nonce: str):
if not _is_valid_response(parsed, nonce):
self._connected = False
raise InvalidResponseError()
del parsed['success']
del parsed['nonce']
version = parsed.get('version')
if version:
del parsed['version']
self._server_version = version
self._connected = True
return parsed
def change_public_keys(self):
self._connected = False
nonce, incremented_nonce = _get_nonces()
key = base64.b64encode(bytes(self._private_key.public_key)).decode()
resp = self._send_native_message("change-public-keys", dict(nonce=nonce, publicKey=key))
server_key = self._process_encrypted_message(resp, incremented_nonce).get('publicKey')
if server_key:
self._server_public_key = nacl.public.PublicKey(base64.decodebytes(resp.get('publicKey').encode()))
class KeepassXCClient:
def __init__(self):
self.api = KeepassApi()
self._db_hash = None
self._connected = False
self._key_ring = {}
def _send_message(self, action: str, *, nonce: str | None = None,
enable_timeout=False, trigger_unlock=False, **data):
return self.api.send_message(action, nonce=nonce, enable_timeout=enable_timeout, trigger_unlock=trigger_unlock,
**data)
def get_database_hash(self, connected_keys: list[str] | None = None):
if connected_keys is None:
hash = self._send_message("get-databasehash").get('hash')
else:
response = self._send_message("get-databasehash", connectedKeys=connected_keys)
old_hash = response.get('oldHash')
hash = response.get('hash')
self._db_hash = hash
if hash == '':
return None
return hash
def associate(self, id_key: str | None = None):
key = base64.b64encode(bytes(self.api._private_key.public_key)).decode()
if id_key is None:
response = self._send_message("associate", key=key)
else:
response = self._send_message("associate", key=key, idKey=id_key)
hash = response.get('hash')
id_key = id_key or key
self._key_ring[self._db_hash] = (self._db_hash, response['id'], id_key)
def test_associate(self, id_key: str | None = None):
key = base64.b64encode(bytes(self.api._private_key.public_key)).decode()
return self._send_message("test-associate", key=key, id=id_key or key)["id", "hash"]
def get_logins(self, url: str, id: str = "", form_url: str = "", http_auth: bool = False):
# id unused, returned in response
return self._send_message("get-logins", url=url, id=id, submitUrl=form_url,
keys=[dict(id=id, key=key) for hash, id, key in self._key_ring.values()],
httpAuth="true" if http_auth else "false")['entries', "id", "hash", "count"]
# Further protocol action names. Nothing in this file references the tuple;
# the note after each name appears to describe that action's response fields
# (kept from the original author, not verified here).
actions = (
    "generate-password",    # asynchronous; carries a requestID
    "set-login",            # count=null, entries=null, error="success" or "error", hash
    "lock-database",        # empty response
    "get-database-groups",  # groups
    "create-new-group",     # name, uuid
    "get-totp",             # totp
    "delete-entry",         # success="true" or "false"
    "request-autotype",     # empty response
)
def pipe_client():
    """Demo run: exchange keys, then print the database hash, the result of
    ``associate()`` and the logins returned for a sample URL."""
    demo = KeepassXCClient()
    demo.api.change_public_keys()

    database_hash = demo.get_database_hash()
    print(database_hash)

    association = demo.associate()
    print(association)

    logins = demo.get_logins("https://vk.com/", form_url="https://vk.com/")
    print(logins)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    pipe_client()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment