Skip to content

Instantly share code, notes, and snippets.

@malicious
Created November 16, 2022 01:55
Show Gist options
  • Save malicious/cd2a17f1ace34d27a8c710892721b28d to your computer and use it in GitHub Desktop.
Re-encrypts an iOS backup that's been decrypted by mvt-ios.
"""
Re-encrypts an iOS backup that's been decrypted by mvt-ios.
Depending on the iOS version of the backup, there may be files that were never
encrypted in the first place, which mvt-ios then skips over during decryption.
"""
import hashlib
import itertools
import json
import mmap
import os
import pathlib
import plistlib
import sqlite3
import struct
import sys
from pprint import pprint
from typing import Dict, List, Optional
import NSKeyedUnArchiver
import keyring
from Crypto.Cipher import AES
def _aes_unwrap(kek, wrapped_key):
    """
    AES key unwrap (RFC 3394): recover a plaintext key from `wrapped_key`
    using the key-encryption key `kek`.

    Returns the unwrapped key as bytes, or None if the integrity check fails
    (which usually means the KEK — i.e. the passcode — was wrong).
    """
    def pack_64b(s):
        return struct.pack(">Q", s)

    def unpack_64b(s):
        return struct.unpack(">Q", s)[0]

    # Split the wrapped blob into 64-bit blocks: C[0] is the integrity/IV
    # block, C[1..n] carry the wrapped key material.
    C = []
    for i in range(len(wrapped_key) // 8):
        C.append(unpack_64b(wrapped_key[i * 8:i * 8 + 8]))
    n = len(C) - 1
    R = [0] * (n + 1)
    A = C[0]
    for i in range(1, n + 1):
        R[i] = C[i]

    # ECB mode is stateless, so build the cipher once instead of
    # re-instantiating it for every block in the loop below.
    cipher = AES.new(kek, AES.MODE_ECB)

    # Run the RFC 3394 wrap recurrence backwards: 6 rounds over n blocks.
    for j in reversed(range(0, 6)):
        for i in reversed(range(1, n + 1)):
            todec = pack_64b(A ^ (n * j + i))
            todec += pack_64b(R[i])
            B = cipher.decrypt(todec)
            A = unpack_64b(B[:8])
            R[i] = unpack_64b(B[8:])

    # RFC 3394 fixed initial value; any mismatch means decryption failed.
    if A != 0xa6a6a6a6a6a6a6a6:
        # print(f"AES decryption integrity check failed, key IV: {A}")
        return None
    return b"".join(map(pack_64b, R[1:]))
# Check the SHA1 of a given file to confirm it's what we expected
def _recompute_file_hash(filepath):
buffer_size = 128 * 1024
sha1 = hashlib.sha1()
with open(filepath, 'rb') as f:
while True:
data = f.read(buffer_size)
if not data:
break
sha1.update(data)
return sha1.digest()
class PasscodeCacher:
    """
    Passcodes are stored as a dict that maps from passcode_str to passcode_keys (bytearray).

    - "None" is permitted as a dictionary key, for if we don't have the original passcode_str
    - A list of passcode_keys is stored as the value, for times when the encryption algorithm changes
    - More-recent passcode_key entries are appended to the end of the list. Try not to depend on this.
    """
    KEYRING_IDENTIFIER: str = "iOS backup passcodes / re-encrypt.py"

    def __init__(self):
        # Maps passcode_str -> list of hex-encoded passcode_keys, newest last.
        self.passcodes: Dict[str, List] = {}

    def load_from_keyring(self):
        """Merge any previously-saved passcodes from the OS keyring."""
        encoded_passcodes = keyring.get_password(PasscodeCacher.KEYRING_IDENTIFIER, None)
        if encoded_passcodes:
            stored_passcodes = json.loads(encoded_passcodes)
            self.passcodes.update(stored_passcodes)

    def save_to_keyring(self):
        """Persist the full passcode cache to the OS keyring as JSON."""
        encoded_passcodes = json.dumps(self.passcodes)
        keyring.set_password(PasscodeCacher.KEYRING_IDENTIFIER, None, encoded_passcodes)

    def has(self, key):
        return key in self.passcodes

    def add_one(self, key, value):
        """Record `value` (a raw key, bytes-like) under `key`, newest last."""
        if key not in self.passcodes:
            self.passcodes[key] = []
        passcode_keys = self.passcodes[key]
        passcode_keys.append(value.hex())
        # Remove duplicate passcode_keys.
        # BUG FIX: the previous list(set(...)) destroyed insertion order,
        # breaking the "newest entry is last" invariant that get_one() relies
        # on.  De-duplicate keeping the LAST occurrence of each key so a
        # re-added key moves to the end of the list.
        deduped = list(dict.fromkeys(reversed(passcode_keys)))
        deduped.reverse()
        self.passcodes[key] = deduped

    def get_one(self, item):
        # The most-recent key for this passcode is kept at the end.
        return bytes.fromhex(self.passcodes[item][-1])

    def values(self) -> List:
        # Return a real list as annotated; the previous implementation
        # returned a lazy `map` object, which could only be iterated once.
        return [bytes.fromhex(h)
                for h in itertools.chain.from_iterable(self.passcodes.values())]
class ManifestPlist:
    """
    Parses Manifest.plist and its embedded BackupKeyBag, and unwraps the
    per-protection-class keys with a passcode-derived key.
    """
    # This is used to store the unwrapped keys in a "fake" keybag entry, because
    # the keybag is already a per-protection class kind of data structure.
    #
    # Not a great practice, but every other piece of iOS code does it.
    #
    UNWRAPPED_KEY_ENTRY = b"unlocked-WPKY"

    def __init__(self, backup_dir):
        self.mpl_path = os.path.join(backup_dir, "Manifest.plist")
        self.passcode_cacher = PasscodeCacher()
        self.passcode_cacher.load_from_keyring()

    def expects_encrypted_backup(self):
        """Return the IsEncrypted flag recorded in Manifest.plist."""
        with open(self.mpl_path, 'rb') as f:
            manifest = plistlib.load(f)
            expects_encrypted = manifest['IsEncrypted']
            print(f"[INFO] Manifest.plist reports IsEncrypted={expects_encrypted}")
            return expects_encrypted

    @staticmethod
    def _loop_tlv_blocks(blob):
        """
        Yield (tag, data) pairs from a keybag TLV blob.

        Each block is a 4-byte tag, a 4-byte big-endian length, then that many
        payload bytes.  Exactly-4-byte payloads are decoded as big-endian ints.
        """
        i = 0
        while i + 8 <= len(blob):
            # First 4 bytes are the identifier for this block
            tag = blob[i:i + 4]
            # Next 4 bytes are the length (most significant byte first)
            # (length of 4 means the total block length is 0xc bytes, but this is handled by the caller)
            reported_length = struct.unpack(">L", blob[i + 4:i + 8])[0]
            data = blob[i + 8:i + 8 + reported_length]
            if len(data) == 4:
                data = struct.unpack(">L", data)[0]
            yield (tag, data,)
            # Iterate to the next block
            i += 8 + reported_length

    def parse_keybag(self):
        """
        Parse the BackupKeyBag out of Manifest.plist.

        Populates self.keybag_attrs (keybag-wide attributes) and
        self.parsed_keybag_keys (per-protection-class key dicts, keyed by CLAS).
        """
        with open(self.mpl_path, 'rb') as infile:
            manifest = plistlib.load(infile)
        keybag_attrs = {}
        keybag_uuid = None
        keybag_wrap = None
        parsed_keybag_keys = {}
        current_key = None

        def _close_key_class(current_key_dict):
            # File a finished key dict under its protection-class ID.
            if not current_key_dict:
                return
            key_class = current_key_dict[b"CLAS"]
            parsed_keybag_keys[key_class] = current_key_dict

        # tag/data are in a flat multidict, so parse them into a more-hierarchical structure
        #
        # - start of the multidict is usually VERS + TYPE, keep those in "keybag_attrs"
        # - UUID usually represents the start of a new key
        # - TODO: extract key info for use with hashcat
        #
        # TODO: Info in https://stackoverflow.com/questions/1498342/ is much more detailed.
        #
        backup_keybag = manifest['BackupKeyBag']
        for tag, data in ManifestPlist._loop_tlv_blocks(backup_keybag):
            # print(f"{tag}: {data}")
            if tag == b"TYPE":
                assert not current_key
                keybag_attrs[b"TYPE"] = data
            elif tag == b"UUID" and keybag_uuid is None:
                # The first UUID belongs to the keybag itself, not a key.
                assert not current_key
                keybag_uuid = data.hex()
            elif tag == b"WRAP" and keybag_wrap is None:
                assert not current_key
                keybag_wrap = data
            elif tag == b"UUID":
                # Use UUID tag to denote the start of a new key
                # …which means we wrap up parsing of the current key
                _close_key_class(current_key)
                current_key = {}
                # (previously assigned the raw bytes first, then immediately
                # overwrote them with the hex string — keep only the hex form)
                current_key[b"UUID"] = data.hex()
            elif tag == b"CLAS":
                current_key[b"CLAS"] = data
            elif tag == b"KTYP":
                current_key[b"KTYP"] = data
            elif tag == b"WRAP":
                current_key[b"WRAP"] = data
            elif tag == b"WPKY":
                current_key[b"WPKY"] = data
            elif tag in [b"DPSL", b"HMCK", b"SALT"]:
                # Binary attributes are stored hex-encoded (JSON-friendly).
                keybag_attrs[tag] = data.hex()
            else:
                keybag_attrs[tag] = data

        # Close out the in-progress key, if we started one
        _close_key_class(current_key)
        current_key = None

        self.keybag_attrs = keybag_attrs
        self.parsed_keybag_keys = parsed_keybag_keys

    def _compute_passcode_key(self, passcode_str):
        """
        Derive the passcode key: PBKDF2-SHA256 over (DPSL, DPIC), then
        PBKDF2-SHA1 over (SALT, ITER), 32 bytes each step.
        """
        passcode = passcode_str.encode('utf-8')
        passcode_key_step1 = hashlib.pbkdf2_hmac(
            'sha256',
            passcode,
            bytes.fromhex(self.keybag_attrs[b"DPSL"]),
            self.keybag_attrs[b"DPIC"],
            32)
        passcode_key_step2 = hashlib.pbkdf2_hmac(
            'sha1',
            passcode_key_step1,
            bytes.fromhex(self.keybag_attrs[b"SALT"]),
            self.keybag_attrs[b"ITER"],
            32)
        return passcode_key_step2

    def _try_unlock_keys(self, passcode_key):
        """
        Try to unwrap every WPKY in the keybag with `passcode_key`.

        Successful unwraps are stashed under UNWRAPPED_KEY_ENTRY in the key's
        dict.  Returns the number of keys successfully unwrapped.
        """
        # Names for the b"WRAP" field in BackupKeyBag
        WRAP_DEVICE = 1
        WRAP_PASSCODE = 2
        successful_unwraps = 0
        for key_class, key_dict in self.parsed_keybag_keys.items():
            if b"WPKY" not in key_dict:
                continue
            assert key_dict[b"WRAP"] & WRAP_PASSCODE
            unwrapped_key = _aes_unwrap(passcode_key, key_dict[b"WPKY"])
            if not unwrapped_key:
                if b"DPSL" not in self.keybag_attrs:
                    print("[WARN] \"DPSL\" not found in keybag, is the backup unencrypted?")
                if b"DPIC" not in self.keybag_attrs:
                    print("[WARN] \"DPIC\" not found in keybag, is the backup unencrypted?")
                # TODO: Should we stop trying to unwrap as soon as a decryption fails?
                continue
            key_dict[ManifestPlist.UNWRAPPED_KEY_ENTRY] = unwrapped_key
            successful_unwraps += 1
        return successful_unwraps

    def unlock_keys(self, passcode_str: Optional[str]) -> Dict:
        """
        Unwrap the keybag's per-class keys, trying cached passcode keys first
        and then a key freshly derived from `passcode_str` (if given).

        Returns self.parsed_keybag_keys on success.
        Raises ValueError if nothing could be unwrapped.
        """
        # Try existing keys first, since it's unlikely we really need a new one
        passcode_keys_to_try = self.passcode_cacher.values()
        for passcode_key in passcode_keys_to_try:
            successful_unwraps = self._try_unlock_keys(passcode_key)
            if successful_unwraps < 1:
                continue
            if passcode_str:
                # Only store the passcodes and keys after we've successfully used them.
                self.passcode_cacher.add_one(passcode_str, passcode_key)
                self.passcode_cacher.save_to_keyring()
            else:
                print(f"[INFO] Successfully unwrapped with existing passcode_key: {passcode_key.hex()}")
            # Succeeded here, exit
            return self.parsed_keybag_keys

        # If those didn't work, see if we maybe need to compute a new one
        # This also handles the case where the algorithm changed (e.g. new SALT in the keybag)
        if passcode_str:
            passcode_key = self._compute_passcode_key(passcode_str)
            successful_unwraps = self._try_unlock_keys(passcode_key)
            if successful_unwraps > 0:
                self.passcode_cacher.add_one(passcode_str, passcode_key)
                self.passcode_cacher.save_to_keyring()
                # BUG FIX: previously fell through to the ValueError below
                # even though the freshly-derived key worked.
                return self.parsed_keybag_keys

        # TODO: Also try recomputing new passcode_keys based on old passcode_str's + new parameters
        # Went through all known passcodes, couldn't figure anything out
        raise ValueError("Failed to decrypt keybag, try a different passcode")
def _mp_encrypt(mdb, row, column_names, output_dir):
row_data = dict(zip(column_names, row))
# print(f"[DEBUG] Starting encrypt: {row_data['fileID']}")
return mdb._try_encrypt_file(row_data, output_dir)
class ManifestDb:
    """
    Wraps the backup's Manifest.db (SQLite) and drives re-encryption of the
    per-file payloads listed in its Files table.
    """

    def __init__(self, backup_dir):
        self.backup_dir = backup_dir
        self.mdb_path = os.path.join(backup_dir, "Manifest.db")
        self.mdb_conn = sqlite3.connect(self.mdb_path)
        if not ManifestDb.is_manifest_db_decrypted(self.mdb_path):
            raise ValueError(f"Couldn't open file: {self.mdb_path}")
        # Used later; call self.set_protection_class_key() once ManifestPlist is unlocked.
        self.unlocked_keybag_keys = {}

    @staticmethod
    def is_manifest_db_decrypted(mdb_path):
        """
        Check that the given directory is a partially-decrypted backup.
        Partially-decrypted means:
        - filesystem files are in plaintext (file contents are actually readable with e.g. `file`)
        - Manifest.db contents have not been updated (because decrypting means new backup keys)
        - (optional) Manifest.plist still reports IsEncrypted, since it was copied directly from an encrypted dir
        """
        # First check: is Manifest.db encrypted?
        # If the Files table is readable, the database itself is plaintext.
        conn = sqlite3.connect(mdb_path)
        cur = conn.cursor()
        try:
            cur.execute("SELECT fileID FROM Files LIMIT 1")
        except sqlite3.DatabaseError:
            print(f"[WARN] Can't access Manifest.db contents, treating as encrypted: {mdb_path}")
            return False
        cur.close()
        conn.close()
        return True

    def dump_files_table(self, row_limit=None):
        """Pretty print the file contents, mostly useful for git diff of an iOS backup"""
        cur = self.mdb_conn.cursor()
        query_str = "SELECT * FROM Files ORDER BY fileID"
        if row_limit:
            # row_limit comes from our own caller, not user input.
            query_str += f" LIMIT {row_limit}"
        cur.execute(query_str)
        column_names = [col[0] for col in cur.description]
        for row in cur:
            row_data = dict(zip(column_names, row))
            if row_data['file']:
                # prep to pretty-print the whole field
                row_data['file'] = plistlib.loads(row_data['file'])
                # prep what seems to be a file hash
                # NOTE(review): the $class UID values 6 and 4 look like
                # NSKeyedArchiver layout variants — confirm against real data.
                objects_dict = row_data['file']['$objects']
                if objects_dict[1]['$class'] == plistlib.UID(6):
                    if isinstance(objects_dict[3], (bytes,)):
                        objects_dict[3] = objects_dict[3].hex()
                if objects_dict[1]['$class'] == plistlib.UID(4):
                    if isinstance(objects_dict[3], (bytes,)):
                        objects_dict[3] = objects_dict[3].hex()
            pprint(row_data)
        cur.close()

    def set_protection_class_key(self, protection_class_id, key):
        # Register an unlocked class key (from ManifestPlist.unlock_keys).
        self.unlocked_keybag_keys[protection_class_id] = key

    def unwrap(self, protection_class_id, wrapped_key):
        """Unwrap a per-file key with the given protection class's class key."""
        # 0x28 = 40 bytes: a wrapped 32-byte key plus the 8-byte IV block.
        if len(wrapped_key) != 0x28:
            raise ValueError(f"[ERROR] Invalid key length: {len(wrapped_key)}")
        return _aes_unwrap(self.unlocked_keybag_keys[protection_class_id], wrapped_key)

    @staticmethod
    def _just_encrypt_file(source_f, target_f, reencryptor, target_size, needs_padding=True):
        """
        Encrypt source_f into target_f using `reencryptor` (an AES-CBC cipher
        object — see the caller), adding PKCS#7-style padding to the final
        block: padding_size bytes, each byte equal to padding_size.
        """
        # If source_f is an empty file, we actually still want to pad it with data.
        if target_size == 0:
            mmapped_source = []
        # Mmap the decrypted file, for speed
        elif os.name == 'nt':
            mmapped_source = mmap.mmap(source_f.fileno(), length=0, access=mmap.ACCESS_READ)
        else:
            mmapped_source = mmap.mmap(source_f.fileno(), length=0, prot=mmap.PROT_READ)
        # Write the reencrypted file to disk
        current_chunk_index = 0
        chunk_size = 16 * 1000 * 1000
        while True:  # current_chunk_index * chunk_size < target_size:
            # print(f"[DEBUG] Encrypting file from byte {current_chunk_index * chunk_size}")
            current_chunk = mmapped_source[
                current_chunk_index * chunk_size:(current_chunk_index + 1) * chunk_size]
            if len(current_chunk) == 0:
                break
            if len(current_chunk) % 16:
                # Only the final chunk can be short; pad it in-line and
                # remember that no separate padding block is needed.
                padding_size = 16 - (len(current_chunk) % 16)
                # sys.byteorder is irrelevant for a single byte.
                current_chunk = current_chunk + padding_size.to_bytes(1, sys.byteorder) * padding_size
                needs_padding = False
            target_f.write(reencryptor.encrypt(current_chunk))
            current_chunk_index += 1
        if needs_padding:
            # File length was a multiple of 16 (or zero): emit a full
            # 16-byte padding block.  After the break, current_chunk is
            # empty, so padding_size works out to 16 here.
            padding_size = 16 - (len(current_chunk) % 16)
            current_chunk = padding_size.to_bytes(1, sys.byteorder) * padding_size
            target_f.write(reencryptor.encrypt(current_chunk))

    def _try_encrypt_file(self, row_data, output_dir):
        """
        Re-encrypt one file described by a Files-table row into output_dir.

        Skips rows with no source file, and targets whose hash already
        matches the recorded Digest.  Raises ValueError if the re-encrypted
        output's SHA1 doesn't match Manifest.db's recorded Digest.
        """
        try:
            file_manifest = NSKeyedUnArchiver.unserializeNSKeyedArchiver(row_data['file'])
        except TypeError:
            # Empty 'file' blobs are expected (e.g. directory entries); only
            # warn when there was data we failed to parse.
            if not len(row_data['file']):
                return
            print(f"[ERROR] Couldn't parse Manifest.db data for {row_data['fileID']}")
            return
        # Confirm that we have a source file to decrypt
        # (backup layout: files live under a two-hex-char prefix directory)
        source_path = os.path.join(self.backup_dir, row_data['fileID'][0:2], row_data['fileID'])
        if not os.path.exists(source_path):
            return
        # And that we have a target file to encrypt to
        target_path = os.path.join(output_dir, row_data['fileID'][0:2], row_data['fileID'])
        if 'Digest' in file_manifest and os.path.exists(target_path):
            # Check if the output file was already created + identical
            computed_hash = _recompute_file_hash(target_path)
            recorded_hash = file_manifest['Digest']
            if computed_hash == recorded_hash:
                print(f"[DEBUG] Skipping encryption, target file already matches: {row_data['fileID']}, {file_manifest['Size']} bytes")
                return
            else:
                print(f"[WARN] File already exists, but hash doesn't match: {row_data['fileID']}")
        target_path_parent = os.path.dirname(target_path)
        if not os.path.exists(target_path_parent):
            pathlib.Path(target_path_parent).mkdir(parents=False, exist_ok=True)
        # And collect the encryption key to decrypt with
        # (first 4 bytes of EncryptionKey are skipped — presumably a
        # protection-class prefix; TODO confirm against the backup format)
        file_encryption_key = file_manifest['EncryptionKey'][4:]
        with open(source_path, 'rb') as source_f:
            with open(target_path, 'wb') as target_f:
                key = self.unwrap(file_manifest['ProtectionClass'], file_encryption_key)
                # Fresh CBC cipher with an all-zero IV for each file.
                reencryptor = AES.new(key, AES.MODE_CBC, b'\x00' * 16)
                # print(f"[DEBUG] Encrypting fileID {row_data['fileID']} / {file_manifest['RelativePath']}, {file_manifest['Size']} bytes")
                ManifestDb._just_encrypt_file(source_f, target_f, reencryptor, file_manifest['Size'])
        # No hash to compute, or care about
        if 'Digest' not in file_manifest:
            return
        computed_hash = _recompute_file_hash(target_path)
        recorded_hash = file_manifest['Digest']
        if computed_hash != recorded_hash:
            # Dump everything we know about the row before bailing out.
            row_data['file'] = file_manifest
            print()
            print(
                f"[ERROR] Failed to re-encrypt fileID \"{row_data['fileID']}\":\n"
                f"    checksum of original file: {_recompute_file_hash(source_path).hex()}\n"
                f"    checksum of encrypted output: {computed_hash.hex()}\n"
                f"    checksum recorded in Manifest.db: {recorded_hash.hex()}")
            print(
                f"[DEBUG] Check if file sizes match for \"{file_manifest['RelativePath']}\":\n"
                f"    size of original, decrypted file: {os.path.getsize(source_path)}\n"
                f"    size of encrypted output file: {os.path.getsize(target_path)}\n"
                f"    size recorded in Manifest.db: {file_manifest['Size']}")
            pprint(row_data)
            raise ValueError(f"SHA1 checksums don't match, failed to re-encrypt")

    def reencrypt_one(self, file_id, output_dir):
        """Re-encrypt a single file by its fileID."""
        cur = self.mdb_conn.cursor()
        cur.execute("SELECT * FROM Files WHERE fileID = ?", (file_id,))
        column_names = [col[0] for col in cur.description]
        for row in cur:
            row_data = dict(zip(column_names, row))
            self._try_encrypt_file(row_data, output_dir)
        cur.close()

    def reencrypt_all(self, output_dir):
        """Re-encrypt every file listed in Manifest.db, serially."""
        cur = self.mdb_conn.cursor()
        cur.execute("SELECT * FROM Files ORDER BY FileID")
        column_names = [col[0] for col in cur.description]
        failed_reencryptions = 0
        succeeded_reencryptions = 0
        for row in cur:
            row_data = dict(zip(column_names, row))
            try:
                # print(f"[DEBUG] Trying to encrypt file: {row_data['fileID']}")
                self._try_encrypt_file(row_data, output_dir)
                succeeded_reencryptions += 1
            except (ValueError, NotImplementedError,) as e:
                failed_reencryptions += 1
                print(f"[ERROR] {row_data['fileID']}: {e}")
        if failed_reencryptions > 0:
            print(f"[INFO] Failed to encrypt {failed_reencryptions} files (total {failed_reencryptions + succeeded_reencryptions})")
        cur.close()

    def reencrypt_threaded(self, output_dir, num_threads=3):
        """
        Re-encrypt every file using a process pool.

        NOTE(review): num_threads is currently unused — the
        ProcessPoolExecutor is created with its default worker count.
        Also, self.mdb_conn is left as None afterwards (see NB below).
        """
        cur = self.mdb_conn.cursor()
        # NB Hide the SQLite connection while we're doing multiprocessing.
        # If we want to use the class again, set it after the futures have finished running.
        # (sqlite3 connections can't be pickled, and `self` is pickled
        # into each worker via executor.submit.)
        self.mdb_conn = None
        cur.execute("SELECT * FROM Files ORDER BY FileID")
        column_names = [col[0] for col in cur.description]
        import concurrent.futures
        executor = concurrent.futures.ProcessPoolExecutor()
        futures_list = []
        for row in cur:
            future = executor.submit(_mp_encrypt, self, row, column_names, output_dir)
            futures_list.append(future)
        for future in concurrent.futures.as_completed(futures_list):
            if future.exception():
                print(future.exception())
        cur.close()
if __name__ == '__main__':
    # Usage: re-encrypt.py <backup_dir> <output_dir> [passcode]
    #   backup_dir: a partially-decrypted backup (e.g. produced by mvt-ios)
    #   output_dir: where the re-encrypted backup files get written
    #   passcode:   optional; tried if no cached passcode key unlocks the keybag
    if len(sys.argv) < 3:
        # Previously this crashed with an IndexError on missing arguments.
        print(f"usage: {sys.argv[0]} <backup_dir> <output_dir> [passcode]", file=sys.stderr)
        sys.exit(2)
    backup_dir = sys.argv[1]
    output_dir = sys.argv[2]
    passcode_str = None
    if len(sys.argv) >= 4:
        passcode_str = sys.argv[3]

    # Flip to True for verbose dumps of Manifest.db / Manifest.plist contents.
    print_debug_info = False

    mdb = ManifestDb(backup_dir)
    if print_debug_info:
        print('=' * 72)
        print('Dumping Manifest.db info')
        print('=' * 72)
        print()
        mdb.dump_files_table(row_limit=2)
        print('[WARN] truncating entries past limit')
        print()

    mpl = ManifestPlist(backup_dir)
    mpl.parse_keybag()
    if print_debug_info:
        print('=' * 72)
        print('Dumping Manifest.plist info')
        print('=' * 72)
        print()
        pprint(mpl.keybag_attrs)
        pprint(mpl.parsed_keybag_keys)
        print()

    # Unlock the keybag, then hand each protection class's key to ManifestDb.
    unlocked_keys = mpl.unlock_keys(passcode_str)
    for key_class, key_dict in unlocked_keys.items():
        mdb.set_protection_class_key(key_class, key_dict[ManifestPlist.UNWRAPPED_KEY_ENTRY])

    print('=' * 72)
    print(f'Re-encrypting files into {output_dir}')
    print('=' * 72)
    print()
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    # mdb.reencrypt_all(output_dir)
    mdb.reencrypt_threaded(output_dir)
    print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment