Skip to content

Instantly share code, notes, and snippets.

@udf
Created December 5, 2023 13:24
Show Gist options
  • Save udf/62c24984b89de206fc7ae725cbb9b738 to your computer and use it in GitHub Desktop.
# Code to recursively extract a directory from a damaged exFAT filesystem
from collections import OrderedDict
from io import BytesIO
import os
from pathlib import Path
from dissect.fat.c_exfat import (
DIR_ENTRY_SIZE,
FILE_ENTRY,
c_exfat,
)
from dissect.fat.exfat import RootDirectory
# Reuse dissect.fat's UTF-16 filename assembler without needing a RootDirectory instance.
_construct_filename = RootDirectory._construct_filename
# Raw image of the damaged filesystem; kept open for the whole run as a module-level handle.
IMAGE_HANDLE = open('/home/sam/vita.img', 'rb')
# Byte offset of the cluster heap (data region) inside the image — presumably taken
# from the boot sector of this particular image; verify if reusing on another image.
DATA_START = 0x43F0000
# Bytes per cluster (32 KiB).
CLUSTER_SIZE = 0x8000
# Clusters below this index lie in the damaged region; directory records there are unreadable.
FIRST_VALID_CLUSTER = 499394
def cluster_to_address(cluster_no, data_start=None, cluster_size=None):
    """Map a cluster index to its absolute byte offset in the image.

    ``data_start`` and ``cluster_size`` default to the module-level layout
    (DATA_START / CLUSTER_SIZE); they may be overridden to target a
    different image geometry. Inverse of :func:`address_to_cluster`.
    """
    if data_start is None:
        data_start = DATA_START
    if cluster_size is None:
        cluster_size = CLUSTER_SIZE
    return data_start + cluster_no * cluster_size
def address_to_cluster(address, data_start=None, cluster_size=None):
    """Map an absolute byte offset back to its cluster index.

    ``data_start`` and ``cluster_size`` default to the module-level layout
    (DATA_START / CLUSTER_SIZE), mirroring :func:`cluster_to_address`.

    Raises:
        ValueError: if ``address`` is not exactly on a cluster boundary,
            which would indicate a bogus offset was supplied.
    """
    if data_start is None:
        data_start = DATA_START
    if cluster_size is None:
        cluster_size = CLUSTER_SIZE
    cluster_no, rem = divmod(address - data_start, cluster_size)
    if rem != 0:
        raise ValueError(f'{address=:X} is not at a cluster boundary!')
    return cluster_no
def read_cluster(cluster_no, length=CLUSTER_SIZE):
    """Return ``length`` bytes of image data starting at ``cluster_no``."""
    IMAGE_HANDLE.seek(cluster_to_address(cluster_no))
    return IMAGE_HANDLE.read(length)
def iter_clusters(start_cluster_no=0, window_size=1):
    """Yield ``(cluster_no, data)`` pairs for every cluster up to NUM_CLUSTERS.

    Each read spans ``window_size`` clusters, so consecutive windows overlap
    when ``window_size > 1``.
    """
    span = window_size * CLUSTER_SIZE
    cluster_no = start_cluster_no
    while cluster_no < NUM_CLUSTERS:
        yield cluster_no, read_cluster(cluster_no, span)
        cluster_no += 1
def parse_file_entries(cluster_no, record_size=CLUSTER_SIZE):
    """Parse ``record_size`` bytes of directory records found at ``cluster_no``.

    Clusters inside the damaged region produce a warning and an empty
    OrderedDict instead of garbage parses.
    """
    if cluster_no >= FIRST_VALID_CLUSTER:
        return _parse_file_entries(read_cluster(cluster_no, record_size))
    print(f'warning: tried to read missing records from {cluster_no=}')
    return OrderedDict()
# adapted from https://github.com/fox-it/dissect.fat/blob/b9c8dbe0f81c3377022c5bfe49f1099faba94dc3/dissect/fat/exfat.py#L248
def _parse_file_entries(data):
    """Parse a blob of exFAT directory records into ``{filename: FILE}``.

    Only FILE_ENTRY records (and their stream/filename sub-entries) are
    decoded; every other record type is skipped one entry at a time. A short
    read (EOFError from the struct parser) simply terminates the scan.
    """
    files = OrderedDict()
    blob_len = len(data)
    buf = BytesIO(data)
    try:
        while buf.tell() < blob_len:
            header = c_exfat.FILE_DIRECTORY_ENTRY(buf.read(DIR_ENTRY_SIZE))
            if header.entry_type != FILE_ENTRY:  # Not parsing any other types
                continue
            stream_entry = c_exfat.STREAM_DIRECTORY_ENTRY(buf.read(DIR_ENTRY_SIZE))
            # subentry_count includes the stream entry, hence the -1 here.
            name_entries = [
                c_exfat.FILENAME_DIRECTORY_ENTRY(buf.read(DIR_ENTRY_SIZE))
                for _ in range(header.subentry_count - 1)
            ]
            record = c_exfat.FILE(metadata=header, stream=stream_entry, fn_entries=name_entries)
            files[_construct_filename(record.fn_entries)] = record
    except EOFError:
        pass
    return files
def recursive_extract(cluster_no, path, record_size=CLUSTER_SIZE):
    """Dump every reachable file from the directory records at ``cluster_no``
    into ``path``, recursing into subdirectories.

    Fragmented files are skipped with a warning: following their cluster
    chain would require the FAT, which is unavailable on this image.
    """
    if isinstance(path, str):
        path = Path(path)
    for filename, entry in parse_file_entries(cluster_no, record_size).items():
        target = path / filename
        start_addr = cluster_to_address(entry.stream.location)
        size = entry.stream.data_length
        if not entry.stream.flags.not_fragmented:
            # Can't assemble fragmented files without FAT
            print(
                f'warning: skipping fragmented file {target} '
                f'({size} bytes at 0x{start_addr:X})'
            )
            continue
        if entry.metadata.attributes.directory:
            # A directory's stream points at its own run of directory records.
            recursive_extract(
                entry.stream.location,
                path=target,
                record_size=size
            )
            continue
        print('writing', target, size)
        path.mkdir(parents=True, exist_ok=True)
        IMAGE_HANDLE.seek(start_addr)
        with open(target, 'wb') as out:
            out.write(IMAGE_HANDLE.read(size))
# Derive the total cluster count from the image size: seek to EOF, then
# convert the file length (an absolute byte address) into a cluster index.
# NOTE(review): the original also did a trailing seek(0, os.SEEK_CUR), which
# is a no-op on a binary handle — removed; all reads seek absolutely anyway.
IMAGE_HANDLE.seek(0, os.SEEK_END)
NUM_CLUSTERS = address_to_cluster(IMAGE_HANDLE.tell())
if __name__ == '__main__':
    # FFX
    # Game directory records start at this cluster; extract everything reachable.
    recursive_extract(5463310, 'PCSB00395')
    # sdslot.dat fragmented (looks somewhat contiguous, 4 + NULL + 5)
    # sure???
    # Manual reassembly of one fragmented file: 4 clusters, skip 1, then 5 more.
    # Offsets were found by hand — confirm before trusting the output file.
    with open('PCSB00395/sce_sys/sdslot.dat', 'wb') as f:
        IMAGE_HANDLE.seek(0x29B2CD8000)
        f.write(IMAGE_HANDLE.read(CLUSTER_SIZE * 4))
        IMAGE_HANDLE.seek(CLUSTER_SIZE, os.SEEK_CUR)
        f.write(IMAGE_HANDLE.read(CLUSTER_SIZE * 5))
    # Cold steel (some folders)
    recursive_extract(address_to_cluster(0x16C2090000), 'PCSB01016/sce_pfs')
    recursive_extract(address_to_cluster(0x16D9EE8000), 'PCSB01016/sce_sys')
# Tries every candidate block to find the second half of a missing thumbnail block
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import shutil
import tempfile
import subprocess
from bitarray import bitarray
import extract
# Scratch area: a pristine copy of the partially-extracted game lives in
# workdir/src; each candidate cluster is tried in its own temp dir under workdir.
workdir = tempfile.mkdtemp()
srcdir = str(Path(workdir) / 'src')
shutil.copytree('PCSB01016', srcdir, dirs_exist_ok=True)
print(f'{workdir=}')
# Known-good first half of the 64 KiB block; each candidate supplies the second half.
initial_data = extract.read_cluster(5463311)
def try_data(data, cluster_no):
    """Splice ``data`` after the known first half of the missing 64 KiB block,
    run psvpfsparser against a scratch copy of the game, and return the
    parser's stdout as text.

    The scratch directory is now removed in a ``finally`` block, so it is
    cleaned up even when psvpfsparser exits non-zero (the original leaked one
    temp dir per failed attempt; subprocess.CalledProcessError still
    propagates to the caller).
    """
    tmpdir = tempfile.mkdtemp(dir=workdir)
    try:
        shutil.copytree(srcdir, tmpdir, dirs_exist_ok=True)
        # Filename encodes start cluster + candidate so hits are identifiable.
        with open(Path(tmpdir) / f'5463311+{cluster_no}.64k', 'wb') as f:
            f.write(initial_data + data)
        output = subprocess.check_output(
            [
                '/home/sam/proj/p/vitarec/psvpfstools/cmake/output/Release/psvpfsparser',
                '-i', tmpdir,
                '-o', tmpdir + '_out'
            ]
        )
    finally:
        shutil.rmtree(tmpdir)
    return output.decode('utf-8')
# Load the precomputed per-cluster bitmaps used to narrow the search space.
# metadata.txt marks metadata clusters, so its complement is what we want.
non_metadata = bitarray(Path('metadata.txt').read_text())
non_metadata.invert()
# Clusters not referenced by any intact directory entry.
orphans = bitarray(Path('orphans.txt').read_text())
# Clusters whose 32 KiB contents are unique in the image.
unique_32k = bitarray(Path('unique.txt').read_text())
# Clusters not yet classified by earlier passes.
unknown = bitarray(Path('unknown.txt').read_text())
def iter_candidates():
    """Yield ``(cluster_no, data)`` for every cluster passing all four filters."""
    mask = non_metadata & orphans & unique_32k & unknown
    for idx in mask.itersearch(bitarray('1')):
        yield idx, extract.read_cluster(idx)
def process_candidate(candidate):
    """Worker: run one ``(cluster_no, data)`` candidate through psvpfsparser."""
    idx, cluster_data = candidate
    return idx, try_data(cluster_data, idx)
def main():
    """Fan candidate clusters out over a thread pool; stop at the first one
    whose splice the parser does not reject as an invalid Merkle tree."""
    with ThreadPoolExecutor(max_workers=64) as pool:
        results = pool.map(process_candidate, iter_candidates())
        for idx, parser_output in results:
            if 'Merkle tree is invalid' in parser_output:
                continue
            print(idx)
            print(parser_output)
            break
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment