Before running the Python program, install the following dependencies via pip:
click, zstandard, jsbeautifier
| #!/usr/bin/env python3 | |
| import struct | |
| from dataclasses import dataclass, field | |
| from typing import List, Tuple | |
| import os | |
| import json | |
| import click | |
| import jsbeautifier | |
| import sys | |
| import zstandard as zstd | |
# Flip this to True for EXTREMELY verbose tracing of everything being read.
DEBUG_MESSAGES = False


def print_debug(*args, **kwargs):
    """Forward the arguments to ``print`` only while DEBUG_MESSAGES is truthy."""
    if not DEBUG_MESSAGES:
        return
    print(*args, **kwargs)
@dataclass
class FileEntry:
    """One named blob read out of an archive."""

    # Name of the entry.
    name: str
    # Content of the entry.
    content: bytes
    # Marked as unused by the original author; the parser in this file always
    # stores False here.
    is_metadata: bool
@dataclass
class Archive:
    """Parsed archive: the magic number bytes plus the entries found after it."""

    # The first four bytes of the (decompressed) archive, kept verbatim.
    magic_number: bytes
    # Entries in the order they were read; every instance gets its own list.
    entries: List[FileEntry] = field(default_factory=list)
class ArchiveParserError(Exception):
    """Raised when archive data cannot be read or is not a valid module."""
class ArchiveParser:
    """Parse a Valdi module archive held in memory and extract its entries.

    Layout as read by this parser (all integers are little-endian and every
    field starts on a 4-byte boundary): a 4-byte magic number, a 4-byte value
    whose meaning is unknown, then repeated entries of
    ``name length, name bytes, content length, content bytes``. In each length
    word the highest bit is a flag and the low 31 bits are the length.
    Input starting with a Zstandard magic number is decompressed first and
    the result is parsed the same way.
    """

    # Exclusive upper bound for a plausible length when peeking at the next
    # length word; a value outside (0, MAX_ENTRY_LENGTH) ends parsing.
    MAX_ENTRY_LENGTH = 100000000

    def __init__(self, data: bytes, format_js: bool = False):
        """Store the raw archive bytes and reset the read cursor.

        Args:
            data: Complete archive contents (optionally Zstandard-compressed).
            format_js: When True, ``.js`` entries are beautified on extraction.
        """
        self.data = data
        self.cursor = 0
        self.length = len(data)
        self.archive = Archive(magic_number=b'')
        self.format_js = format_js

    def read_bytes(self, size: int) -> bytes:
        """Return the next ``size`` bytes and advance the cursor past them.

        Raises:
            ArchiveParserError: If fewer than ``size`` bytes remain.
        """
        end = self.cursor + size
        if end > self.length:
            raise ArchiveParserError(f"Unexpected end of data at position {self.cursor}. Expected {size} more bytes.")
        result = self.data[self.cursor:end]
        self.cursor = end
        return result

    def read_uint32_le(self) -> int:
        """Align to 4 bytes, then read one unsigned little-endian 32-bit integer."""
        self.align_to_4_bytes()
        raw_bytes = self.read_bytes(4)
        value = struct.unpack('<I', raw_bytes)[0]
        print_debug(f"Read uint32_le at {self.cursor - 4}: {value} (hex: {raw_bytes.hex()})")
        return value

    def read_length_and_flag(self) -> Tuple[int, bool]:
        """Align to 4 bytes, read a length word and split it into (length, flag)."""
        self.align_to_4_bytes()
        raw_bytes = self.read_bytes(4)
        # Decode through the same helper the look-ahead in parse() uses so the
        # two code paths cannot drift apart.
        length, flag = self.parse_length_preview(struct.unpack('<I', raw_bytes)[0])
        print_debug(f"Read length: {length}, flag: {flag}, at position {self.cursor - 4} (hex: {raw_bytes.hex()})")
        return length, flag

    def align_to_4_bytes(self):
        """Advance the cursor to the next multiple of 4 (no-op when already aligned).

        The cursor may end up past the end of the data; the next read_bytes()
        call reports that as an error.
        """
        alignment = self.cursor % 4
        if alignment != 0:
            padding = 4 - alignment
            print_debug(f"Aligning cursor from {self.cursor} to {self.cursor + padding} (skipping {padding} bytes)")
            self.cursor += padding

    def parse(self):
        """Parse the magic number and then entries until the data is exhausted.

        Parsing stops quietly when the next length word is implausible or when
        fewer than four bytes remain. On an ArchiveParserError the message is
        printed to stderr and the process exits with status 1.
        """
        try:
            self.parse_magic_number()
            while self.cursor < self.length:
                self.align_to_4_bytes()
                if self.cursor >= self.length:
                    break
                # Peek ahead to see if the next 4 bytes could be a valid length field
                if self.cursor + 4 > self.length:
                    print(f"Insufficient bytes for next length field at position {self.cursor}")
                    break
                next_length_value = struct.unpack_from('<I', self.data, self.cursor)[0]
                next_length, _ = self.parse_length_preview(next_length_value)
                if not self.is_valid_length(next_length):
                    print(f"Invalid length ({next_length}) at position {self.cursor}, stopping parsing.")
                    break  # Exit loop if invalid length is encountered
                self.parse_entry()
            print_debug("Parsing complete.")
        except ArchiveParserError as e:
            # NOTE(review): exiting here means callers never see the exception;
            # kept as-is because existing callers rely on this behavior.
            print(f"Error parsing archive: {e}", file=sys.stderr)
            sys.exit(1)

    def parse_magic_number(self):
        """Read and validate the magic number, decompressing Zstandard input first.

        Raises:
            ArchiveParserError: If decompression fails, the data is too short,
                or the magic number is not a recognized value.
        """
        self.align_to_4_bytes()
        self.archive.magic_number = self.read_bytes(4)
        magic_number_value = struct.unpack('<I', self.archive.magic_number)[0]
        print_debug(f"Magic number at {self.cursor - 4}: {self.archive.magic_number.hex()}")
        if magic_number_value == 0xFD2FB528 or magic_number_value == 0xFD2FB527:  # Zstandard magic number
            print_debug("Detected Zstandard compressed file.")
            decompressor = zstd.ZstdDecompressor()
            try:
                with decompressor.stream_reader(self.data) as reader:
                    decompressed_data = reader.read()
            except zstd.ZstdError as e:
                # Chain the cause so the underlying zstd failure stays visible.
                raise ArchiveParserError(f"Error during decompression: {e}") from e
            # Restart from the beginning of the decompressed payload.
            self.data = decompressed_data
            self.cursor = 0
            self.length = len(self.data)
            print_debug("Decompression successful. Re-parsing magic number.")
            self.parse_magic_number()
        elif magic_number_value == 0x0100C633:  # Valdi magic number (little-endian)
            print_debug("Valid Valdi module detected.")
            unknown_value = self.read_bytes(4)
            print_debug(f"Unknown 4-byte value after magic number: {unknown_value.hex()} (position {self.cursor - 4})")
        else:
            raise ArchiveParserError(f"Error: File is not a valid Valdi module. Magic number: 0x{magic_number_value:08X}")

    def parse_length_preview(self, value: int) -> Tuple[int, bool]:
        """Split a 32-bit length word into ``(length, flag)`` without reading data.

        The flag is the highest bit; the length is the remaining low 31 bits.
        """
        flag = (value & 0x80000000) != 0  # Extract the highest bit
        length = value & 0x7FFFFFFF  # Mask out the highest bit to get the length
        return length, flag

    def is_valid_length(self, value: int) -> bool:
        """Return True when ``value`` lies strictly between 0 and MAX_ENTRY_LENGTH."""
        return 0 < value < self.MAX_ENTRY_LENGTH

    def parse_entry(self):
        """Read one ``name``/``content`` pair at the cursor and append it to the archive.

        Raises:
            ArchiveParserError: If the data ends early or the name is not valid UTF-8.
        """
        # name length
        print_debug(f"Starting new entry at position {self.cursor}")
        name_length, name_flag = self.read_length_and_flag()
        print_debug(f"Name length: {name_length}, flag: {name_flag}")
        # name
        self.align_to_4_bytes()
        name_bytes = self.read_bytes(name_length)
        print_debug(f"Name bytes at {self.cursor - name_length}: {name_bytes.hex()}")
        try:
            name = name_bytes.decode('utf-8')
        except UnicodeDecodeError as e:
            raise ArchiveParserError(f"Invalid UTF-8 sequence in name at position {self.cursor - name_length}") from e
        print_debug(f"Name: {name}")
        # content length
        content_length, content_flag = self.read_length_and_flag()
        print_debug(f"Content length: {content_length}, flag: {content_flag}")
        # content
        self.align_to_4_bytes()
        content = self.read_bytes(content_length)
        print_debug(f"Content bytes at {self.cursor - content_length} (first 64 bytes or full length): {content[:64].hex()}... [total {len(content)} bytes]")
        # TODO: maybe figure out what is metadata vs. what isn't but might not even be
        # necessary idk if meta is a thing but there are some "meta-esque" values
        # like hash and download_manifest
        is_metadata = False
        self.archive.entries.append(FileEntry(name=name, content=content, is_metadata=is_metadata))
        print_debug(f"Extracted entry: {name}, size: {content_length} bytes")

    @staticmethod
    def _is_within_directory(directory: str, target: str) -> bool:
        """Return True when ``target`` resolves to a path strictly inside ``directory``.

        Both paths are resolved with ``os.path.realpath`` so ``..`` components,
        absolute entry names and symlinks cannot move the target outside.
        The directory itself does not count as being inside.
        """
        try:
            root = os.path.realpath(directory)
            resolved = os.path.realpath(target)
            # commonpath raises ValueError for paths on different drives.
            return resolved != root and os.path.commonpath([root, resolved]) == root
        except ValueError:
            return False

    def extract_files(self, output_dir: str):
        """Write every non-metadata entry below ``output_dir``.

        Entry names come from the archive and are therefore untrusted: an
        entry whose name would resolve outside ``output_dir`` is skipped with
        a warning on stderr instead of being written. ``.js`` entries are
        beautified first when ``format_js`` is set; if beautifying fails the
        original bytes are written.

        Args:
            output_dir: Directory to extract into; created if missing.
        """
        os.makedirs(output_dir, exist_ok=True)
        written = 0
        for entry in self.archive.entries:
            if entry.is_metadata:
                continue
            file_path = os.path.join(output_dir, entry.name)
            if not self._is_within_directory(output_dir, file_path):
                print(f"Skipping entry with unsafe path: {entry.name!r}", file=sys.stderr)
                continue
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            content_to_write = entry.content
            if self.format_js and entry.name.endswith('.js'):
                print_debug(f"Formatting JavaScript file: {entry.name}")
                try:
                    opts = jsbeautifier.default_options()
                    content_str = entry.content.decode('utf-8', errors='replace')
                    formatted_content = jsbeautifier.beautify(content_str, opts)
                    content_to_write = formatted_content.encode('utf-8')
                except Exception as e:
                    # Best effort: fall back to the unformatted bytes.
                    print(f"Error formatting JavaScript file {entry.name}: {e}")
            with open(file_path, 'wb') as f:
                f.write(content_to_write)
            written += 1
            print_debug(f"Wrote file: {file_path}")
        print(f"Extracted {written} entries to {output_dir}")
def parse_archive(archive_path: str, format_js: bool, output_dir: str = None):
    """Parse one archive file and extract it into a folder named after the file.

    The destination is the archive's base name without its extension, placed
    under ``output_dir`` when one is given and relative to the current
    directory otherwise. A missing file is reported on stderr and ends the
    process with status 1.
    """
    if not os.path.isfile(archive_path):
        print(f"Error: File not found - {archive_path}", file=sys.stderr)
        sys.exit(1)

    with open(archive_path, 'rb') as archive_file:
        raw = archive_file.read()

    parser = ArchiveParser(raw, format_js=format_js)
    parser.parse()

    # Archive file name with directory and extension stripped.
    stem = os.path.splitext(os.path.basename(archive_path))[0]
    destination = stem if output_dir is None else os.path.join(output_dir, stem)
    parser.extract_files(destination)
@click.command(context_settings={'help_option_names': ['-h', '--help']})
@click.argument('archive_paths', nargs=-1, type=click.Path(exists=True))
@click.option('-f', '--format-js', is_flag=True, help='Format JavaScript files using jsbeautifier before saving.')
@click.option('-o', '--output-dir', type=str, help='Directory where extracted files should be placed.')
def main(archive_paths, format_js, output_dir):
    """
    Extract entries from one or more Valdi module archives.
    \b
    ARCHIVE_PATHS: One or more paths to Valdi module files.
    """
    # nargs=-1 accepts zero paths, so the empty case is rejected by hand.
    if not archive_paths:
        print("Error: No archive paths provided.", file=sys.stderr)
        sys.exit(1)

    for path in archive_paths:
        print(f"Processing archive: {path}")
        try:
            parse_archive(path, format_js, output_dir)
        except Exception as exc:
            # Report the failure and carry on with the remaining archives;
            # parser errors get their own wording.
            if isinstance(exc, ArchiveParserError):
                print(f"Failed to parse archive {path}: {exc}", file=sys.stderr)
            else:
                print(f"An error occurred while processing {path}: {exc}", file=sys.stderr)


if __name__ == '__main__':
    main()
Before running the Python program, install the following dependencies via pip:
click, zstandard, jsbeautifier