Skip to content

Instantly share code, notes, and snippets.

@Terrance
Last active March 1, 2024 18:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Terrance/4d9df2d550806fb038d18956fbe3b268 to your computer and use it in GitHub Desktop.
Save Terrance/4d9df2d550806fb038d18956fbe3b268 to your computer and use it in GitHub Desktop.
Methods to decrypt and re-encrypt database backups for Conversations, an Android XMPP client.
from collections import defaultdict
from getpass import getpass
import gzip
import hashlib
import io
import json
import logging
import pathlib
import struct
import sqlite3
from typing import Any, BinaryIO, Optional
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# Module-level logger, named after this module; used for debug output below.
LOG = logging.getLogger(__name__)
# SQL DDL that decrypt() runs on the target database before loading the
# backup's contents.  It creates the accounts, conversations, messages,
# sessions, prekeys, signed_prekeys and identities tables.
# NOTE(review): presumably mirrors the Conversations app's own schema for the
# backed-up tables -- confirm against the app version that wrote the backup.
SCHEMA = """
CREATE TABLE accounts (
uuid TEXT PRIMARY KEY,
username TEXT,
server TEXT,
password TEXT,
display_name TEXT,
status TEXT,
status_message TEXT,
rosterversion TEXT,
options NUMBER,
avatar TEXT,
keys TEXT,
hostname TEXT,
resource TEXT,
port NUMBER,
pinned_mechanism TEXT,
pinned_channel_binding TEXT,
fast_mechanism TEXT,
fast_token TEXT
);
CREATE TABLE conversations (
uuid TEXT PRIMARY KEY,
name TEXT,
contactUuid TEXT,
accountUuid TEXT,
contactJid TEXT,
created NUMBER,
status NUMBER,
mode NUMBER,
attributes TEXT,
FOREIGN KEY (accountUuid) REFERENCES accounts (uuid) ON DELETE CASCADE
);
CREATE TABLE messages (
uuid TEXT PRIMARY KEY,
conversationUuid TEXT,
timeSent NUMBER,
counterpart TEXT,
trueCounterpart TEXT,
body TEXT,
encryption NUMBER,
status NUMBER,type NUMBER,
relativeFilePath TEXT,
serverMsgId TEXT,
axolotl_fingerprint TEXT,
carbon INTEGER,
edited TEXT,
read NUMBER,
oob INTEGER,
errorMsg TEXT,
readByMarkers TEXT,
markable NUMBER,
deleted NUMBER,
bodyLanguage TEXT,
remoteMsgId TEXT,
FOREIGN KEY (conversationUuid) REFERENCES conversations (uuid) ON DELETE CASCADE
);
CREATE TABLE sessions (
account TEXT,
name TEXT,
device_id INTEGER,
key TEXT,
FOREIGN KEY (account) REFERENCES accounts (uuid) ON DELETE CASCADE,
UNIQUE (account, name, device_id) ON CONFLICT REPLACE
);
CREATE TABLE prekeys (
account TEXT,
id INTEGER,
key TEXT,
FOREIGN KEY (account) REFERENCES accounts (uuid) ON DELETE CASCADE,
UNIQUE (account, id) ON CONFLICT REPLACE
);
CREATE TABLE signed_prekeys (
account TEXT,
id INTEGER,
key TEXT,
FOREIGN KEY (account) REFERENCES accounts (uuid) ON DELETE CASCADE,
UNIQUE (account, id) ON CONFLICT REPLACE
);
CREATE TABLE identities (
account TEXT,
name TEXT,
ownkey INTEGER,
fingerprint TEXT,
certificate BLOB,
trust TEXT,
active NUMBER,
last_activation NUMBER,
key TEXT,
FOREIGN KEY (account) REFERENCES accounts(uuid) ON DELETE CASCADE,
UNIQUE (account, name, fingerprint) ON CONFLICT IGNORE
);
"""
class StreamReader:
    """Read big-endian primitives and length-prefixed strings from a stream.

    Each method consumes exactly the bytes of the value it returns from the
    wrapped binary stream.
    """

    def __init__(self, stream: BinaryIO):
        self.stream = stream

    def _read(self, fmt: str):
        """Unpack and return the single value described by struct format *fmt*.

        Raises:
            struct.error: If the stream ends before the value is complete.
        """
        return struct.unpack(fmt, self.stream.read(struct.calcsize(fmt)))[0]

    def short(self) -> int:
        """Read a signed 16-bit big-endian integer."""
        return self._read(">h")

    def long(self) -> int:
        """Read a signed 64-bit big-endian integer."""
        return self._read(">q")

    def int(self) -> int:
        """Read a signed 32-bit big-endian integer."""
        return self._read(">i")

    def utf(self) -> bytes:
        """Read a string stored as a 16-bit length followed by that many bytes.

        The raw, still-encoded bytes are returned.

        Raises:
            EOFError: If the stream ends before the announced length is read.
        """
        # The length must be read as *unsigned*: a signed read turns any
        # length >= 32768 into a negative number, and read(<negative>) would
        # swallow the entire remainder of the stream.
        # NOTE(review): assumes the writer follows Java's DataOutput.writeUTF
        # layout (unsigned 16-bit length) -- confirm against the backup writer.
        size = self._read(">H")
        payload = self.stream.read(size)
        if len(payload) != size:
            raise EOFError(
                "Expected {} byte(s) of string data, got {}".format(size, len(payload))
            )
        return payload
class StreamWriter:
    """Write big-endian primitives and length-prefixed strings to a stream.

    The counterpart of a reader that consumes the same layout: every method
    appends the packed value to the wrapped binary stream.
    """

    def __init__(self, stream: BinaryIO):
        self.stream = stream

    def _write(self, fmt: str, value: Any) -> None:
        """Pack *value* with struct format *fmt* and write it to the stream.

        Raises:
            struct.error: If *value* does not fit the format.
        """
        self.stream.write(struct.pack(fmt, value))

    def short(self, value: int) -> None:
        """Write a signed 16-bit big-endian integer."""
        self._write(">h", value)

    def long(self, value: int) -> None:
        """Write a signed 64-bit big-endian integer."""
        self._write(">q", value)

    def int(self, value: int) -> None:
        """Write a signed 32-bit big-endian integer."""
        self._write(">i", value)

    def utf(self, value: bytes) -> None:
        """Write *value* prefixed with its length as a 16-bit integer.

        Raises:
            struct.error: If *value* is longer than 65535 bytes; nothing is
                written in that case because packing fails first.
        """
        # Unsigned prefix: a signed short would reject any payload of
        # 32768..65535 bytes even though the 16-bit field can describe it.
        self._write(">H", len(value))
        self.stream.write(value)
def params(stream: BinaryIO) -> tuple:
    """Parse the backup header at the current position of *stream*.

    Reads, in order: a 32-bit version, two length-prefixed strings (app and
    JID), a 64-bit timestamp, a 12-byte nonce and a 16-byte salt.  On return
    the stream is positioned at the first byte after the salt.

    Returns:
        A tuple ``(ver, app, jid, ts, nonce, salt)``; ``app``, ``jid``,
        ``nonce`` and ``salt`` are bytes.

    Raises:
        ValueError: If the version is not 1 or 2, or the nonce/salt are cut
            short.
    """
    reader = StreamReader(stream)
    ver = reader.int()
    # Explicit raise rather than ``assert``: assertions vanish under
    # ``python -O``, which would let unsupported files through unchecked.
    if ver not in (1, 2):
        raise ValueError("Unexpected backup version {}".format(ver))
    app = reader.utf()
    jid = reader.utf()
    ts = reader.long()
    LOG.debug("Backup header: app=%r, jid=%r, ts=%r", app, jid, ts)
    nonce = stream.read(12)
    salt = stream.read(16)
    # read() returns fewer bytes at end-of-file instead of failing, so check
    # the lengths here rather than failing obscurely during key derivation.
    if len(nonce) != 12 or len(salt) != 16:
        raise ValueError("Backup header is truncated")
    return (ver, app, jid, ts, nonce, salt)
def cipher(password: str, salt: bytes):
    """Build the AES-GCM cipher for a backup from its password and salt.

    The key is derived with PBKDF2-HMAC-SHA1 (1024 iterations) and is
    128 bits long.

    Returns:
        An ``AESGCM`` instance initialised with the derived key.
    """
    key_bytes = 128 // 8
    secret = password.encode()
    derived = hashlib.pbkdf2_hmac("sha1", secret, salt, 1024, dklen=key_bytes)
    return AESGCM(derived)
def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier, doubling inner quotes."""
    return '"{}"'.format(name.replace('"', '""'))


def decrypt(backup_file: pathlib.Path, database_file: pathlib.Path, password: Optional[str] = None):
    """Decrypt *backup_file* into a new SQLite database at *database_file*.

    The header supplies the nonce and salt; the remainder of the file is
    AES-GCM ciphertext of gzip-compressed text.  Version 1 payloads are SQL
    and are executed as a script; version 2 payloads are JSON, a list of
    objects with ``table`` and ``values`` keys, inserted row by row.

    Args:
        backup_file: Encrypted backup to read.
        database_file: SQLite file to create; the schema is applied first.
        password: Backup password; prompted for interactively when empty.

    Raises:
        RuntimeError: If authentication of the ciphertext fails, i.e. the
            password is wrong or the data is corrupt.
    """
    # Slurp the ciphertext up front so the file is closed before prompting.
    with open(backup_file, "rb") as stream:
        ver, _, _, _, nonce, salt = params(stream)
        encrypted = stream.read()
    if not password:
        password = getpass("Enter decryption password: ")
    gcm = cipher(password, salt)
    try:
        zipped = gcm.decrypt(nonce, encrypted, None)
    except InvalidTag as err:
        raise RuntimeError("Failed to decrypt backup with the given password") from err
    data = gzip.decompress(zipped).decode()
    LOG.debug("Decrypted %r bytes", len(data))
    conn = sqlite3.connect(str(database_file))
    try:
        conn.executescript(SCHEMA)
        if ver == 1:
            conn.executescript(data)
            LOG.debug("Applied SQL to database file")
        elif ver == 2:
            # Group rows per table so the log reports one count per table.
            tables = defaultdict(list)
            for row in json.loads(data):
                tables[row["table"]].append(row["values"])
            for table, rows in tables.items():
                for row in rows:
                    # Table and column names come from the backup itself, so
                    # quote them; values are bound positionally, which also
                    # works for column names that are not valid parameter names.
                    names = ", ".join(_quote_identifier(column) for column in row)
                    placeholders = ", ".join("?" * len(row))
                    conn.execute(
                        "INSERT INTO {} ({}) VALUES ({})".format(
                            _quote_identifier(table), names, placeholders
                        ),
                        list(row.values()),
                    )
                LOG.debug("Inserted %d row(s) to table %r", len(rows), table)
        conn.commit()
    finally:
        # Release the database handle even when a statement fails.
        conn.close()
def encrypt(database_file: pathlib.Path, old_backup_file: pathlib.Path, backup_file: pathlib.Path, password: Optional[str] = None):
    """Encrypt the contents of *database_file* into a version 1 backup.

    The app name, JID and timestamp of the new header are copied from
    *old_backup_file*.  The payload is the database's SQL dump without its
    ``CREATE TABLE`` statements, gzip-compressed and sealed with AES-GCM.

    Args:
        database_file: SQLite database to dump.
        old_backup_file: Existing backup whose header metadata is reused.
        backup_file: Destination for the new encrypted backup.
        password: Backup password; prompted for interactively when empty.
    """
    # Function-scope import so this block does not depend on the module's
    # top-level import list.
    import os

    with open(old_backup_file, "rb") as stream:
        _, app, jid, ts, _, _ = params(stream)
    db = sqlite3.connect(str(database_file))
    try:
        sql = "\n".join(query for query in db.iterdump() if not query.startswith("CREATE TABLE "))
    finally:
        db.close()
    zipped = gzip.compress(sql.encode())
    if not password:
        password = getpass("Enter encryption password: ")
    # Never reuse the old backup's nonce: with the same password and salt the
    # key is identical, and AES-GCM loses its guarantees when a (key, nonce)
    # pair seals two different plaintexts.  Both values are stored in the
    # header, so fresh random ones remain readable by decrypt().
    nonce = os.urandom(12)
    salt = os.urandom(16)
    data = cipher(password, salt).encrypt(nonce, zipped, None)
    # Open the output only once everything is ready, so an aborted password
    # prompt or a failed dump does not leave a partial backup behind.
    with open(backup_file, "wb") as stream:
        writer = StreamWriter(stream)
        writer.int(1)
        writer.utf(app)
        writer.utf(jid)
        writer.long(ts)
        stream.write(nonce)
        stream.write(salt)
        stream.write(data)
    LOG.debug("Encrypted %r bytes", len(data))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment