Skip to content

Instantly share code, notes, and snippets.

@BlueFalconHD
Created March 17, 2025 02:13
Show Gist options
  • Select an option

  • Save BlueFalconHD/0b6b582595fe04d594a06ac4dc93a930 to your computer and use it in GitHub Desktop.

Select an option

Save BlueFalconHD/0b6b582595fe04d594a06ac4dc93a930 to your computer and use it in GitHub Desktop.
Snapchat Valdi module extractor
#!/usr/bin/env python3
import struct
from dataclasses import dataclass, field
from typing import List, Tuple
import os
import json
import click
import jsbeautifier
import sys
import zstandard as zstd
# Set this to True to get EXTREMELY verbose debugging on what is being read.
DEBUG_MESSAGES = False


def print_debug(*args, **kwargs):
    """Forward the arguments to ``print`` only when ``DEBUG_MESSAGES`` is true.

    Accepts exactly what ``print`` accepts (positional values plus keyword
    options such as ``sep``, ``end`` or ``file``). When ``DEBUG_MESSAGES`` is
    false the call does nothing.
    """
    if DEBUG_MESSAGES:
        print(*args, **kwargs)
@dataclass
class FileEntry:
    """One named blob read from an archive."""

    # Entry name as decoded from the archive (UTF-8); used as the relative
    # output path when the entry is extracted.
    name: str
    # Raw bytes of the entry, exactly as stored in the archive.
    content: bytes
    # Entries with this set are skipped during extraction. The parser in this
    # file currently always sets it to False.
    is_metadata: bool
@dataclass
class Archive:
    """In-memory result of parsing an archive: its magic number and entries."""

    # The 4 magic bytes exactly as read from the start of the data.
    magic_number: bytes
    # Entries in the order they were parsed; each instance gets its own list.
    entries: List[FileEntry] = field(default_factory=list)
class ArchiveParserError(Exception):
    """Raised when archive data is truncated, malformed, or not a Valdi module."""
class ArchiveParser:
    """Cursor-based parser and extractor for Valdi module archives.

    As read by this parser, the data consists of a 4-byte magic number, a
    4-byte value of unknown meaning, and then a sequence of entries. Each
    entry is ``[name length][name bytes][content length][content bytes]``.
    Length fields are little-endian uint32 values whose top bit is a flag and
    whose low 31 bits are the length; every field is read starting on a
    4-byte boundary. Data that starts with a Zstandard magic number is
    decompressed first and then parsed the same way.
    """

    # Exclusive upper bound for a peeked length field to be treated as the
    # start of another entry (see ``is_valid_length``).
    MAX_ENTRY_LENGTH = 100000000

    def __init__(self, data: bytes, format_js: bool = False):
        """Store the raw bytes and reset the cursor.

        Args:
            data: Complete contents of the archive file.
            format_js: When true, ``.js`` entries are run through
                jsbeautifier before being written by ``extract_files``.
        """
        self.data = data
        self.cursor = 0
        self.length = len(data)
        self.archive = Archive(magic_number=b'')
        self.format_js = format_js

    def read_bytes(self, size: int) -> bytes:
        """Return the next ``size`` bytes and advance the cursor past them.

        Raises:
            ArchiveParserError: If fewer than ``size`` bytes remain.
        """
        if self.cursor + size > self.length:
            raise ArchiveParserError(f"Unexpected end of data at position {self.cursor}. Expected {size} more bytes.")
        result = self.data[self.cursor:self.cursor+size]
        self.cursor += size
        return result

    def read_uint32_le(self) -> int:
        """Align to 4 bytes, then read one little-endian unsigned 32-bit integer."""
        self.align_to_4_bytes()
        raw_bytes = self.read_bytes(4)
        value = struct.unpack('<I', raw_bytes)[0]
        print_debug(f"Read uint32_le at {self.cursor - 4}: {value} (hex: {raw_bytes.hex()})")
        return value

    def read_length_and_flag(self) -> Tuple[int, bool]:
        """Align to 4 bytes, then read a length field.

        Returns:
            ``(length, flag)`` where ``flag`` is the most significant bit of
            the 32-bit value and ``length`` is the remaining 31 bits.
        """
        self.align_to_4_bytes()
        raw_bytes = self.read_bytes(4)
        raw_value = struct.unpack('<I', raw_bytes)[0]
        length, flag = self.parse_length_preview(raw_value)
        print_debug(f"Read length: {length}, flag: {flag}, at position {self.cursor - 4} (hex: {raw_bytes.hex()})")
        return length, flag

    def align_to_4_bytes(self):
        """Advance the cursor to the next multiple of 4 (no-op if already aligned).

        The cursor may end up past the end of the data; the next
        ``read_bytes`` call reports that as an error.
        """
        alignment = self.cursor % 4
        if alignment != 0:
            padding = 4 - alignment
            print_debug(f"Aligning cursor from {self.cursor} to {self.cursor + padding} (skipping {padding} bytes)")
            self.cursor += padding

    def parse(self):
        """Parse the header and every entry into ``self.archive``.

        Entries are read until the data is exhausted, too few bytes remain
        for another length field, or the next length field is implausible
        (see ``is_valid_length``); the latter two cases print a notice and
        stop without failing.

        On ``ArchiveParserError`` the message is printed to stderr and the
        process exits with status 1.
        """
        try:
            self.parse_magic_number()
            while self.cursor < self.length:
                self.align_to_4_bytes()
                if self.cursor >= self.length:
                    break
                # Peek at the next 4 bytes without consuming them to see
                # whether they could be a valid name-length field.
                if self.cursor + 4 > self.length:
                    print(f"Insufficient bytes for next length field at position {self.cursor}")
                    break
                next_length_value = struct.unpack_from('<I', self.data, self.cursor)[0]
                next_length, _flag = self.parse_length_preview(next_length_value)
                if not self.is_valid_length(next_length):
                    print(f"Invalid length ({next_length}) at position {self.cursor}, stopping parsing.")
                    break  # Exit loop if invalid length is encountered
                self.parse_entry()
            print_debug("Parsing complete.")
        except ArchiveParserError as e:
            print(f"Error parsing archive: {e}", file=sys.stderr)
            sys.exit(1)

    def parse_magic_number(self):
        """Read and validate the 4-byte magic number at the cursor.

        If the value is a Zstandard magic number, the whole buffer is
        decompressed, the parser state is reset to the decompressed data and
        the magic number is parsed again. If it is the Valdi magic number,
        the following 4 bytes (meaning unknown) are consumed as well.

        Raises:
            ArchiveParserError: On decompression failure or an unrecognised
                magic number.
        """
        self.align_to_4_bytes()
        self.archive.magic_number = self.read_bytes(4)
        magic_number_value = struct.unpack('<I', self.archive.magic_number)[0]
        print_debug(f"Magic number at {self.cursor - 4}: {self.archive.magic_number.hex()}")
        if magic_number_value == 0xFD2FB528 or magic_number_value == 0xFD2FB527:  # Zstandard magic number
            print_debug("Detected Zstandard compressed file.")
            decompressor = zstd.ZstdDecompressor()
            try:
                with decompressor.stream_reader(self.data) as reader:
                    decompressed_data = reader.read()
            except zstd.ZstdError as e:
                raise ArchiveParserError(f"Error during decompression: {e}") from e
            self.data = decompressed_data
            self.cursor = 0
            self.length = len(self.data)
            print_debug("Decompression successful. Re-parsing magic number.")
            self.parse_magic_number()
        elif magic_number_value == 0x0100C633:  # Valdi magic number (little-endian)
            print_debug("Valid Valdi module detected.")
            unknown_value = self.read_bytes(4)
            print_debug(f"Unknown 4-byte value after magic number: {unknown_value.hex()} (position {self.cursor - 4})")
        else:
            raise ArchiveParserError(f"Error: File is not a valid Valdi module. Magic number: 0x{magic_number_value:08X}")

    def parse_length_preview(self, value: int) -> Tuple[int, bool]:
        """Split a raw 32-bit length field into ``(length, flag)``.

        ``flag`` is the highest bit; ``length`` is the low 31 bits. Does not
        touch the cursor.
        """
        flag = (value & 0x80000000) != 0  # Extract the highest bit
        length = value & 0x7FFFFFFF  # Mask out the highest bit to get the length
        return length, flag

    def is_valid_length(self, value: int) -> bool:
        """Return True if ``value`` is positive and below ``MAX_ENTRY_LENGTH``."""
        return 0 < value < self.MAX_ENTRY_LENGTH

    def parse_entry(self):
        """Read one ``name``/``content`` pair at the cursor and append it to the archive.

        Raises:
            ArchiveParserError: If the data ends early or the name is not
                valid UTF-8.
        """
        # name length
        print_debug(f"Starting new entry at position {self.cursor}")
        name_length, name_flag = self.read_length_and_flag()
        print_debug(f"Name length: {name_length}, flag: {name_flag}")
        # name
        self.align_to_4_bytes()
        name_bytes = self.read_bytes(name_length)
        print_debug(f"Name bytes at {self.cursor - name_length}: {name_bytes.hex()}")
        try:
            name = name_bytes.decode('utf-8')
        except UnicodeDecodeError as e:
            raise ArchiveParserError(f"Invalid UTF-8 sequence in name at position {self.cursor - name_length}") from e
        print_debug(f"Name: {name}")
        # content length
        content_length, content_flag = self.read_length_and_flag()
        print_debug(f"Content length: {content_length}, flag: {content_flag}")
        # content
        self.align_to_4_bytes()
        content = self.read_bytes(content_length)
        print_debug(f"Content bytes at {self.cursor - content_length} (first 64 bytes or full length): {content[:64].hex()}... [total {len(content)} bytes]")
        # TODO: maybe figure out what is metadata vs. what isn't but might not even be
        # necessary idk if meta is a thing but there are some "meta-esque" values
        # like hash and download_manifest
        is_metadata = False
        self.archive.entries.append(FileEntry(name=name, content=content, is_metadata=is_metadata))
        print_debug(f"Extracted entry: {name}, size: {content_length} bytes")

    @staticmethod
    def _safe_entry_path(output_dir: str, entry_name: str):
        """Return the path for ``entry_name`` under ``output_dir``, or None if unsafe.

        A name is unsafe when, after resolving ``..`` components, absolute
        paths and symlinks, the target is not strictly inside ``output_dir``.
        """
        candidate = os.path.join(output_dir, entry_name)
        try:
            root = os.path.realpath(output_dir)
            resolved = os.path.realpath(candidate)
            inside = resolved != root and os.path.commonpath([root, resolved]) == root
        except ValueError:
            # Raised for e.g. embedded NUL bytes or paths on different drives.
            inside = False
        return candidate if inside else None

    def extract_files(self, output_dir: str):
        """Write every non-metadata entry below ``output_dir``.

        Entry names come from the archive and are therefore untrusted: an
        entry whose name would resolve outside ``output_dir`` is skipped with
        a warning on stderr instead of being written. When ``format_js`` is
        set, ``.js`` entries are beautified first; if beautifying fails the
        error is printed and the original bytes are written.

        Args:
            output_dir: Directory to create (if needed) and extract into.
        """
        os.makedirs(output_dir, exist_ok=True)
        written = 0
        for entry in self.archive.entries:
            if entry.is_metadata:
                continue
            file_path = self._safe_entry_path(output_dir, entry.name)
            if file_path is None:
                print(f"Skipping entry with unsafe path: {entry.name!r}", file=sys.stderr)
                continue
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            content_to_write = entry.content
            if self.format_js and entry.name.endswith('.js'):
                print_debug(f"Formatting JavaScript file: {entry.name}")
                try:
                    opts = jsbeautifier.default_options()
                    content_str = entry.content.decode('utf-8', errors='replace')
                    formatted_content = jsbeautifier.beautify(content_str, opts)
                    content_to_write = formatted_content.encode('utf-8')
                except Exception as e:
                    # Best effort: fall back to the unformatted bytes.
                    print(f"Error formatting JavaScript file {entry.name}: {e}")
            with open(file_path, 'wb') as f:
                f.write(content_to_write)
            written += 1
            print_debug(f"Wrote file: {file_path}")
        print(f"Extracted {written} entries to {output_dir}")
def parse_archive(archive_path: str, format_js: bool, output_dir: str = None):
    """Parse one archive file and extract its entries to disk.

    Entries are written to a directory named after the archive file without
    its extension, created in the current directory or, if given, inside
    ``output_dir``.

    Args:
        archive_path: Path to the archive file.
        format_js: Passed to ``ArchiveParser``; beautify ``.js`` entries.
        output_dir: Optional parent directory for the extraction directory.

    If ``archive_path`` is not a file, an error is printed to stderr and the
    process exits with status 1.
    """
    if not os.path.isfile(archive_path):
        print(f"Error: File not found - {archive_path}", file=sys.stderr)
        sys.exit(1)
    with open(archive_path, 'rb') as f:
        data = f.read()
    parser = ArchiveParser(data, format_js=format_js)
    parser.parse()
    archive_stem = os.path.splitext(os.path.basename(archive_path))[0]
    if output_dir is None:
        target_dir = archive_stem
    else:
        target_dir = os.path.join(output_dir, archive_stem)
    # An archive without a file extension yields a stem equal to its own
    # name, so the target could be an existing file (possibly the archive
    # itself); creating a directory there would fail.
    if os.path.isfile(target_dir):
        target_dir += "_extracted"
    parser.extract_files(target_dir)
@click.command(context_settings=dict(help_option_names=['-h', '--help']))
@click.argument('archive_paths', nargs=-1, type=click.Path(exists=True))
@click.option('-f', '--format-js', is_flag=True, help='Format JavaScript files using jsbeautifier before saving.')
@click.option('-o', '--output-dir', type=str, help='Directory where extracted files should be placed.')
def main(archive_paths, format_js, output_dir):
    """
    Extract entries from one or more Valdi module archives.

    \b
    ARCHIVE_PATHS: One or more paths to Valdi module files.
    """
    # NOTE: the docstring above is the --help text. The "\b" line (kept on its
    # own line after a blank line) tells click not to re-wrap the paragraph
    # that follows it.
    if not archive_paths:
        print("Error: No archive paths provided.", file=sys.stderr)
        sys.exit(1)
    any_failed = False
    for archive_path in archive_paths:
        print(f"Processing archive: {archive_path}")
        try:
            parse_archive(archive_path, format_js, output_dir)
        except ArchiveParserError as e:
            any_failed = True
            print(f"Failed to parse archive {archive_path}: {e}", file=sys.stderr)
        except Exception as e:
            # Top-level boundary: report and continue with the next archive.
            any_failed = True
            print(f"An error occurred while processing {archive_path}: {e}", file=sys.stderr)
    if any_failed:
        # Every archive was attempted; signal the failure(s) via the exit status.
        sys.exit(1)


if __name__ == '__main__':
    main()

Before running the Python program, install the following dependencies via pip:

  • click
  • zstandard
  • jsbeautifier
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment