Skip to content

Instantly share code, notes, and snippets.

@JJTech0130
Last active February 2, 2024 04:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JJTech0130/30940c45fb7b1de5c309dc4eb2874586 to your computer and use it in GitHub Desktop.
Save JJTech0130/30940c45fb7b1de5c309dc4eb2874586 to your computer and use it in GitHub Desktop.
import requests
from rich import print
from rich.progress import track, Progress
from rich.prompt import Prompt
from rich.console import Console
from rich.table import Table
import zipfile
import os
import threading
import io
import lzma
import json
# Base URL of the AppleDB JSON API; device records and the iOS build list are fetched from it.
APPLEDB_API = "https://api.appledb.dev/"
# Base URL of the raw AppleDB repository files; the per-build JSON files are fetched from it.
APPLEDB_RAW = "https://raw.githubusercontent.com/littlebyteorg/appledb/main/"
# Directory (relative to the working directory) that holds cached downloads,
# per-device build lists and decompressed payloads.
CACHE_PATH = "cache/"
def download_fast(url: str) -> bytes:
    """Download *url* with parallel HTTP range requests, caching the result.

    The body is cached under ``CACHE_PATH`` using the last path segment of
    the URL as the file name. A cache hit is returned without touching the
    network.

    Args:
        url: Address of the file. On a cache miss the server must report a
            ``Content-Length`` and honour ``Range`` requests.

    Returns:
        The complete file contents.

    Raises:
        requests.HTTPError: If the size probe or a range request fails.
        RuntimeError: If the server ignores the ``Range`` header or the
            received data does not match the announced size.
    """
    os.makedirs(CACHE_PATH, exist_ok=True)
    cache_file = CACHE_PATH + url.split("/")[-1]
    if os.path.exists(cache_file):
        with open(cache_file, "rb") as f:
            return f.read()

    chunk_size = 1024 * 1024 * 100
    # requests.head() does not follow redirects unless asked to, whereas the
    # GET requests below do; follow them here so the size describes the same
    # resource that is actually downloaded.
    head = requests.head(url, allow_redirects=True)
    head.raise_for_status()
    total_size = int(head.headers["Content-Length"])

    # One start offset per chunk. Using range() avoids the extra, invalid
    # request that ``total_size // chunk_size + 1`` produced whenever the size
    # was an exact multiple of the chunk size.
    starts = range(0, total_size, chunk_size)
    parts = {}
    errors = []

    progress = Progress()
    task = progress.add_task("Downloading", total=len(starts))

    def download(start):
        # Range end offsets are inclusive.
        end = min(start + chunk_size, total_size) - 1
        try:
            # stream=True lets the status be checked before the body is read.
            resp = requests.get(url, headers={"Range": f"bytes={start}-{end}"}, stream=True)
            try:
                resp.raise_for_status()
                if resp.status_code != 206 and len(starts) > 1:
                    raise RuntimeError(f"Server ignored the Range header (HTTP {resp.status_code})")
                body = resp.content
            finally:
                resp.close()
            if len(body) != end - start + 1:
                raise RuntimeError(f"Short read for bytes {start}-{end}: got {len(body)} bytes")
            parts[start] = body
        except Exception as exc:
            # Thread boundary: record the failure so the caller's thread can
            # re-raise it instead of silently caching a partial download.
            errors.append(exc)
        finally:
            progress.advance(task)

    threads = [threading.Thread(target=download, args=(start,)) for start in starts]
    progress.start()
    try:
        for thread in threads:
            thread.start()
        # Join order does not matter; every part is needed before assembly.
        for thread in threads:
            thread.join()
    finally:
        progress.stop()

    if errors:
        raise errors[0]

    # Reassemble the parts in offset order.
    result = b"".join(parts[start] for start in starts)

    # Write to a temporary name first so an interrupted run never leaves a
    # truncated file that a later call would mistake for a valid cache entry.
    partial_file = cache_file + ".part"
    with open(partial_file, "wb") as f:
        f.write(result)
    os.replace(partial_file, cache_file)
    return result
def _load_full_ota_builds(device):
    """Return ``(build, version, url)`` entries for the full OTA updates of *device*.

    The list is cached as ``<device>.json`` under ``CACHE_PATH``. On a cache
    miss the AppleDB build list is fetched and the raw JSON file of every
    matching build is inspected for an OTA source that has no prerequisite
    build. Builds without any active link are skipped.
    """
    # Create the cache directory up front: the list is written here before
    # download_fast() has had a chance to create it.
    os.makedirs(CACHE_PATH, exist_ok=True)
    cache_file = CACHE_PATH + f"{device}.json"
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            return json.load(f)

    # The giant build list is only needed when nothing is cached.
    builds = requests.get(APPLEDB_API + "ios/iOS/main.json").json()
    builds = [build for build in builds if build["osType"] == "iOS" and device in build["deviceMap"]]

    ota_builds = []
    for build in track(builds, description="Checking builds for OTA updates"):
        # Use the raw repository files to check for a full OTA.
        major = build["version"].split(".")[0]
        build_raw_resp = requests.get(
            APPLEDB_RAW + f"osFiles/iOS/{build['build'][:2]}x%20-%20{major}.x/{build['build']}.json"
        )
        if build_raw_resp.status_code != 200:
            continue
        build_raw = build_raw_resp.json()
        if "sources" not in build_raw:
            continue
        ota_sources = [
            source
            for source in build_raw["sources"]
            if device in source["deviceMap"] and source["type"] == "ota" and "prerequisiteBuild" not in source
        ]
        # Take the first active link; a build without one is skipped rather
        # than aborting the whole scan with an IndexError.
        link = next(
            (link for source in ota_sources for link in source.get("links", []) if link.get("active")),
            None,
        )
        if link is not None:
            ota_builds.append((build["build"], build["version"], link["url"]))

    with open(cache_file, "w") as f:
        json.dump(ota_builds, f)
    return ota_builds


def _zip_member_data_offset(raw_zip, info):
    """Return the offset inside *raw_zip* at which the data of member *info* starts.

    The lengths are taken from the member's local file header: 30 fixed
    bytes, with the little-endian file-name and extra-field lengths at
    offsets 26 and 28. The local extra field may differ in size from the
    central-directory copy that ``ZipInfo`` carries, which is presumably why
    a ``FileHeader()``-based computation needed a manual correction.

    Raises:
        ValueError: If no local file header is found at the recorded offset.
    """
    header = raw_zip[info.header_offset:info.header_offset + 30]
    if len(header) != 30 or header[:4] != b"PK\x03\x04":
        raise ValueError("ZIP local file header not found at the recorded offset")
    name_length = int.from_bytes(header[26:28], "little")
    extra_length = int.from_bytes(header[28:30], "little")
    return info.header_offset + 30 + name_length + extra_length


def _parse_pbzx_chunks(payload):
    """Return ``(chunk_size, chunks)`` parsed from a pbzx stream.

    ``chunks`` lists ``(offset, length)`` per chunk: *offset* is where the
    chunk's 16-byte header starts and *length* is the size of the compressed
    data that follows that header.

    Raises:
        ValueError: If *payload* does not start with the ``pbzx`` magic.
    """
    if payload[:4] != b"pbzx":
        raise ValueError("Payload does not start with the pbzx magic")
    chunk_size = int.from_bytes(payload[4:12], "big")
    chunks = []
    position = 12
    while True:
        # Each chunk header is two big-endian 64-bit values: decompressed
        # length, then compressed length. A zero (or missing) decompressed
        # length ends the list.
        decompressed_length = int.from_bytes(payload[position:position + 8], "big")
        if decompressed_length == 0:
            break
        length = int.from_bytes(payload[position + 8:position + 16], "big")
        chunks.append((position, length))
        position += 16 + length
    return chunk_size, chunks


def _parse_archive_entries(archive):
    """Return ``(name, data_offset, data_length)`` for each entry in *archive*.

    Every entry is a 30-byte header followed by the name and the file data.
    Within the header, bytes 6-9 hold the big-endian data length and bytes
    22-23 the big-endian name length; the remaining fields are not used.
    Parsing stops when fewer than 30 bytes remain.
    """
    entries = []
    position = 0
    while position + 30 <= len(archive):
        length = int.from_bytes(archive[position + 6:position + 10], "big")
        name_length = int.from_bytes(archive[position + 22:position + 24], "big")
        name_start = position + 30
        name = archive[name_start:name_start + name_length].decode()
        data_offset = name_start + name_length
        entries.append((name, data_offset, length))
        position = data_offset + length
    return entries


def main():
    """Interactively locate a file inside a full iOS OTA update.

    Asks for a device and a build, downloads the OTA ZIP, parses the pbzx
    payload and the archive inside it, and prints the offsets and lengths
    that describe where the chosen file lives: the payload within the ZIP,
    the chunk within the payload, and the file within the decompressed chunk.

    Raises:
        ValueError: If the payload is not a pbzx stream or the ZIP member's
            local header cannot be located.
    """
    from rich.prompt import Confirm

    console = Console()

    # Ask for desired device type
    device = Prompt.ask(":mobile_phone: Device type", console=console, default="iPhone5,1")

    # Make sure the device is a real device using the AppleDB API
    device_info = requests.get(APPLEDB_API + f"device/{device}.json")
    if device_info.status_code != 200:
        print("Device not found.")
        return

    ota_builds = _load_full_ota_builds(device)
    print(f"Found {len(ota_builds)} full OTA updates for {device}")
    if not ota_builds:
        # Nothing to choose from; the prompt below needs at least one build.
        return

    table = Table(title="OTA Updates")
    table.add_column("Build")
    table.add_column("Version")
    table.add_column("URL")
    for build in ota_builds:
        table.add_row(*build)
    console.print(table)

    # Ask for desired build
    build_name = Prompt.ask(
        ":package: Build",
        console=console,
        choices=[build[0] for build in ota_builds],
        default=ota_builds[0][0],
        show_choices=False,
    )
    build_url = next(build[2] for build in ota_builds if build[0] == build_name)

    build_file = download_fast(build_url)

    payload_name = "AssetData/payloadv2/payload"
    with zipfile.ZipFile(io.BytesIO(build_file)) as z:
        if payload_name not in z.namelist():
            print("No payloadv2 found in this update.")
            return
        print("Found payloadv2")
        payload_offset = _zip_member_data_offset(build_file, z.getinfo(payload_name))
        print("Payload offset in ZIP: " + hex(payload_offset))
        payload = z.read(payload_name)

    # Parse the pbzx header and chunk table
    chunk_size, chunks = _parse_pbzx_chunks(payload)

    # Check if we have cached the decompressed archive
    cache_file = CACHE_PATH + build_url.split("/")[-1] + ".decompressed"
    if os.path.exists(cache_file):
        with open(cache_file, "rb") as f:
            archive = f.read()
    else:
        pieces = []
        for (offset, length) in track(chunks, description="Decompressing chunks"):
            data_start = offset + 16  # Skip the 16-byte chunk header
            pieces.append(lzma.decompress(payload[data_start:data_start + length]))
        # Join once at the end instead of growing a bytes object per chunk.
        archive = b"".join(pieces)
        with open(cache_file, "wb") as f:
            f.write(archive)
    print(archive[:10])

    # Parse the files
    files = _parse_archive_entries(archive)
    if not files:
        print("No files found in payload.")
        return

    # Ask if we should write available files to disk
    if Confirm.ask("Write list of files to disk?", default=False):
        with open(f"{build_name}_files.txt", "w") as f:
            for (name, offset, length) in files:
                f.write(f"{name} @ {hex(offset)} : {hex(length)}\n")

    # Ask for desired file
    file_name = Prompt.ask(
        ":open_file_folder: File",
        console=console,
        choices=[entry[0] for entry in files],
        default="System/Library/PrivateFrameworks/IDS.framework/identityservicesd.app/identityservicesd",
        show_choices=False,
    )
    # The prompt's default is not guaranteed to be one of the archive's
    # entries, so the lookup has to tolerate a miss.
    selected = next((entry for entry in files if entry[0] == file_name), None)
    if selected is None:
        print("File not found in payload.")
        return
    print(selected)

    # Calculate chunk number and offset
    start_chunk = selected[1] // chunk_size  # File may start partway into the chunk
    # Look up the offset for that chunk
    chunk_offset = chunks[start_chunk][0]
    # Calculate the offset into the decompressed chunk
    file_offset = selected[1] % chunk_size
    # Check if the file is split across multiple chunks
    if file_offset + selected[2] > chunk_size:
        print("File is split across multiple chunks")
    print(f"Chunk number: {start_chunk}")
    print(f"Chunk offset: {hex(chunk_offset + 16)}")  # Past the 16-byte chunk header
    # The stored length is the size of the compressed data that follows the
    # header, so it pairs with the header-adjusted offset printed above.
    print(f"Chunk length: {hex(chunks[start_chunk][1])}")
    print(f"File offset: {hex(file_offset)}")
    print(f"File length: {hex(selected[2])}")
    print(f"file sanity check: {archive[selected[1]:selected[1] + 10]}")
# Run the interactive tool only when this file is executed as a script.
if __name__ == "__main__":
    main()
import lzma
import sys
import requests
# Fetch a single file out of an OTA ZIP with one HTTP range request: download
# just the compressed chunk that contains the file, decompress it, and slice
# the file out of the result.
session = requests.Session()

URL = "http://appldnld.apple.com/ios10.3.3/091-23111-20170719-B3106482-697B-11E7-BDE5-4C9500BA0AE3/com_apple_MobileAsset_SoftwareUpdate/9e1777da1d4a8d6917de9cfa115f253aab6098c7.zip"

# Offset of the payload within the ZIP; added to the chunk offset below to
# get the absolute position of the chunk in the remote file.
PBZ_IN_ZIP_OFFSET = 0x5a408fa
# Offset (relative to PBZ_IN_ZIP_OFFSET) and length of the XZ-compressed chunk.
PBZ_CHUNK_OFFSET = 0x3fe01da4
PBZ_CHUNK_LENGTH = 0x35fa94
# Position and size of the wanted file inside the decompressed chunk.
FILE_IN_CHUNK_OFFSET = 0x15110f
FILE_LENGTH = 0x5c3940

req_begin = PBZ_IN_ZIP_OFFSET + PBZ_CHUNK_OFFSET
req_end = req_begin + PBZ_CHUNK_LENGTH

# Range end offsets are inclusive, hence the -1. stream=True defers reading
# the body until the status has been checked, so a server that ignores the
# Range header does not make us download the entire archive first. The
# request goes through the session created above.
resp = session.get(URL, headers={
    "Range": f"bytes={req_begin}-{req_end-1}"
}, stream=True)

if resp.status_code != 206:
    print("Request failed")
    sys.exit(1)

# this reads the whole chunk into memory,
# but it's only 16MB so meh
raw_chunk = lzma.decompress(resp.content, lzma.FORMAT_XZ)

# Slicing never fails on a short buffer, so check explicitly rather than
# silently writing a truncated file.
file_end = FILE_IN_CHUNK_OFFSET + FILE_LENGTH
if len(raw_chunk) < file_end:
    print("Decompressed chunk is too short to contain the file")
    sys.exit(1)

with open("identityservicesd", "wb") as fp:
    fp.write(raw_chunk[FILE_IN_CHUNK_OFFSET:file_end])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment