Created
October 20, 2023 02:22
-
-
Save uyjulian/b596c978da0c1031047e124eaf5d4f84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-License-Identifier: MIT
# Falcom YamaNeko engine on PSP ISO format extraction.
# Uses the information contained in PSP_GAME/USRDIR/data.lst
# Also recursively unpacks cclm archive/group files
# See also: https://github.com/Trails-Research-Group
import struct | |
import io | |
import os | |
def read_unpack(fmt, f):
    """Read exactly one ``struct``-formatted record from the binary stream *f*.

    Args:
        fmt: a ``struct`` format string; its packed size decides how many
            bytes are requested from *f*.
        f: an object with a ``read(n)`` method returning bytes.

    Returns:
        The tuple produced by unpacking the bytes read (always a tuple, even
        for a single field).

    Raises:
        struct.error: if *f* yields fewer bytes than the format requires.
    """
    layout = struct.Struct(fmt)
    raw = f.read(layout.size)
    return layout.unpack(raw)
# based on https://github.com/barneygale/iso9660
SECTOR_SIZE = 2048  # bytes per sector; sector numbers are multiplied by this to get file offsets


class ISO9660(object):
    """Minimal read-only ISO 9660 image reader that fetches one file by path.

    Construction parses the volume descriptors (scanned from sector 0x10) and
    the little-endian path table.  Every access re-opens the image, reads the
    requested span into an in-memory buffer (``self._buff``) and decodes from
    that buffer, so no file handle stays open between calls.
    """

    def __init__(self, path):
        """Parse the volume descriptors and path table of the image at *path*.

        Raises:
            ValueError: if the path table entries do not add up to the size
                recorded in the primary volume descriptor, or a both-endian
                field is inconsistent.
        """
        self._buff = None  # input buffer (io.BytesIO over the last span read)
        self._root = None  # root directory record
        self._pvd = {}     # primary volume descriptor fields
        self._paths = []   # path table entries, in on-disk order
        self._path = path

        # Volume descriptors: one per sector, starting at sector 0x10.
        sector = 0x10
        while True:
            self._get_sector(sector, SECTOR_SIZE)
            sector += 1
            ty = self._unpack('B')
            if ty == 1:
                self._unpack_pvd()
            elif ty == 255:
                break  # descriptor type 255 ends the scan
            # any other descriptor type is skipped

        # Path table: variable-length entries, each padded to an even size.
        remaining = self._pvd['path_table_size']
        self._get_sector(self._pvd['path_table_l_loc'], remaining)
        while remaining > 0:
            name_len = self._unpack('B')
            self._unpack('B')  # second length byte of the entry; not used here
            entry = {}
            entry['ex_loc'] = self._unpack('<I')
            entry['parent'] = self._unpack('<H')
            entry['name'] = self._unpack_string(name_len).rstrip('\x00')
            if name_len % 2 == 1:
                self._unpack('B')  # padding byte after an odd-length name
            self._paths.append(entry)
            remaining -= 8 + name_len + (name_len % 2)
        # Raise instead of assert so the check survives ``python -O``.
        if remaining != 0:
            raise ValueError("Path table entries do not match the recorded path table size")

    def get_file(self, path):
        """Return the contents of the file at *path* as bytes.

        *path* uses ``/`` separators; leading and trailing slashes are ignored.
        The directory part is looked up in the path table first and, failing
        that, by walking directory listings down from the root.

        Raises:
            Exception: "Directory not found" or "File not found".
        """
        *dirs, filename = path.strip('/').split('/')

        parent_dir = self._root
        if dirs:
            parent_dir = self._dir_record_by_table(dirs)
            if parent_dir is None:
                parent_dir = self._dir_record_by_root(dirs)
            if parent_dir is None:
                raise Exception("Directory not found")

        f = self._search_dir_children(parent_dir, filename)
        if f is None:
            raise Exception("File not found")

        self._get_sector(f['ex_loc'], f['ex_len'])
        return self._unpack_raw(f['ex_len'])

    def _get_sector(self, sector, length):
        """Load *length* bytes starting at *sector* into ``self._buff``."""
        with open(self._path, 'rb') as f:
            f.seek(sector * SECTOR_SIZE)
            self._buff = io.BytesIO(f.read(length))

    def _dir_record_by_table(self, path):
        """Return the path table entry for the directory named by *path*.

        *path* is a list of directory names from the root downwards.  An entry
        matches only if every component matches while walking up through the
        ``parent`` links and the walk ends exactly at the first path table
        entry (the root).  The returned dict has ``ex_loc``, ``parent`` and
        ``name`` but no ``ex_len``.

        Returns None when no entry matches.
        """
        if not self._paths:
            return None
        root = self._paths[0]
        for e in reversed(self._paths):
            search = list(path)
            f = e
            # Consume components from the deepest one upwards.  The
            # ``search`` guard keeps search[-1] valid once all are consumed.
            while search and f['name'] == search[-1]:
                search.pop()
                parent_index = f['parent'] - 1  # parent numbers are 1-based
                if not 0 <= parent_index < len(self._paths):
                    break  # dangling parent reference: treat as no match
                f = self._paths[parent_index]
            if not search and f is root:
                return e
        return None

    def _dir_record_by_root(self, path):
        """Resolve *path* by searching directory listings from the root.

        Returns the record of the last component, or None as soon as one
        component cannot be found.
        """
        current = self._root
        for component in path:
            current = self._search_dir_children(current, component)
            if current is None:
                return None
        return current

    def _unpack_pvd(self):
        """Read the fields used from a primary volume descriptor.

        Expects ``self._buff`` to be positioned just after the descriptor's
        type byte.  Stores the path table size and location in ``self._pvd``
        and the root directory record in ``self._root``.
        """
        self._unpack_raw(131)  # skip to the path table size field
        self._pvd['path_table_size'] = self._unpack_both('i')
        self._pvd['path_table_l_loc'] = self._unpack('<I')
        self._unpack_raw(12)   # skip to the root directory record
        _, self._root = self._unpack_record()  # root directory record
        self._unpack_raw(692)  # skip past the rest of the descriptor

    def _unpack_record(self, read=0):
        """Decode one directory record (a listing of a file or folder).

        Args:
            read: running byte count to which this record's size is added.

        Returns:
            ``(new_read, record)``.  *record* is a dict with ``ex_loc``,
            ``ex_len`` and ``name`` (anything from the first ``;`` on is
            dropped).  When the length byte is zero, *record* is None and
            only that single byte is counted.
        """
        record_len = self._unpack('<B')
        if record_len == 0:
            return read + 1, None
        self._unpack('<B')  # second byte of the record; not used here

        d = dict()
        d['ex_loc'] = self._unpack_both('I')
        d['ex_len'] = self._unpack_both('I')
        self._unpack_raw(14)  # fields between the extent length and the name length
        name_len = self._unpack('<B')
        d['name'] = self._unpack_string(name_len).split(';')[0].rstrip('\x00')
        if name_len % 2 == 0:
            self._unpack_raw(1)  # padding byte after an even-length name

        # Bytes consumed so far; anything left in the record is skipped.
        consumed = 34 + name_len - (name_len % 2)
        extra = record_len - consumed
        if extra > 0:
            self._unpack_raw(extra)
        return read + record_len, d

    def _unpack_dir_children(self, d):
        """Yield the records listed in the directory whose record is *d*.

        Only ``d['ex_loc']`` is used; the directory's length is taken from
        the first record in its own listing.  The first two records (the
        directory itself and its parent) are not yielded.

        Raises:
            ValueError: if the directory extent does not start with a record.
        """
        sector = d['ex_loc']
        read = 0
        self._get_sector(sector, SECTOR_SIZE)

        read, r_self = self._unpack_record(read)
        if r_self is None:
            raise ValueError("Directory extent does not start with a directory record")
        read, _ = self._unpack_record(read)  # parent record, not needed

        while read < r_self['ex_len']:  # iterate over files in the directory
            if read % SECTOR_SIZE == 0:
                sector += 1
                self._get_sector(sector, SECTOR_SIZE)
            read, data = self._unpack_record(read)
            if data is None:  # end of the records in this sector
                to_read = SECTOR_SIZE - (read % SECTOR_SIZE)
                self._unpack_raw(to_read)
                read += to_read
            else:
                yield data

    def _search_dir_children(self, d, term):
        """Return the child record of directory *d* named *term*, or None."""
        for e in self._unpack_dir_children(d):
            if e['name'] == term:
                return e
        return None

    def _unpack_raw(self, l):
        """Read *l* raw bytes from the current buffer."""
        return self._buff.read(l)

    def _unpack_both(self, st):
        """Read a both-endian field: little-endian value, then big-endian copy.

        Raises:
            ValueError: if the two copies disagree.
        """
        a = self._unpack('<' + st)
        b = self._unpack('>' + st)
        # Raise instead of assert so the check survives ``python -O``.
        if a != b:
            raise ValueError("Both-endian field mismatch: %r != %r" % (a, b))
        return a

    def _unpack_string(self, l):
        """Read *l* bytes, strip trailing spaces and decode as ASCII."""
        return self._buff.read(l).rstrip(b' ').decode('ASCII')

    def _unpack(self, st):
        """Unpack the struct format *st* from the current buffer.

        Little-endian is assumed when *st* has no ``<``/``>`` prefix.  A
        two-character format (prefix plus one field) returns the bare value;
        anything longer returns the full tuple.
        """
        if st[0] not in ('<', '>'):
            st = '<' + st
        d = struct.unpack(st, self._buff.read(struct.calcsize(st)))
        if len(st) == 2:
            return d[0]
        return d
def iterate_list(cb, df, ext_list, size_own, curstr=b"", max_entry_count=None):
    """Walk the 16-byte entries of a file list and report every file to *cb*.

    Each entry is: 8-byte NUL-padded name, 4-byte little-endian size (or, for
    a directory, entry count), 3-byte little-endian LBA, 1-byte extension
    index.  Extension index 0 marks a directory, whose children follow
    immediately and are handled by a recursive call.

    Args:
        cb: called as ``cb(path_bytes, size, lba)`` for every file entry.
        df: seekable binary stream positioned at the first entry to read.
        ext_list: extension byte strings; an entry's index is 1-based.
        size_own: stream offset at which the list ends.
        curstr: path prefix (bytes) prepended to every name.
        max_entry_count: stop after this many file entries have been counted
            (entries of nested directories included); None means no limit.

    Returns:
        The number of file entries reported, nested ones included.

    Raises:
        ValueError: if the stream ends in the middle of an entry.  The old
            code read such an entry as an empty directory and recursed
            without end.
    """
    cur_entry_count = 0
    while df.tell() < size_own:
        entry = df.read(16)
        if len(entry) != 16:
            raise ValueError("Truncated entry in file list")
        dname = entry[0:8].rstrip(b"\x00")
        dsize_or_count = int.from_bytes(entry[8:12], byteorder="little")
        dlba = int.from_bytes(entry[12:15], byteorder="little")
        dext = entry[15]
        if dext == 0:
            # NOTE(review): a directory whose count is 0 still consumes the
            # next entry as its child, because the limit is only checked after
            # an entry has been processed. Confirm whether empty directories
            # can occur in data.lst.
            cur_entry_count += iterate_list(
                cb,
                df,
                ext_list,
                size_own,
                curstr=curstr + dname + b"/",
                max_entry_count=dsize_or_count,
            )
        else:
            cb(curstr + dname + b"." + ext_list[dext - 1], dsize_or_count, dlba)
            cur_entry_count += 1
        if max_entry_count is not None and cur_entry_count >= max_entry_count:
            break
    return cur_entry_count
# Reference: CEgPacks2::UnpackBZMode2
# Also known as falcom_compress / BZ / BZip / zero method
def decompress(buffer, output, size):
    """Decode one flag-driven compressed block from *buffer* into *output*.

    The stream interleaves flag bits (8 from the second input byte, then
    16-bit little-endian words fetched on demand) with data bytes:

    * flag ``0``: copy one literal byte;
    * flags ``1 0``: copy from a one-byte look-back distance;
    * flags ``1 1`` + 5 bits + one byte: a long look-back distance, or, when
      the 5 bits are zero, byte 0 ends the stream, bytes 1-2 start a run of
      one repeated byte, and larger bytes are plain look-back distances.

    Args:
        buffer: compressed bytes.  The first byte is discarded here (the
            caller in this file checks it as the method byte).
        output: pre-sized ``bytearray`` to write into.  Look-back copies use
            plain indexing, so a distance larger than the current write
            position wraps to the end of *output*.
        size: unused; kept so existing callers keep working.

    Returns:
        ``(bytes_written, bytes_consumed)``.

    Raises:
        Exception: "Out of data" when the input ends where a flag word, a
            run-length byte, a literal byte or a fill byte is required.  The
            last two used to shrink *output* silently instead.
    """
    # Low byte of the first word is dropped; the high byte holds the first
    # eight flag bits.
    flags = int.from_bytes(buffer[0:2], byteorder="little") >> 8
    bits = 8  # 8 to start off with, then 16 per reload
    offset = 2
    outputoffset = 0

    def getflag():
        """Return the next flag bit, loading a new 16-bit word when empty."""
        nonlocal bits, flags, offset
        if bits == 0:
            word = buffer[offset:offset + 2]
            if len(word) < 2:
                raise Exception("Out of data")
            flags = int.from_bytes(word, byteorder="little")
            offset += 2
            bits = 16
        flag = flags & 1
        flags >>= 1
        bits -= 1
        return flag

    def read_bits(count):
        """Assemble *count* flag bits into an integer, first bit highest."""
        value = 0
        for _ in range(count):
            value = (value << 1) | getflag()
        return value

    def read_byte_or_zero():
        """Consume one input byte as an int; a missing byte reads as 0.

        Kept lenient on purpose: this is how distance bytes were always read,
        and a zero in the long form acts as the end marker.
        """
        nonlocal offset
        value = int.from_bytes(buffer[offset:offset + 1], byteorder="little")
        offset += 1
        return value

    def read_required_byte():
        """Consume one input byte as a 1-byte slice; fail if the input ended."""
        nonlocal offset
        chunk = buffer[offset:offset + 1]
        if len(chunk) < 1:
            raise Exception("Out of data")
        offset += 1
        return chunk

    def setup_run(prev_u_buffer_pos):
        """Read a run length, then copy that many bytes from the look-back."""
        nonlocal outputoffset
        # Up to four leading zero flags select lengths 2..5 (a one flag ends
        # the prefix); four zeros switch to the extended encodings.
        zeros = 0
        while zeros < 4 and getflag() == 0:
            zeros += 1
        if zeros < 4:
            run = 2 + zeros
        elif getflag() == 0:
            run = int.from_bytes(read_required_byte(), byteorder="little") + 0xE
        else:
            run = read_bits(3) + 0x6
        # Byte-by-byte on purpose: source and destination may overlap.
        for _ in range(run):
            output[outputoffset] = output[outputoffset - prev_u_buffer_pos]
            outputoffset += 1

    while True:
        if getflag() == 0:
            # Copy byte (flags = 0)
            output[outputoffset:outputoffset + 1] = read_required_byte()
            outputoffset += 1
            continue

        if getflag() == 0:
            # Short look-back distance (flags = 10), max 0xFF
            setup_run(read_byte_or_zero())
            continue

        # flags = 11: long look-back distance, repeating byte, or end.
        run = read_bits(5)  # high-order distance bits
        prev_u_buffer_pos = read_byte_or_zero()  # low-order distance, doubles as an opcode
        if run != 0:
            setup_run(prev_u_buffer_pos | (run << 8))
        elif prev_u_buffer_pos > 2:
            setup_run(prev_u_buffer_pos)
        elif prev_u_buffer_pos == 0:
            break  # decompression complete
        else:
            # Repeating byte: 4 length bits, optionally extended by one byte.
            branch = getflag()
            run = read_bits(4)
            if branch != 0:
                run = (run << 0x8) | read_byte_or_zero()
            run += 0xE
            fill = read_required_byte()
            output[outputoffset:outputoffset + run] = bytes(fill) * run
            outputoffset += run

    return outputoffset, offset
# Reference: CSafeFile::freadP
# Also known as FALCOM3 compression
def decompress_blocks_stream(f):
    """Decompress a block-compressed stream read from *f* and return bytes.

    Layout as read here: a 32-bit little-endian word giving the compressed
    size (high bit set selects an unsupported method), then the uncompressed
    size and block count, then the blocks.  Each block is a 16-bit size,
    ``size - 2`` bytes of data whose first byte must be 0, and, unless the
    output is already complete, one byte that ends the stream when it is 0
    or missing.

    Raises:
        Exception: for the high-bit method or a non-zero block method.
    """
    (flags,) = read_unpack("<I", f)
    if (flags & 0x80000000) != 0:
        raise Exception("High-bit method intentionally not supported")

    compressed_size = flags
    uncompressed_size, num_blocks = read_unpack("<2I", f)
    dst = bytearray(uncompressed_size)  # Should already be initialized with 0
    dst_offset = 0
    cdata = io.BytesIO(f.read(compressed_size - 8))

    for _ in range(num_blocks):
        (block_size,) = read_unpack("<H", cdata)
        inbuf = cdata.read(block_size - 2)
        if inbuf[0] != 0:
            raise Exception("Non-zero method currently not supported")

        # Fresh zeroed scratch buffer for every block.
        scratch = bytearray(65536)
        produced, _consumed = decompress(inbuf, scratch, block_size)
        dst[dst_offset:dst_offset + produced] = scratch[0:produced]
        dst_offset += produced
        if dst_offset >= uncompressed_size:
            break

        # One byte after each block says whether another block follows.
        more = cdata.read(1)
        if len(more) == 0 or more[0] == 0:
            break

    return bytes(dst)
def unpack_cclm_recursive(df, curstr=""):
    """Try to unpack *df* as a cclm archive, writing its members to disk.

    The data is treated as an archive when it starts with four identical,
    non-zero 32-bit little-endian counts followed by that many 32-byte
    entries (16-byte NUL-padded name, offset, stored size, decompressed size
    and one more 32-bit field that is ignored).  Each member is decompressed
    when its decompressed size is non-zero and differs from the stored size,
    then tried as a nested archive under ``<name>_unpacked/``.  Members that
    are not archives are written to ``curstr + name``.

    Args:
        df: seekable binary stream holding the candidate archive.
        curstr: output path prefix (``str``) prepended to member names.  The
            default used to be ``b""``, which could not be concatenated with
            the decoded names.

    Returns:
        True if the data was unpacked as an archive.  False if the header or
        an entry is missing or inconsistent, or a member's data is shorter
        than its recorded size (members before it may already have been
        written).
    """
    header = df.read(16)
    if len(header) != 16:
        return False
    counts = struct.unpack("<4I", header)
    count = counts[0]
    if any(c != count for c in counts) or count == 0:
        return False

    files = []
    for _ in range(count):
        raw_entry = df.read(32)
        if len(raw_entry) != 32:
            return False
        name_b, offset, size, decompressed_size, _always_zero = struct.unpack("<16s4I", raw_entry)
        files.append((name_b.rstrip(b"\x00"), offset, size, decompressed_size))

    for name_b, offset, size, decompressed_size in files:
        df.seek(offset)
        d = df.read(size)
        if len(d) != size:
            return False
        dbio = io.BytesIO(d)
        if decompressed_size != 0 and size != decompressed_size:
            d = decompress_blocks_stream(dbio)
            dbio = io.BytesIO(d)
        # NOTE(review): member names come straight from the archive and are
        # joined onto the output path without checking for "/" or "..";
        # confirm that inputs are trusted.
        name = name_b.decode("ASCII", errors="replace").replace("\uFFFD", "_")
        if not unpack_cclm_recursive(dbio, curstr + name + "_unpacked/"):
            fullpath = curstr + name
            fullpath_dirname = os.path.dirname(fullpath)
            if fullpath_dirname:  # os.makedirs("") raises; nothing to create
                os.makedirs(fullpath_dirname, exist_ok=True)
            with open(fullpath, "wb") as wf:
                wf.write(d)
    return True
if __name__ == '__main__':
    # Usage: <script> <image.iso> <output directory>
    import sys

    iso_path = sys.argv[1]
    cd = ISO9660(iso_path)
    out_path = sys.argv[2]

    # data.lst starts with its own total size.
    d = cd.get_file("PSP_GAME/USRDIR/data.lst")
    df = io.BytesIO(d)
    size_own = int.from_bytes(df.read(4), byteorder="little")
    if len(d) != size_own:
        raise Exception("Incorrect size of data.lst")

    # The rest of the first 0x400 bytes is a table of 4-byte extension slots.
    # Bytes 0x82 and 0x86 are replaced with "_" and empty slots are dropped.
    # NOTE(review): dropping empty slots shifts the index of every later
    # extension; confirm the table never has gaps before a used slot.
    ext_list_d = df.read(0x400 - 4)
    ext_list = []
    for i in range(0, len(ext_list_d), 4):
        ext = ext_list_d[i:i + 4].rstrip(b"\x00")
        ext = ext.replace(b"\x82", b"_").replace(b"\x86", b"_")
        if ext != b"":
            ext_list.append(ext)

    with open(sys.argv[1], "rb") as f:
        def list_cb(pathname, size, lba):
            # Read the file's bytes straight from the image at its LBA.
            f.seek(SECTOR_SIZE * lba)
            fullpath = out_path + "/" + pathname.decode("ASCII")
            os.makedirs(os.path.dirname(fullpath), exist_ok=True)
            ds = f.read(size)
            # Archives are expanded next to the file name; anything else is
            # written out unchanged.
            if not unpack_cclm_recursive(io.BytesIO(ds), fullpath + "_unpacked/"):
                with open(fullpath, "wb") as wf:
                    wf.write(ds)

        iterate_list(list_cb, df, ext_list, size_own)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment