Last active
June 25, 2024 07:17
-
-
Save R00tFS/e993a3cccb42904becef18d1c1502b8d to your computer and use it in GitHub Desktop.
Tool to parse keys for AEA in IPSWs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyhpke import CipherSuite, KEMId, KEMKey, KDFId, AEADId | |
import base64 | |
import requests | |
import argparse | |
import re | |
import fsspec | |
from zipfile import ZipFile | |
import json | |
from tabulate import tabulate | |
import io | |
# multithreading | |
from multiprocessing.pool import ThreadPool | |
AEA_PROFILE__HKDF_SHA256_AESCTR_HMAC__SYMMETRIC__NONE = 1 | |
suite = CipherSuite.new( | |
KEMId.DHKEM_P256_HKDF_SHA256, KDFId.HKDF_SHA256, AEADId.AES256_GCM | |
) | |
print_command = False | |
output = False | |
output_content = [] | |
keys = [] | |
def _print(content): | |
if output: | |
output_content.append(content) | |
print(content) | |
def parse_size(header): | |
if len(header) != 12: | |
print(f"Expected 12 bytes, got {len(header)}") | |
exit(1) | |
magic = header[:4] | |
if magic != b"AEA1": | |
print(f"Invalid magic: {magic.hex()}") | |
exit(1) | |
profile = int.from_bytes(header[4:7], "little") | |
if profile != AEA_PROFILE__HKDF_SHA256_AESCTR_HMAC__SYMMETRIC__NONE: | |
print(f"Invalid AEA profile: {profile}") | |
exit(1) | |
return int.from_bytes(header[8:12], "little") | |
def parse_fields(content, orig_size): | |
fields = {} | |
if len(content) != orig_size: | |
print(f"Expected {orig_size} bytes, got {len(content)}") | |
exit(1) | |
assert content[:4] | |
while len(content) > 0: | |
field_size = int.from_bytes(content[:4], "little") | |
field_blob = content[:field_size] | |
key_end = field_blob.index(b"\x00", 4) | |
key = field_blob[4:key_end].decode() | |
value = field_blob[key_end + 1 :].decode() | |
fields[key] = value | |
content = content[field_size:] | |
return fields | |
def get_aea_files(url: str): | |
file_content = [] | |
with fsspec.open(url) as f: | |
with ZipFile(f) as z: | |
for file in z.filelist: | |
if file.filename.endswith(".aea"): | |
with z.open(file.filename) as remote_file: | |
auth_blob_size = parse_size(remote_file.read(12)) | |
file_content.append({"filename": file.filename, "fields": parse_fields(remote_file.read(auth_blob_size), auth_blob_size)}) | |
return file_content | |
def get_final_key(enc_req, wrapped_key, pem): | |
privkey = KEMKey.from_pem(pem) | |
recipient = suite.create_recipient_context(base64.b64decode(enc_req), privkey) | |
pt = recipient.open(base64.b64decode(wrapped_key)) | |
return f"base64:{base64.b64encode(pt).decode("utf8")}" | |
def parse_ipsw_url(url): | |
ipsw_name = url.split("/")[-1] | |
infos = ipsw_name.split("_") | |
model_pattern = r'\d\.\d|\d{2}\.\d|\d{2}\.\d\.\d' | |
first_split = re.split(model_pattern, ipsw_name) | |
model = first_split[0].replace("_", " ") | |
final_split = ipsw_name.replace(first_split[0], "").split("_") | |
return {"model": model, "version": final_split[0], "build_number": final_split[1], "url": url} | |
def grab_keys_for_url(url): | |
infos = parse_ipsw_url(url) | |
infos["files"] = [] | |
#_print(parse_ipsw_url(url)) | |
files = get_aea_files(url) | |
table = [] | |
for aea in files: | |
fields = aea["fields"] | |
fcs_response = json.loads(fields["com.apple.wkms.fcs-response"]) | |
enc_req = fcs_response["enc-request"] | |
wrapped_key = fcs_response["wrapped-key"] | |
url = fields["com.apple.wkms.fcs-key-url"] | |
pem = requests.get(url).content | |
final_key = get_final_key(enc_req, wrapped_key, pem) | |
infos["files"].append({"file": aea["filename"], "key": final_key}) | |
#if print_command: _print(f"Command to decrypt : aea decrypt -i {aea["filename"]} -o {aea["filename"].strip(".aea")} -key-value {final_key}") | |
return infos | |
def multithread(url): | |
keys.append(grab_keys_for_url(url)) | |
def parse_url_file(path): | |
with open(path, "r") as f: | |
lines = [line.strip() for line in f.readlines() if line.strip()] | |
pool = ThreadPool(processes=10) | |
pool.map(multithread, lines) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-u", "--url", help="ipsw url", type=str) | |
parser.add_argument("-i", "--input", help="file with url list separated by return", type=str) | |
#parser.add_argument("-c", "--commands", action='store_true', help="prints the aea command to extract the file") | |
parser.add_argument("-o", "--output", help="Output keys to specified file") | |
args = parser.parse_args() | |
#print_command = args.commands | |
output = args.output != None | |
if args.url: | |
keys.append(grab_keys_for_url(args.url)) | |
elif args.input: | |
parse_url_file(args.input) | |
if output: | |
with open(args.output, "w") as f: | |
f.write(json.dumps(keys, indent=4)) | |
else: | |
for key in keys: | |
print(f"Model : {key["model"]} ; Version {key["build_number"]} ; Build Number: {key["build_number"]}") | |
table = [[aea["file"], aea["key"]] for aea in key["files"]] | |
print(tabulate(table, headers=["Apple Encrypted Archive", "Key"])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment