Last active
July 17, 2021 21:12
-
-
Save pieceofsummer/26dc10b4263945298407cdf2d55b9065 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from Crypto.Cipher import AES | |
from Crypto.Util.Padding import pad, unpad | |
from hashlib import pbkdf2_hmac, sha1 | |
import os, tempfile, argparse | |
import struct, plistlib, sqlite3 | |
from bpylist2 import archiver | |
class MBFile: | |
last_modified: int | |
protection_class: int | |
flags: int | |
group_id: int | |
last_status_change: int | |
relative_path: str | |
birth: int | |
encryption_key: archiver.NSMutableData | |
size: int | |
mode: int | |
user_id: int | |
digest: bytes | |
inode_number: int | |
def encode_archive(self, archive): | |
archive.encode('LastModified', self.last_modified) | |
archive.encode('ProtectionClass', self.protection_class) | |
archive.encode('Flags', self.flags) | |
archive.encode('GroupID', self.group_id) | |
archive.encode('LastStatusChange', self.last_status_change) | |
archive.encode('RelativePath', self.relative_path) | |
archive.encode('Birth', self.birth) | |
archive.encode('EncryptionKey', self.encryption_key) | |
archive.encode('Size', self.size) | |
archive.encode('Mode', self.mode) | |
archive.encode('UserID', self.user_id) | |
archive.encode('Digest', self.digest) | |
archive.encode('InodeNumber', self.inode_number) | |
def decode_archive(archive): | |
file = MBFile() | |
file.last_modified = archive.decode('LastModified') | |
file.protection_class = archive.decode('ProtectionClass') | |
file.flags = archive.decode('Flags') | |
file.group_id = archive.decode('GroupID') | |
file.last_status_change = archive.decode('LastStatusChange') | |
file.relative_path = archive.decode('RelativePath') | |
file.birth = archive.decode('Birth') | |
file.encryption_key = archive.decode('EncryptionKey') | |
file.size = archive.decode('Size') | |
file.mode = archive.decode('Mode') | |
file.user_id = archive.decode('UserID') | |
file.digest = archive.decode('Digest') | |
file.inode_number = archive.decode('InodeNumber') | |
return file | |
archiver.update_class_map({ 'MBFile': MBFile }) | |
def AESUnwrap(kek, wrapped, iv=0xa6a6a6a6a6a6a6a6): | |
QUAD = struct.Struct('>Q') | |
n = len(wrapped)//8 - 1 | |
R = [None]+[wrapped[i*8:i*8+8] for i in range(1, n+1)] | |
A, = QUAD.unpack(wrapped[:8]) | |
decrypt = AES.new(kek, AES.MODE_ECB).decrypt | |
for j in range(5,-1,-1): #counting down | |
for i in range(n, 0, -1): #(n, n-1, ..., 1) | |
ciphertext = QUAD.pack(A^(n*j+i)) + R[i] | |
B = decrypt(ciphertext) | |
A, = QUAD.unpack(B[:8]) | |
R[i] = B[8:] | |
assert A == iv, 'Invalid unwrap' | |
return b''.join(R[1:]) | |
class KeyBagKey: | |
_FIELD_MAP = { | |
'UUID': 'uuid', | |
'CLAS': 'protection_class', | |
'KTYP': 'type', | |
'WRAP': 'wrap', | |
'WPKY': 'wrapped_key' | |
} | |
def __init__(self, uuid): | |
self.uuid = uuid | |
self.protection_class = None | |
self.type = None | |
self.wrap = None | |
self.wrapped_key = None | |
self.key = None | |
def __setitem__(self, key, value): | |
setattr(self, self._FIELD_MAP.get(key, key), value) | |
def __getitem__(self, key): | |
return getattr(self, self._FIELD_MAP.get(key, key), None) | |
def unwrap(self, wrapped): | |
assert self.key is not None, 'Key is not unlocked' | |
return AESUnwrap(self.key, wrapped) | |
class KeyBag: | |
_FIELD_MAP = { | |
'VERS': 'version', | |
'TYPE': 'type' | |
} | |
WRAP_DEVICE = 1 | |
WRAP_PASSCODE = 2 | |
def __init__(self, data): | |
self.version = None | |
self.type = None | |
self.keys = [] | |
obj = self | |
for tag, value in self._TLVBlocks(data): | |
if tag == 'UUID': | |
obj = KeyBagKey(value) | |
self.keys.append(obj) | |
else: | |
obj[tag] = value | |
def __setitem__(self, key, value): | |
setattr(self, self._FIELD_MAP.get(key, key), value) | |
def __getitem__(self, key): | |
return getattr(self, self._FIELD_MAP.get(key, key)) | |
@staticmethod | |
def _TLVBlocks(blob): | |
i = 0 | |
while i + 8 <= len(blob): | |
tag, length = struct.unpack_from(">4sI", blob, i) | |
i += 8 | |
value = blob[i:i+length] | |
if length == 4: | |
value, = struct.unpack('>I', value) | |
yield tag.decode('ascii'), value | |
i += length | |
def unlock(self, passcode): | |
if isinstance(passcode, str): | |
passcode = passcode.encode() | |
key0 = self.keys[0] | |
assert key0.wrap == 0 | |
if key0['DPWT'] == 1: | |
passcode = pbkdf2_hmac('sha256', passcode, key0.DPSL, key0.DPIC, 32) | |
passcode_key = pbkdf2_hmac('sha1', passcode, key0.SALT, key0.ITER, 32) | |
for key in self.keys: | |
if key.wrap & self.WRAP_PASSCODE: | |
try: | |
key.key = AESUnwrap(passcode_key, key.wrapped_key) | |
except: | |
return False | |
return True | |
def find_key(self, protection_class): | |
assert 1 <= protection_class <= 11, f'Protection class {protection_class} is invalid!' | |
for key in self.keys: | |
if key.protection_class == protection_class: | |
return key | |
return None | |
def unwrap_key(self, protection_class, wrapped_key): | |
assert len(wrapped_key) == 40, 'Invalid wrapped_key length' | |
key = self.find_key(protection_class) | |
assert key is not None, f'No key found for protection class {protection_class}' | |
return key.unwrap(wrapped_key) | |
def __str__(self): | |
return f'KeyBag(version = {self.version}, type = {self.type}, keys = {self.keys})' | |
def load_manifest(dirname: str) -> dict: | |
with open(os.path.join(dirname, 'Manifest.plist'), 'rb') as f: | |
manifest = plistlib.load(f) | |
assert 'Version' in manifest | |
return manifest | |
class BackupFile: | |
def __init__(self, manifest_db, file_id: str, plist: bytes): | |
self.manifest_db = manifest_db | |
self.file_id = file_id | |
self.info = archiver.unarchive(plist) | |
self.encryption_key = None | |
self.filename = os.path.join(self.manifest_db.root_dir, file_id[:2], file_id) | |
assert os.path.exists(self.filename), self.filename | |
if self.manifest_db.manifest['IsEncrypted']: | |
protection_class, wrapped_key = struct.unpack('<I40s', self.info.encryption_key.NSdata) | |
assert protection_class == self.info.protection_class, 'Protection class mismatch' | |
self.encryption_key = self.manifest_db.keybag.unwrap_key(protection_class, wrapped_key) | |
def read(self): | |
with open(self.filename, 'rb') as f: | |
data = f.read() | |
assert sha1(data).digest() == self.info.digest, 'Invalid file digest' | |
if self.encryption_key: | |
cipher = AES.new(self.encryption_key, AES.MODE_CBC, b'\0'*16) | |
data = unpad(cipher.decrypt(data), 16) | |
assert len(data) == self.info.size, 'Invalid file size' | |
return data | |
def write(self, data): | |
assert isinstance(data, bytes) | |
self.info.size = len(data) | |
if self.encryption_key: | |
cipher = AES.new(self.encryption_key, AES.MODE_CBC, b'\0'*16) | |
data = cipher.encrypt(pad(data, 16)) | |
self.info.digest = sha1(data).digest() | |
self.manifest_db._update(self.file_id, archiver.archive(self.info)) | |
with open(self.filename, 'wb') as f: | |
f.write(data) | |
self.manifest_db._commit() | |
class ManifestDB: | |
def __init__(self, root_dir: str, manifest: dict, keybag: KeyBag): | |
self.root_dir = root_dir | |
self.filename = os.path.join(root_dir, 'Manifest.db') | |
self.manifest = manifest | |
self.keybag = keybag | |
self.modified = False | |
self.tempname = tempfile.mktemp() | |
self.conn = None | |
self.manifest_key = None | |
if manifest['IsEncrypted'] and 'ManifestKey' in manifest: | |
self.manifest_key = keybag.unwrap_key(*struct.unpack('<I40s', manifest['ManifestKey'])) | |
assert os.access(self.filename, os.W_OK), 'Manifest.db is read-only' | |
print('[*] Loading Manifest.db...') | |
with open(self.filename, 'rb') as f: | |
data = f.read() | |
if self.manifest_key: | |
cipher = AES.new(self.manifest_key, AES.MODE_CBC, b'\0'*16) | |
data = unpad(cipher.decrypt(data), 16) | |
with open(self.tempname, 'wb') as f: | |
f.write(data) | |
self.conn = sqlite3.connect(self.tempname) | |
def __enter__(self): | |
return self | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
self.close() | |
def close(self): | |
self.conn.close() | |
if self.modified: | |
print('[*] Saving Manifest.db...') | |
with open(self.tempname, 'rb') as f: | |
data = f.read() | |
if self.manifest_key: | |
cipher = AES.new(self.manifest_key, AES.MODE_CBC, b'\0'*16) | |
data = cipher.encrypt(pad(data, 16)) | |
with open(self.filename, 'wb') as f: | |
f.write(data) | |
self.modified = False | |
try: | |
os.unlink(self.tempname) | |
except: | |
pass | |
def file(self, domain: str, relative_path: str) -> BackupFile: | |
result = self.conn.execute( | |
'select fileID, file from Files where domain = ? and relativePath = ?', | |
(domain, relative_path) | |
).fetchone() | |
if not result: | |
raise FileNotFoundError(f'{domain}/{relative_path}') | |
return BackupFile(self, *result) | |
def _update(self, file_id: str, plist: bytes): | |
self.conn.execute('update Files set file = ? where fileID = ?', (plist, file_id)) | |
def _commit(self): | |
self.conn.commit() | |
self.modified = True | |
parser = argparse.ArgumentParser(description='Removes bad SSIDs from known networks list.') | |
parser.add_argument('backup_path', type=str, help='path to a backup directory') | |
parser.add_argument('password', type=str, nargs='?', help='password for encrypted backup') | |
args = parser.parse_args() | |
assert os.path.isdir(args.backup_path), 'Specified path is valid' | |
print('[*] Loading backup manifest...') | |
manifest = load_manifest(args.backup_path) | |
keybag = None | |
if manifest['IsEncrypted']: | |
assert args.password, 'Password is required for encrypted backup' | |
keybag = KeyBag(manifest['BackupKeyBag']) | |
assert keybag.type == 1, 'Not a backup key bag' | |
if not keybag.unlock(args.password): | |
print('[!] Invalid password') | |
exit() | |
with ManifestDB(args.backup_path, manifest, keybag) as backup: | |
print('[*] Loading com.apple.wifi.known-networks.plist...') | |
known_networks = backup.file('SystemPreferencesDomain', 'com.apple.wifi.known-networks.plist') | |
networks = plistlib.loads(known_networks.read()) | |
print(f'[+] Loaded {len(networks)} known networks') | |
modified = False | |
for key, network in list(networks.items()): | |
if b'%' in network['SSID']: | |
print('[+] Removing bad SSID %s' % network['SSID'].decode()) | |
networks.pop(key) | |
modified = True | |
if modified: | |
known_networks.write(plistlib.dumps(networks, fmt=plistlib.FMT_BINARY, sort_keys=False)) | |
else: | |
print('[-] No bad SSIDs found') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment