Skip to content

Instantly share code, notes, and snippets.

@pieceofsummer
Last active July 17, 2021 21:12
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save pieceofsummer/26dc10b4263945298407cdf2d55b9065 to your computer and use it in GitHub Desktop.
Save pieceofsummer/26dc10b4263945298407cdf2d55b9065 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from hashlib import pbkdf2_hmac, sha1
import os, tempfile, argparse
import struct, plistlib, sqlite3
from bpylist2 import archiver
class MBFile:
    """Plain attribute container for an archived ``MBFile`` object.

    Every attribute corresponds to exactly one key of the keyed archive; the
    pairing is kept in ``_ARCHIVE_FIELDS`` so that encoding and decoding are
    driven by the same table and cannot drift apart.
    """

    last_modified: int
    protection_class: int
    flags: int
    group_id: int
    last_status_change: int
    relative_path: str
    birth: int
    # Written as a string so the class body does not have to resolve the
    # third-party attribute at class-creation time.
    encryption_key: 'archiver.NSMutableData'
    size: int
    mode: int
    user_id: int
    digest: bytes
    inode_number: int

    # (archive key, attribute name) pairs, in the order they are encoded.
    _ARCHIVE_FIELDS = (
        ('LastModified', 'last_modified'),
        ('ProtectionClass', 'protection_class'),
        ('Flags', 'flags'),
        ('GroupID', 'group_id'),
        ('LastStatusChange', 'last_status_change'),
        ('RelativePath', 'relative_path'),
        ('Birth', 'birth'),
        ('EncryptionKey', 'encryption_key'),
        ('Size', 'size'),
        ('Mode', 'mode'),
        ('UserID', 'user_id'),
        ('Digest', 'digest'),
        ('InodeNumber', 'inode_number'),
    )

    def encode_archive(self, archive):
        """Store every field of this object into ``archive``.

        Args:
            archive: object exposing ``encode(key, value)``.
        """
        for archive_key, attr_name in self._ARCHIVE_FIELDS:
            archive.encode(archive_key, getattr(self, attr_name))

    @staticmethod
    def decode_archive(archive):
        """Build a new ``MBFile`` from ``archive``.

        The function takes no instance, so it is declared a static method;
        it can then be called on the class as well as on an instance.

        Args:
            archive: object exposing ``decode(key)``.

        Returns:
            A freshly populated ``MBFile``.
        """
        decoded = MBFile()
        for archive_key, attr_name in MBFile._ARCHIVE_FIELDS:
            setattr(decoded, attr_name, archive.decode(archive_key))
        return decoded
# Register MBFile with bpylist2 under the archived class name 'MBFile'
# (presumably this is what lets archiver.unarchive()/archive() map that
# class name to the Python class above — confirm against bpylist2 docs).
archiver.update_class_map({ 'MBFile': MBFile })
def AESUnwrap(kek, wrapped, iv=0xa6a6a6a6a6a6a6a6):
    """Unwrap ``wrapped`` with the key-encryption key ``kek``.

    Follows the RFC 3394 key-unwrap procedure: six rounds, counted down,
    over the 64-bit blocks of ``wrapped`` using AES in ECB mode, then an
    integrity check of the recovered register against ``iv``.

    Args:
        kek: AES key used to unwrap.
        wrapped: wrapped key material; an 8-byte integrity block followed by
            one or more 8-byte data blocks.
        iv: expected value of the integrity register after unwrapping.

    Returns:
        The unwrapped key bytes (``len(wrapped) - 8`` bytes).

    Raises:
        ValueError: if ``wrapped`` has an impossible length, or if the
            integrity check fails (wrong ``kek`` or corrupted data).
    """
    # Validate up front: a stray tail would otherwise be silently ignored.
    if len(wrapped) < 16 or len(wrapped) % 8:
        raise ValueError('Invalid wrapped key length')

    QUAD = struct.Struct('>Q')
    n = len(wrapped) // 8 - 1
    # R is 1-indexed to match the RFC notation; slot 0 is unused.
    R = [None] + [wrapped[i * 8:i * 8 + 8] for i in range(1, n + 1)]
    A, = QUAD.unpack(wrapped[:8])
    decrypt = AES.new(kek, AES.MODE_ECB).decrypt
    for j in range(5, -1, -1):          # rounds, counting down
        for i in range(n, 0, -1):       # blocks n, n-1, ..., 1
            B = decrypt(QUAD.pack(A ^ (n * j + i)) + R[i])
            A, = QUAD.unpack(B[:8])
            R[i] = B[8:]
    # Explicit raise instead of `assert`: the check must survive `python -O`,
    # otherwise a wrong key would be accepted silently.
    if A != iv:
        raise ValueError('Invalid unwrap')
    return b''.join(R[1:])
class KeyBagKey:
    """One key record of a key bag.

    Fields can be set and read by their four-letter tag through item access
    (``key['CLAS']``); tags listed in ``_FIELD_MAP`` are stored under a
    descriptive attribute name, any other tag is stored as an attribute named
    after the tag itself.
    """

    _FIELD_MAP = {
        'UUID': 'uuid',
        'CLAS': 'protection_class',
        'KTYP': 'type',
        'WRAP': 'wrap',
        'WPKY': 'wrapped_key',
    }

    def __init__(self, uuid):
        self.uuid = uuid
        self.protection_class = None
        self.type = None
        self.wrap = None
        self.wrapped_key = None
        # Unwrapped key; stays None until the record has been unlocked.
        self.key = None

    def __setitem__(self, key, value):
        """Store ``value`` under the attribute that tag ``key`` maps to."""
        setattr(self, self._FIELD_MAP.get(key, key), value)

    def __getitem__(self, key):
        """Return the value for tag ``key``, or None if it was never set."""
        return getattr(self, self._FIELD_MAP.get(key, key), None)

    def unwrap(self, wrapped):
        """Unwrap ``wrapped`` using this record's unlocked key.

        Raises:
            RuntimeError: if this key has not been unlocked yet.
        """
        if self.key is None:
            raise RuntimeError('Key is not unlocked')
        return AESUnwrap(self.key, wrapped)

    def __repr__(self):
        # Deliberately omits wrapped_key and key: no key material in logs.
        uuid = self.uuid.hex() if isinstance(self.uuid, (bytes, bytearray)) else self.uuid
        return (
            f'{type(self).__name__}(uuid={uuid!r}, '
            f'protection_class={self.protection_class!r}, type={self.type!r}, '
            f'wrap={self.wrap!r}, unlocked={self.key is not None})'
        )
class KeyBag:
    """Key bag parsed from a tag/length/value blob.

    Tags seen before the first ``UUID`` tag are stored on the bag itself;
    every ``UUID`` tag starts a new ``KeyBagKey`` record that receives all
    following tags until the next ``UUID``.
    """

    _FIELD_MAP = {
        'VERS': 'version',
        'TYPE': 'type',
    }
    WRAP_DEVICE = 1
    WRAP_PASSCODE = 2

    def __init__(self, data):
        """Parse ``data`` (bytes) into header fields and key records.

        Raises:
            ValueError: if a block announces more bytes than ``data`` holds.
        """
        self.version = None
        self.type = None
        self.keys = []
        target = self
        for tag, value in self._TLVBlocks(data):
            if tag == 'UUID':
                target = KeyBagKey(value)
                self.keys.append(target)
            else:
                target[tag] = value

    def __setitem__(self, key, value):
        """Store ``value`` under the attribute that tag ``key`` maps to."""
        setattr(self, self._FIELD_MAP.get(key, key), value)

    def __getitem__(self, key):
        """Return the value for tag ``key``; AttributeError if never set."""
        return getattr(self, self._FIELD_MAP.get(key, key))

    @staticmethod
    def _TLVBlocks(blob):
        """Yield ``(tag, value)`` pairs from ``blob``.

        Each block is a 4-byte ASCII tag, a 4-byte big-endian length and
        ``length`` bytes of value. Values of exactly 4 bytes are decoded as
        big-endian unsigned integers; all others are yielded as raw bytes.
        Fewer than 8 trailing bytes are ignored.

        Raises:
            ValueError: if a value is cut short by the end of ``blob``.
        """
        offset = 0
        while offset + 8 <= len(blob):
            tag, length = struct.unpack_from('>4sI', blob, offset)
            offset += 8
            value = blob[offset:offset + length]
            if len(value) != length:
                raise ValueError('Truncated key bag block')
            if length == 4:
                value, = struct.unpack('>I', value)
            yield tag.decode('ascii'), value
            offset += length

    def unlock(self, passcode):
        """Try to unwrap every passcode-wrapped key with ``passcode``.

        The first record supplies the derivation parameters: if its ``DPWT``
        field equals 1 the passcode is first stretched with
        PBKDF2-HMAC-SHA256 (``DPSL``/``DPIC``); the result is then run through
        PBKDF2-HMAC-SHA1 (``SALT``/``ITER``) to obtain a 32-byte key.

        Args:
            passcode: ``str`` (encoded with the default codec) or ``bytes``.

        Returns:
            True if all passcode-wrapped keys were unwrapped, False if any
            unwrap failed. On failure no key of the bag is modified.

        Raises:
            ValueError: if the bag has no records or the first record is not
                the expected unwrapped (``wrap == 0``) one.
        """
        if isinstance(passcode, str):
            passcode = passcode.encode()
        if not self.keys:
            raise ValueError('Key bag contains no keys')
        header = self.keys[0]
        if header.wrap != 0:
            raise ValueError('Unexpected key bag layout')
        if header['DPWT'] == 1:
            passcode = pbkdf2_hmac('sha256', passcode, header.DPSL, header.DPIC, 32)
        passcode_key = pbkdf2_hmac('sha1', passcode, header.SALT, header.ITER, 32)

        # Stage results so a failure cannot leave the bag half-unlocked.
        unwrapped = []
        for key in self.keys:
            if key.wrap & self.WRAP_PASSCODE:
                try:
                    unwrapped.append((key, AESUnwrap(passcode_key, key.wrapped_key)))
                except Exception:
                    # Any unwrap failure is reported as "wrong passcode", as
                    # before, but KeyboardInterrupt/SystemExit now propagate.
                    return False
        for key, plain in unwrapped:
            key.key = plain
        return True

    def find_key(self, protection_class):
        """Return the first record with ``protection_class``, or None.

        Raises:
            ValueError: if ``protection_class`` is outside 1..11.
        """
        if not 1 <= protection_class <= 11:
            raise ValueError(f'Protection class {protection_class} is invalid!')
        for key in self.keys:
            if key.protection_class == protection_class:
                return key
        return None

    def unwrap_key(self, protection_class, wrapped_key):
        """Unwrap a 40-byte ``wrapped_key`` with the key of ``protection_class``.

        Raises:
            ValueError: if ``wrapped_key`` is not 40 bytes long or the
                protection class is out of range.
            LookupError: if the bag has no key for ``protection_class``.
        """
        if len(wrapped_key) != 40:
            raise ValueError('Invalid wrapped_key length')
        key = self.find_key(protection_class)
        if key is None:
            raise LookupError(f'No key found for protection class {protection_class}')
        return key.unwrap(wrapped_key)

    def __str__(self):
        return f'KeyBag(version = {self.version}, type = {self.type}, keys = {self.keys})'
def load_manifest(dirname: str) -> dict:
    """Load ``Manifest.plist`` from the directory ``dirname``.

    Args:
        dirname: directory that contains ``Manifest.plist``.

    Returns:
        The parsed property list.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the property list has no ``Version`` key (explicit
            raise so the check is not stripped under ``python -O``).
    """
    with open(os.path.join(dirname, 'Manifest.plist'), 'rb') as f:
        manifest = plistlib.load(f)
    if 'Version' not in manifest:
        raise ValueError("Manifest.plist has no 'Version' key")
    return manifest
class BackupFile:
    """One file of the backup, located at ``<root>/<file_id[:2]>/<file_id>``.

    When the manifest says the backup is encrypted, the per-file key is
    unwrapped on construction and ``read``/``write`` transparently apply
    AES-CBC with an all-zero IV and block padding.
    """

    def __init__(self, manifest_db, file_id: str, plist: bytes):
        """Bind to the on-disk file described by ``plist``.

        Args:
            manifest_db: owning manifest database; supplies ``root_dir``,
                ``manifest`` and ``keybag``.
            file_id: file identifier; its first two characters name the
                sub-directory that holds the file.
            plist: archived file metadata, decoded with ``archiver.unarchive``.

        Raises:
            FileNotFoundError: if the file does not exist on disk.
            ValueError: if the protection class embedded in the wrapped key
                differs from the one in the metadata.
        """
        self.manifest_db = manifest_db
        self.file_id = file_id
        self.info = archiver.unarchive(plist)
        self.encryption_key = None
        self.filename = os.path.join(manifest_db.root_dir, file_id[:2], file_id)
        if not os.path.exists(self.filename):
            raise FileNotFoundError(self.filename)
        if manifest_db.manifest['IsEncrypted']:
            # Key blob layout: little-endian uint32 class + 40-byte wrapped key.
            protection_class, wrapped_key = struct.unpack(
                '<I40s', self.info.encryption_key.NSdata)
            if protection_class != self.info.protection_class:
                raise ValueError('Protection class mismatch')
            self.encryption_key = manifest_db.keybag.unwrap_key(
                protection_class, wrapped_key)

    def _cipher(self):
        """Return a fresh AES-CBC cipher (zero IV) for this file's key."""
        return AES.new(self.encryption_key, AES.MODE_CBC, b'\0' * 16)

    def read(self):
        """Return the file's plaintext contents.

        The SHA-1 digest is checked against the stored (on-disk) bytes and
        the size against the decrypted bytes.

        Raises:
            ValueError: if the digest or the size does not match the
                metadata (explicit raises so the integrity checks are not
                stripped under ``python -O``).
        """
        with open(self.filename, 'rb') as f:
            data = f.read()
        if sha1(data).digest() != self.info.digest:
            raise ValueError('Invalid file digest')
        if self.encryption_key:
            data = unpad(self._cipher().decrypt(data), 16)
        if len(data) != self.info.size:
            raise ValueError('Invalid file size')
        return data

    def write(self, data):
        """Replace the file's contents with ``data`` and update its metadata.

        Size is recorded for the plaintext, the digest for the bytes that
        are actually stored. The manifest row is updated first, then the
        file is written, then the manifest change is committed.

        Raises:
            TypeError: if ``data`` is not ``bytes``.
        """
        if not isinstance(data, bytes):
            raise TypeError('data must be bytes')
        self.info.size = len(data)
        if self.encryption_key:
            data = self._cipher().encrypt(pad(data, 16))
        self.info.digest = sha1(data).digest()
        self.manifest_db._update(self.file_id, archiver.archive(self.info))
        with open(self.filename, 'wb') as f:
            f.write(data)
        self.manifest_db._commit()
class ManifestDB:
    """Editable view of a backup's ``Manifest.db`` SQLite database.

    The database is copied (and decrypted, when a manifest key is present)
    into a private temporary file that SQLite operates on. ``close`` writes
    the temporary copy back over ``Manifest.db`` (re-encrypting it if needed)
    when changes were committed, then removes the temporary file. Usable as
    a context manager.
    """

    def __init__(self, root_dir: str, manifest: dict, keybag: 'KeyBag'):
        """Open ``<root_dir>/Manifest.db``.

        Args:
            root_dir: backup directory.
            manifest: parsed manifest; ``IsEncrypted`` and, optionally,
                ``ManifestKey`` are read.
            keybag: unlocked key bag used to unwrap ``ManifestKey``; only
                used when the manifest is encrypted and carries that key.

        Raises:
            FileNotFoundError: if ``Manifest.db`` does not exist.
            PermissionError: if ``Manifest.db`` is not writable.
        """
        self.root_dir = root_dir
        self.filename = os.path.join(root_dir, 'Manifest.db')
        self.manifest = manifest
        self.keybag = keybag
        self.modified = False
        self.tempname = None
        self.conn = None
        self.manifest_key = None
        if manifest['IsEncrypted'] and 'ManifestKey' in manifest:
            # Key blob layout: little-endian uint32 class + 40-byte wrapped key.
            self.manifest_key = keybag.unwrap_key(
                *struct.unpack('<I40s', manifest['ManifestKey']))
        if not os.path.exists(self.filename):
            raise FileNotFoundError(self.filename)
        if not os.access(self.filename, os.W_OK):
            raise PermissionError('Manifest.db is read-only')

        print('[*] Loading Manifest.db...')
        with open(self.filename, 'rb') as f:
            data = f.read()
        if self.manifest_key:
            data = unpad(self._cipher().decrypt(data), 16)

        # mkstemp creates the file atomically with private permissions;
        # mktemp only returns a name and is open to races.
        fd, self.tempname = tempfile.mkstemp()
        try:
            with os.fdopen(fd, 'wb') as f:
                f.write(data)
            self.conn = sqlite3.connect(self.tempname)
        except BaseException:
            # Do not leave a decrypted copy behind if setup fails.
            self._remove_temp()
            raise

    def _cipher(self):
        """Return a fresh AES-CBC cipher (zero IV) for the manifest key."""
        return AES.new(self.manifest_key, AES.MODE_CBC, b'\0' * 16)

    def _remove_temp(self):
        """Delete the temporary database copy, ignoring filesystem errors."""
        if self.tempname is None:
            return
        try:
            os.unlink(self.tempname)
        except OSError:
            pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Close the connection, save committed changes, drop the temp file.

        If saving fails the exception propagates and the temporary copy is
        kept, as it is then the only place holding the modified database.
        """
        if self.conn is not None:
            self.conn.close()
        if self.modified:
            print('[*] Saving Manifest.db...')
            with open(self.tempname, 'rb') as f:
                data = f.read()
            if self.manifest_key:
                data = self._cipher().encrypt(pad(data, 16))
            with open(self.filename, 'wb') as f:
                f.write(data)
            self.modified = False
        self._remove_temp()

    def file(self, domain: str, relative_path: str) -> 'BackupFile':
        """Return the ``BackupFile`` for ``domain``/``relative_path``.

        Raises:
            FileNotFoundError: if the ``Files`` table has no matching row.
        """
        row = self.conn.execute(
            'select fileID, file from Files where domain = ? and relativePath = ?',
            (domain, relative_path)
        ).fetchone()
        if not row:
            raise FileNotFoundError(f'{domain}/{relative_path}')
        return BackupFile(self, *row)

    def _update(self, file_id: str, plist: bytes):
        """Replace the stored metadata blob of ``file_id`` (not committed)."""
        self.conn.execute('update Files set file = ? where fileID = ?', (plist, file_id))

    def _commit(self):
        """Commit pending changes and mark the database for saving on close."""
        self.conn.commit()
        self.modified = True
def remove_bad_ssids(networks):
    """Remove every entry of ``networks`` whose SSID contains ``%``.

    Entries without an ``SSID`` value are left untouched.

    Args:
        networks: mapping of network key -> network dict; modified in place.

    Returns:
        The removed SSIDs as ``bytes``, in iteration order.
    """
    removed = []
    # Iterate over a snapshot because entries are deleted along the way.
    for key, network in list(networks.items()):
        ssid = network.get('SSID')
        if isinstance(ssid, str):
            ssid = ssid.encode()
        if not isinstance(ssid, bytes) or b'%' not in ssid:
            continue
        del networks[key]
        removed.append(ssid)
    return removed


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: argument list; defaults to ``sys.argv[1:]`` when None.

    Returns:
        Process exit status: 0 on success, 1 if the key bag is unusable or
        the password is wrong. Invalid arguments exit via ``parser.error``.
    """
    parser = argparse.ArgumentParser(description='Removes bad SSIDs from known networks list.')
    parser.add_argument('backup_path', type=str, help='path to a backup directory')
    parser.add_argument('password', type=str, nargs='?', help='password for encrypted backup')
    args = parser.parse_args(argv)

    if not os.path.isdir(args.backup_path):
        parser.error('Specified path is not a valid directory')

    print('[*] Loading backup manifest...')
    manifest = load_manifest(args.backup_path)

    keybag = None
    if manifest['IsEncrypted']:
        if not args.password:
            parser.error('Password is required for encrypted backup')
        keybag = KeyBag(manifest['BackupKeyBag'])
        if keybag.type != 1:
            print('[!] Not a backup key bag')
            return 1
        if not keybag.unlock(args.password):
            print('[!] Invalid password')
            return 1

    with ManifestDB(args.backup_path, manifest, keybag) as backup:
        print('[*] Loading com.apple.wifi.known-networks.plist...')
        known_networks = backup.file('SystemPreferencesDomain', 'com.apple.wifi.known-networks.plist')
        networks = plistlib.loads(known_networks.read())
        print(f'[+] Loaded {len(networks)} known networks')

        removed = remove_bad_ssids(networks)
        for ssid in removed:
            # errors='replace': an SSID is arbitrary bytes, not always UTF-8.
            print('[+] Removing bad SSID %s' % ssid.decode(errors='replace'))

        if removed:
            known_networks.write(plistlib.dumps(networks, fmt=plistlib.FMT_BINARY, sort_keys=False))
        else:
            print('[-] No bad SSIDs found')
    return 0


if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment