# Source: GitHub gist uyjulian/b596c978da0c1031047e124eaf5d4f84 (created October 20, 2023).
# SPDX-License-Identifier: MIT
# Falcom YamaNeko engine on PSP ISO format extraction.
# Uses the information contained in PSP_GAME/USRDIR/data.lst
# Also recursively unpacks cclm archive/group files
# See also: https://github.com/Trails-Research-Group
import struct
import io
import os
def read_unpack(fmt, f):
    """Read one ``struct`` record from a binary stream.

    Reads ``struct.calcsize(fmt)`` bytes from ``f`` and unpacks them with
    ``fmt``. Always returns a tuple, even for a single field. A short read
    surfaces as ``struct.error`` from ``struct.unpack``.
    """
    record_size = struct.calcsize(fmt)
    raw = f.read(record_size)
    return struct.unpack(fmt, raw)
# based on https://github.com/barneygale/iso9660
SECTOR_SIZE = 2048


class ISO9660(object):
    """Minimal read-only ISO 9660 image reader.

    On construction the volume descriptors are scanned (starting at sector
    0x10) and the little-endian path table is loaded into ``self._paths``.
    File contents are then fetched with :meth:`get_file`.

    All parsing helpers read from ``self._buff``, an in-memory window that
    ``_get_sector`` refills from the image file, so the helpers must be
    called in the order the on-disc fields appear.
    """

    def __init__(self, path):
        """Open the image at ``path`` and parse its descriptors and path table.

        Raises ``KeyError`` if no primary volume descriptor was seen before
        the terminator, and ``struct.error`` if the image ends early.
        """
        self._buff = None  # input buffer
        self._root = None  # root node
        self._pvd = {}  # primary volume descriptor
        self._paths = []  # path table
        self._path = path

        ### Volume Descriptors
        sector = 0x10
        while True:
            self._get_sector(sector, SECTOR_SIZE)
            sector += 1
            ty = self._unpack('B')
            if ty == 1:
                self._unpack_pvd()
            elif ty == 255:
                # Descriptor type 255 ends the scan; other types are skipped.
                break

        ### Path table
        l0 = self._pvd['path_table_size']
        self._get_sector(self._pvd['path_table_l_loc'], l0)
        while l0 > 0:
            p = {}
            l1 = self._unpack('B')  # length of the directory name
            self._unpack('B')  # second length byte: read but not used
            p['ex_loc'] = self._unpack('<I')
            p['parent'] = self._unpack('<H')  # 1-based index into the table
            p['name'] = self._unpack_string(l1).rstrip('\x00')
            if l1 % 2 == 1:
                # Odd-length names are followed by one padding byte.
                self._unpack('B')
            self._paths.append(p)
            l0 -= 8 + l1 + (l1 % 2)
        assert l0 == 0

    ##
    ## Retrieve file contents as a string
    ##
    def get_file(self, path):
        """Return the raw bytes of the file at ``path`` ('/'-separated).

        The parent directory is looked up in the path table first and, if
        that fails, by walking directory records from the root.

        Raises ``Exception("Directory not found")`` or
        ``Exception("File not found")`` when a component is missing.
        """
        path = path.strip('/').split('/')
        path, filename = path[:-1], path[-1]
        parent_dir = self._root
        if len(path) != 0:
            parent_dir = self._dir_record_by_table(path)
            if parent_dir is None:
                parent_dir = self._dir_record_by_root(path)
        if parent_dir is None:
            raise Exception("Directory not found")
        f = self._search_dir_children(parent_dir, filename)
        if f is None:
            raise Exception("File not found")
        self._get_sector(f['ex_loc'], f['ex_len'])
        return self._unpack_raw(f['ex_len'])

    ##
    ## Methods for retrieving partial contents
    ##
    def _get_sector(self, sector, length):
        """Load ``length`` bytes starting at ``sector`` into ``self._buff``."""
        with open(self._path, 'rb') as f:
            f.seek(sector * SECTOR_SIZE)
            self._buff = io.BytesIO(f.read(length))

    ##
    ## Return the record for final directory in a path
    ##
    def _dir_record_by_table(self, path):
        """Find the path-table entry for the directory named by ``path``.

        ``path`` is a list of directory names from the root downwards.
        Every component is compared while walking the parent links upwards,
        and the walk must end exactly at the root entry (the first entry in
        the table). This avoids matching a directory that merely shares its
        trailing name(s) with ``path``, and avoids running off the end of
        ``path`` on directories nested deeper than ``path`` is long.

        Returns the matching entry dict, or ``None`` if there is none.
        """
        if not path or not self._paths:
            return None
        root = self._paths[0]
        for e in self._paths[::-1]:
            f = e
            for name in reversed(path):
                if f is root or f['name'] != name:
                    break
                f = self._paths[f['parent'] - 1]
            else:
                # All components matched; accept only if nothing is left
                # above the topmost matched directory.
                if f is root:
                    return e
        return None

    def _dir_record_by_root(self, path):
        """Resolve ``path`` by searching directory records from the root.

        Returns the record of the last component, or ``None`` as soon as a
        component cannot be found.
        """
        current = self._root
        for component in path:
            current = self._search_dir_children(current, component)
            if current is None:
                return None
        return current

    ##
    ## Unpack the Primary Volume Descriptor
    ##
    def _unpack_pvd(self):
        """Pull the path table size/location and root record out of the PVD.

        Must be called right after the descriptor type byte has been read;
        the skipped spans are fields this reader does not use.
        """
        self._unpack_raw(131)
        self._pvd['path_table_size'] = self._unpack_both('i')
        self._pvd['path_table_l_loc'] = self._unpack('<I')
        self._unpack_raw(12)
        _, self._root = self._unpack_record()  # root directory record
        self._unpack_raw(692)

    ##
    ## Unpack a directory record (a listing of a file or folder)
    ##
    def _unpack_record(self, read=0):
        """Parse one directory record from the current buffer position.

        ``read`` is the caller's running byte count. Returns
        ``(new_read, record)`` where ``record`` is a dict with ``ex_loc``,
        ``ex_len`` and ``name``. A zero length byte means "no record here"
        and yields ``(read + 1, None)``.
        """
        l0 = self._unpack('<B')  # total record length
        if l0 == 0:
            return read + 1, None
        self._unpack('<B')  # second header byte: read but not used
        d = dict()
        d['ex_loc'] = self._unpack_both('I')
        d['ex_len'] = self._unpack_both('I')
        self._unpack_raw(14)
        l2 = self._unpack('<B')  # length of the name field
        # Drop the ";version" suffix and any NUL padding from the name.
        d['name'] = self._unpack_string(l2).split(';')[0].rstrip('\x00')
        if l2 % 2 == 0:
            # Even-length names are followed by one padding byte.
            self._unpack_raw(1)
        # Bytes consumed so far; anything beyond that up to l0 is skipped.
        t = 34 + l2 - (l2 % 2)
        e = l0 - t
        if e > 0:
            self._unpack_raw(e)
        return read + l0, d

    # Assuming d is a directory record, this generator yields its children
    def _unpack_dir_children(self, d):
        """Yield the child records of the directory ``d``.

        The first two records (the directory itself and its parent) are
        consumed but not yielded; the first one supplies the directory
        length. Records never straddle a sector, so a zero length byte means
        the rest of the current sector is padding.
        """
        sector = d['ex_loc']
        read = 0
        self._get_sector(sector, SECTOR_SIZE)
        read, r_self = self._unpack_record(read)
        read, r_parent = self._unpack_record(read)
        while read < r_self['ex_len']:  # Iterate over files in the directory
            if read % SECTOR_SIZE == 0:
                sector += 1
                self._get_sector(sector, SECTOR_SIZE)
            read, data = self._unpack_record(read)
            if data is None:  # end of directory listing in this sector
                # Advance to the next sector boundary. This is 0 (not a full
                # sector) when the terminator was the sector's last byte.
                to_read = -read % SECTOR_SIZE
                self._unpack_raw(to_read)
                read += to_read
            else:
                yield data

    # Search for one child amongst the children
    def _search_dir_children(self, d, term):
        """Return the child of ``d`` named exactly ``term``, or ``None``."""
        for e in self._unpack_dir_children(d):
            if e['name'] == term:
                return e
        return None

    ##
    ## Datatypes
    ##
    def _unpack_raw(self, l):
        """Read ``l`` raw bytes from the buffer."""
        return self._buff.read(l)

    # both-endian
    def _unpack_both(self, st):
        """Read a value stored little-endian then big-endian; return it once."""
        a = self._unpack('<' + st)
        b = self._unpack('>' + st)
        assert a == b
        return a

    def _unpack_string(self, l):
        """Read ``l`` bytes, strip trailing spaces and decode as ASCII."""
        return self._buff.read(l).rstrip(b' ').decode('ASCII')

    def _unpack(self, st):
        """Unpack ``st`` from the buffer (little-endian unless prefixed).

        Returns a bare value for a single-field format such as ``'<I'`` and
        the full tuple otherwise.
        """
        if st[0] not in ('<', '>'):
            st = '<' + st
        d = struct.unpack(st, self._buff.read(struct.calcsize(st)))
        return d[0] if len(st) == 2 else d
def iterate_list(cb, df, ext_list, size_own, curstr=b"", max_entry_count=None):
    """Walk the flattened entry table of ``data.lst`` and report every file.

    Each entry is 16 bytes: an 8-byte NUL-padded name, a 4-byte
    little-endian size (files) or entry count (directories), a 3-byte
    little-endian LBA and a 1-byte extension index. An extension index of 0
    marks a directory whose entries follow immediately; they are handled by
    a recursive call limited to that entry count.

    Args:
        cb: Called as ``cb(path, size, lba)`` for every file entry, where
            ``path`` is ``curstr + name + b"." + extension``.
        df: Binary stream positioned at the first entry to read.
        ext_list: Extension table; index ``n`` in an entry selects
            ``ext_list[n - 1]``.
        size_own: Stream offset at which the table ends.
        curstr: Path prefix (bytes) for entries at this level.
        max_entry_count: Stop after this many entries have been consumed;
            ``None`` means "until ``size_own``".

    Returns:
        The number of entries consumed by this call, including entries
        consumed by nested calls.

    Raises:
        IndexError: If an entry references an extension outside ``ext_list``.
    """
    cur_entry_count = 0
    # The limit is tested *before* reading so that a directory with a count
    # of zero consumes nothing instead of swallowing its next sibling.
    while df.tell() < size_own and (
        max_entry_count is None or cur_entry_count < max_entry_count
    ):
        dname = df.read(8).rstrip(b"\x00")
        dsize_or_count = int.from_bytes(df.read(4), byteorder="little")
        dlba = int.from_bytes(df.read(3), byteorder="little")
        dext = int.from_bytes(df.read(1), byteorder="little")
        if dext == 0:
            cur_entry_count += iterate_list(
                cb,
                df,
                ext_list,
                size_own,
                curstr=curstr + dname + b"/",
                max_entry_count=dsize_or_count,
            )
        else:
            cb(curstr + dname + b"." + ext_list[dext - 1], dsize_or_count, dlba)
        # NOTE(review): the entry itself is counted for both files and
        # directories (on top of whatever a directory's children consumed);
        # confirm against real data.lst files.
        cur_entry_count += 1
    return cur_entry_count
# Reference: CEgPacks2::UnpackBZMode2
# Also known as falcom_compress / BZ / BZip / zero method
def decompress(buffer, output, size):
    """Decode one flag-driven compressed block from ``buffer`` into ``output``.

    The stream interleaves flag bits (consumed least-significant bit first)
    with data bytes. The first two bytes are read as one little-endian word
    whose low byte is discarded; its high byte supplies the first 8 flags.
    Later refills take 16 flags at a time from the current input position.

    Flag prefixes select the operation:
      * ``0``   - copy one literal byte from the input.
      * ``10``  - back-reference with a one-byte distance.
      * ``11``  - five more flags (high distance bits) plus one byte:
                  a non-zero high part, or a byte above 2, is a
                  back-reference; a zero high part with byte 0 ends the
                  stream; the remaining values emit a run of one repeated
                  byte.

    Args:
        buffer: Compressed input (bytes-like, sliceable).
        output: Mutable byte buffer written in place (a ``bytearray``).
        size: Accepted for interface compatibility; not used here.

    Returns:
        ``(bytes_written, bytes_consumed)``.

    Raises:
        Exception: ``"Out of data"`` when a flag refill or an extended run
            length is needed but the input is exhausted.
    """
    in_pos = 2
    out_pos = 0
    flag_word = int.from_bytes(buffer[0:2], byteorder="little") >> 8
    flags_left = 8  # 8 to start off with, then 16 per refill

    def next_flag():
        """Return the next flag bit, refilling from the input when empty."""
        nonlocal flag_word, flags_left, in_pos
        if flags_left == 0:
            refill = buffer[in_pos:in_pos + 2]
            if len(refill) < 2:
                raise Exception("Out of data")
            flag_word = int.from_bytes(refill, byteorder="little")
            in_pos += 2
            flags_left = 16
        bit = flag_word & 1
        flag_word >>= 1
        flags_left -= 1
        return bit

    def next_flags(count):
        """Assemble ``count`` flag bits into an integer, first bit highest."""
        value = 0
        for _ in range(count):
            value = (value << 1) | next_flag()
        return value

    def next_byte():
        """Consume one input byte; an exhausted input reads as 0."""
        nonlocal in_pos
        value = int.from_bytes(buffer[in_pos:in_pos + 1], byteorder="little")
        in_pos += 1
        return value

    def copy_back(distance):
        """Decode a run length, then copy that many bytes from ``distance`` back."""
        nonlocal in_pos, out_pos
        length = 2
        # Up to three leading zero flags each lengthen the run by one ...
        for _ in range(3):
            if next_flag() != 0:
                break
            length += 1
        else:
            # ... and a fourth zero flag switches to an explicit length.
            if next_flag() == 0:
                if next_flag() == 0:
                    extra = buffer[in_pos:in_pos + 1]
                    if len(extra) < 1:
                        raise Exception("Out of data")
                    length = int.from_bytes(extra, byteorder="little")
                    in_pos += 1
                    length += 0xE
                else:
                    length = next_flags(3) + 0x6
        # Byte-by-byte so that overlapping source/destination repeats data.
        for _ in range(length):
            output[out_pos] = output[out_pos - distance]
            out_pos += 1

    while True:
        if next_flag() == 0:
            # Literal byte (flags = 0)
            output[out_pos:out_pos + 1] = buffer[in_pos:in_pos + 1]
            out_pos += 1
            in_pos += 1
            continue

        if next_flag() == 0:
            # Short look-back distance (flags = 10)
            copy_back(next_byte())
            continue

        # flags = 11: long look-back, repeated byte, or end of stream
        high = next_flags(5)
        low = next_byte()
        if high != 0:
            copy_back(low | (high << 8))
        elif low > 2:
            copy_back(low)
        elif low == 0:
            # Decompression complete.
            break
        else:
            # Repeating byte; a set flag selects the longer run-length form.
            long_form = next_flag()
            length = next_flags(4)
            if long_form != 0:
                length = (length << 0x8) | next_byte()
            length += 0xE
            output[out_pos:out_pos + length] = bytes(buffer[in_pos:in_pos + 1]) * length
            in_pos += 1
            out_pos += length

    return out_pos, in_pos
# Reference: CSafeFile::freadP
# Also known as FALCOM3 compression
def decompress_blocks_stream(f):
    """Decompress a multi-block stream read from ``f`` and return the bytes.

    Layout as read here: a little-endian u32 header word (the compressed
    size; the top bit selects an unsupported variant), then u32 uncompressed
    size and u32 block count, then the blocks. Each block starts with a u16
    size followed by ``size - 2`` bytes that are handed to ``decompress``.
    After every block one more byte is read; a zero byte (or end of data)
    stops the loop early.

    Raises:
        Exception: If the header's top bit is set, or a block's first byte
            is non-zero.
    """
    header = read_unpack("<I", f)[0]
    if header & 0x80000000:
        raise Exception("High-bit method intentionally not supported")

    compressed_size = header
    uncompressed_size, num_blocks = read_unpack("<2I", f)
    result = bytearray(uncompressed_size)  # zero-filled by construction
    packed = io.BytesIO(f.read(compressed_size - 8))
    written = 0

    for _ in range(num_blocks):
        (block_size,) = read_unpack("<H", packed)
        scratch = bytearray(65536)
        block = packed.read(block_size - 2)
        if block[0] != 0:
            raise Exception("Non-zero method currently not supported")
        produced, _consumed = decompress(block, scratch, block_size)
        result[written:written + produced] = scratch[0:produced]
        written += produced
        if written >= uncompressed_size:
            break
        # One continuation byte follows each block; zero or EOF means stop.
        marker = packed.read(1)
        if len(marker) == 0 or marker[0] == 0:
            break

    return bytes(result)
def unpack_cclm_recursive(df, curstr=b""):
    """Try to unpack ``df`` as an archive, recursing into nested archives.

    Layout as read here: four identical non-zero little-endian u32 values
    (the entry count), then one 32-byte entry per file: a 16-byte
    NUL-padded name, u32 offset, u32 stored size, u32 decompressed size and
    one more u32 that is read and ignored. A file whose decompressed size is
    non-zero and differs from its stored size is passed through
    ``decompress_blocks_stream``.

    Every contained file is first tried as a nested archive (extracted under
    ``<name>_unpacked/``); if it is not one, it is written to
    ``curstr + name``.

    Args:
        df: Seekable binary stream holding the candidate archive.
        curstr: Output path prefix. ``str`` is expected; ``bytes`` (such as
            the default) is decoded with ``os.fsdecode`` so it can be joined
            with the decoded entry names.

    Returns:
        ``True`` if ``df`` was unpacked as an archive, ``False`` if the
        header or any entry did not parse (the caller then treats ``df`` as
        a plain file).

    NOTE(review): entry names come straight from the archive and are not
    checked for path separators or ``..`` before being joined to ``curstr``.
    """
    if isinstance(curstr, bytes):
        # Entry names are decoded to str below; mixing bytes and str would
        # raise TypeError on concatenation.
        curstr = os.fsdecode(curstr)

    header = df.read(16)
    if len(header) != 16:
        return False
    counts = struct.unpack("<4I", header)
    if len(set(counts)) != 1 or counts[0] == 0:
        return False

    files = []
    for _ in range(counts[0]):
        raw_entry = df.read(32)
        if len(raw_entry) != 32:
            return False
        name_b, offset, size, decompressed_size, _unused = struct.unpack(
            "<16s4I", raw_entry
        )
        files.append((name_b.rstrip(b"\x00"), offset, size, decompressed_size))

    for name_raw, offset, size, decompressed_size in files:
        df.seek(offset)
        d = df.read(size)
        if len(d) != size:
            return False
        if decompressed_size != 0 and size != decompressed_size:
            d = decompress_blocks_stream(io.BytesIO(d))
        # Non-ASCII bytes in the name become underscores.
        name = name_raw.decode("ASCII", errors="replace").replace("\uFFFD", "_")
        if not unpack_cclm_recursive(io.BytesIO(d), curstr + name + "_unpacked/"):
            fullpath = curstr + name
            fullpath_dirname = os.path.dirname(fullpath)
            if fullpath_dirname:
                # os.makedirs("") raises, so skip it for bare file names.
                os.makedirs(fullpath_dirname, exist_ok=True)
            with open(fullpath, "wb") as wf:
                wf.write(d)
    return True
if __name__ == '__main__':
    # Usage: <script> <input.iso> <output_dir>
    # Reads PSP_GAME/USRDIR/data.lst from the ISO, then extracts every file
    # it lists (unpacking nested archives) below <output_dir>.
    import sys

    if len(sys.argv) < 3:
        sys.exit("usage: {} <input.iso> <output_dir>".format(sys.argv[0]))
    iso_path = sys.argv[1]
    out_path = sys.argv[2]

    cd = ISO9660(iso_path)
    d = cd.get_file("PSP_GAME/USRDIR/data.lst")
    df = io.BytesIO(d)

    # The first u32 of data.lst is its own total size.
    size_own = int.from_bytes(df.read(4), byteorder="little")
    if len(d) != size_own:
        raise Exception("Incorrect size of data.lst")

    # The rest of the first 0x400 bytes is a table of 4-byte, NUL-padded
    # extensions. Bytes 0x82 and 0x86 are replaced with "_" so the names can
    # later be decoded as ASCII; empty slots are dropped.
    ext_list_d = df.read(0x400 - 4)
    ext_list = [
        ext_list_d[i:i + 4].rstrip(b"\x00").replace(b"\x82", b"_").replace(b"\x86", b"_")
        for i in range(0, len(ext_list_d), 4)
    ]
    ext_list = [x for x in ext_list if x != b""]

    with open(iso_path, "rb") as f:
        def list_cb(pathname, size, lba):
            # Entries address their data directly by sector number in the ISO.
            f.seek(SECTOR_SIZE * lba)
            pathname_ascii = pathname.decode("ASCII")
            fullpath = out_path + "/" + pathname_ascii
            fullpath_dirname = os.path.dirname(fullpath)
            os.makedirs(fullpath_dirname, exist_ok=True)
            ds = f.read(size)
            dsbio = io.BytesIO(ds)
            # Write the raw file only when it is not an unpackable archive.
            if not unpack_cclm_recursive(dsbio, fullpath + "_unpacked/"):
                with open(fullpath, "wb") as wf:
                    wf.write(ds)

        iterate_list(list_cb, df, ext_list, size_own)
# (End of gist; GitHub page footer omitted.)