Recovering deleted files from Valve VPKs
@PistonMiner, created March 25, 2022

# SPDX-License-Identifier: MIT
# Copyright 2020 Linus S. (aka PistonMiner)
import sys
import os
import struct
import bisect
import datetime
import hashlib

class RangeSet:
    def __init__(self):
        self.ranges = []

    @classmethod
    def full_range(cls, size):
        result = RangeSet()
        result.add(0, size)
        return result

    def invert(self, start, end):
        new = RangeSet()
        # Empty range
        if len(self.ranges) == 0:
            new.add(start, end)
            return new
        # Start
        first_start, _ = self.ranges[0]
        assert(start <= first_start)
        if first_start > start:
            new.add(start, first_start)
        # Gaps
        for i in range(len(self.ranges) - 1):
            _, lhs_end = self.ranges[i]
            rhs_start, _ = self.ranges[i + 1]
            new.add(lhs_end, rhs_start)
        # End
        _, last_end = self.ranges[-1]
        assert(end >= last_end)
        if end > last_end:
            new.add(last_end, end)
        return new

    def add(self, start, end=None):
        if end is None:
            end = start + 1
        # Empty range, ignore
        if end <= start:
            return
        # Find spot to insert
        new_index = bisect.bisect_right(self.ranges, (start, end))
        insert_start_index = new_index
        insert_end_index = new_index
        # Not inserting at start, check for overlap with previous
        if new_index > 0:
            prev_index = new_index - 1
            prev_start, prev_end = self.ranges[prev_index]
            # Do we overlap or touch the preceding range?
            if prev_end >= start:
                # Does it contain us?
                if prev_end >= end:
                    # We're already present.
                    return
                # Remove existing entry in place of us
                insert_start_index = prev_index
                start = prev_start
        # Overlap with following ranges
        while insert_end_index < len(self.ranges):
            next_start, next_end = self.ranges[insert_end_index]
            if next_end > end:
                # Partial overlap?
                if next_start <= end:
                    insert_end_index += 1
                    end = next_end
                break
            # Fully contained
            insert_end_index += 1
        self.ranges[insert_start_index:insert_end_index] = [(start, end)]
        assert(self.ranges == sorted(self.ranges))

    def mass(self):
        return sum(end - start for start, end in self.ranges)

    def __repr__(self):
        return repr(self.ranges)

    def __len__(self):
        return len(self.ranges)

    def __getitem__(self, key):
        return self.ranges[key]

    def __delitem__(self, key):
        del self.ranges[key]

    def copy(self):
        new = RangeSet()
        new.ranges = self.ranges[:]
        return new
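
# Example: overlapping or touching ranges coalesce, and invert() yields the gaps.
#   rs = RangeSet()
#   rs.add(0, 4); rs.add(8, 12); rs.add(2, 9)
#   rs.ranges                 # [(0, 12)]
#   rs.invert(0, 16).ranges   # [(12, 16)]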

def align_up(val, align):
    # Round val up to the next multiple of align (align must be a power of two)
    return (val + align - 1) & ~(align - 1)


VPK_CONTENT_ALIGN = 16
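
# For example, align_up(20, 16) == 32 and align_up(16, 16) == 16. The splitting
# heuristic below assumes archive payloads are padded with zero bytes up to
# VPK_CONTENT_ALIGN-byte boundaries.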


def vpk_content_split(data):
    # Try to identify splits in the data based on a common resource format.
    # Source 2 resources start with the file size as the first field, which
    # we can use.
    chunks = []
    p = 0
    while p < len(data):
        # Need at least 4 bytes left for the size field.
        if len(data) - p < 4:
            return None
        possibly_file_size = struct.unpack_from("<l", data, p)[0]
        # Sanity check this field. It could be borked.
        if possibly_file_size < 4 or possibly_file_size > len(data) - p:
            return None
        # Check that the padding is zero.
        chunk_size = align_up(possibly_file_size, VPK_CONTENT_ALIGN)
        if not all(b == 0 for b in data[p + possibly_file_size:p + chunk_size]):
            return None
        # TODO: We include padding with the chunk data right now. Not sure if
        # that's a good idea.
        chunks.append(data[p:p + chunk_size])
        p += chunk_size
    return chunks
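
# Example: a buffer holding two size-prefixed chunks splits at the aligned
# boundaries (declared sizes 4 and 20 round up to 16 and 32 respectively):
#   blob = (struct.pack("<l", 4) + b"\x00" * 12
#           + struct.pack("<l", 20) + b"X" * 16 + b"\x00" * 12)
#   vpk_content_split(blob)   # [blob[0:16], blob[16:48]]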


def vpk_undelete(dir_vpk_path, dump_directory_path, log):
    # Load directory VPK
    vpk_dir, dir_vpk_name = os.path.split(dir_vpk_path)
    dir_vpk_base, dir_vpk_ext = os.path.splitext(dir_vpk_name)
    dir_vpk_suffix = "_dir"
    if not dir_vpk_base.endswith(dir_vpk_suffix):
        log("Unexpected VPK file name format")
        return
    vpk_prefix = dir_vpk_base[:-len(dir_vpk_suffix)]
    data_vpk_name_format = vpk_prefix + "_{:03}" + dir_vpk_ext
    # TODO: This seems slightly dangerous considering what we're passing in
    # isn't really a path
    data_vpk_path_format = os.path.join(vpk_dir, data_vpk_name_format)
    with open(dir_vpk_path, "rb") as f:
        dir_vpk_data = f.read()
    # AUTO: Print hash
    m = hashlib.sha256()
    m.update(dir_vpk_data)
    log("Info: path={}, size={}, sha256={}".format(dir_vpk_path, len(dir_vpk_data), m.hexdigest()))
    # Parse directory VPK
    # Header
    if len(dir_vpk_data) < 0x1c:
        log("File too small to be a VPK")
        return
    signature = struct.unpack_from("<L", dir_vpk_data, 0x0)[0]
    if signature != 0x55aa1234:
        log("Unexpected VPK signature (not a VPK?)")
        return
    version = struct.unpack_from("<L", dir_vpk_data, 0x4)[0]
    if version != 2:
        log("Unsupported VPK version {}".format(version))
        return
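    # VPK v2 header layout (per the Valve Developer Community wiki), 0x1c bytes:
    #   0x00 u32 Signature (0x55aa1234)
    #   0x04 u32 Version (2)
    #   0x08 u32 TreeSize
    #   0x0c u32 FileDataSectionSize
    #   0x10 u32 ArchiveMD5SectionSize
    #   0x14 u32 OtherMD5SectionSize
    #   0x18 u32 SignatureSectionSize
    # The directory tree starts right after the header, at offset 0x1c, which
    # is why parsing resumes there below.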

    def read_string(data, offset, max_len=None):
        result = bytearray()
        i = 0
        while max_len is None or i < max_len:
            char = data[offset + i]
            if char == 0:
                break
            result.append(char)
            i += 1
        return result.decode("utf-8")

    p = 0x1c
    # Types
    file_types = []
    while dir_vpk_data[p]:
        extension = read_string(dir_vpk_data, p)
        p += len(extension) + 1
        # Directories
        directories = []
        while dir_vpk_data[p]:
            directory_path = read_string(dir_vpk_data, p)
            p += len(directory_path) + 1
            # Files
            files = []
            while dir_vpk_data[p]:
                file_name = read_string(dir_vpk_data, p)
                p += len(file_name) + 1
                crc = struct.unpack_from("<L", dir_vpk_data, p + 0x00)[0]
                preload_bytes = struct.unpack_from("<H", dir_vpk_data, p + 0x04)[0]
                archive_index = struct.unpack_from("<H", dir_vpk_data, p + 0x06)[0]
                entry_offset = struct.unpack_from("<L", dir_vpk_data, p + 0x08)[0]
                entry_length = struct.unpack_from("<L", dir_vpk_data, p + 0x0c)[0]
                terminator = struct.unpack_from("<H", dir_vpk_data, p + 0x10)[0]
                p += 0x12
                #assert(preload_bytes == 0)
                p += preload_bytes
                if archive_index == 0x7fff:
                    # 0x7fff means the data lives in the _dir VPK itself
                    log("{} stored in dir, skipping".format(file_name))
                    continue
                assert(entry_length != 0)
                assert(terminator == 0xffff)
                file = {
                    "name": file_name,
                    "crc": crc,
                    "preload_bytes": preload_bytes,
                    "archive_index": archive_index,
                    "entry_offset": entry_offset,
                    "entry_length": entry_length
                }
                files.append(file)
            # Skip terminator
            p += 1
            directory = {
                "directory_path": directory_path,
                "files": files
            }
            directories.append(directory)
        # Skip terminator
        p += 1
        file_type = {
            "extension": extension,
            "directories": directories
        }
        file_types.append(file_type)
    # Skip terminator
    p += 1
    # Find highest index content VPK
    present_content_vpks = RangeSet()
    for t in file_types:
        for d in t["directories"]:
            for f in d["files"]:
                present_content_vpks.add(f["archive_index"])
    content_vpk_count = present_content_vpks.ranges[-1][1]
    log("Found {} referenced VPKs".format(content_vpk_count))
    # Find accounted-for ranges
    accounted_range_sets = [RangeSet() for i in range(content_vpk_count)]
    for t in file_types:
        for d in t["directories"]:
            for f in d["files"]:
                archive_index = f["archive_index"]
                start = f["entry_offset"]
                end = start + f["entry_length"]
                accounted_range_sets[archive_index].add(start, end)
    # Find file sizes
    content_vpk_sizes = []
    for i in range(content_vpk_count):
        content_vpk_path = data_vpk_path_format.format(i)
        try:
            content_vpk_size = os.path.getsize(content_vpk_path)
        except FileNotFoundError:
            # Use the end of the last accounted-for range as the estimated size
            content_vpk_size = accounted_range_sets[i][-1][1]
        content_vpk_sizes.append(content_vpk_size)
    # Find areas not accounted for
    unaccounted_range_sets = [
        r.invert(0, content_vpk_sizes[i]) for i, r in enumerate(accounted_range_sets)
    ]
    # Prune chunks that can be attributed to alignment
    for i in range(content_vpk_count):
        unaccounted = unaccounted_range_sets[i]
        ri = 0
        while ri < len(unaccounted):
            start, end = unaccounted[ri]
            # Discard likely alignment chunks (they end on an aligned boundary
            # and are smaller than the alignment)
            if end & (VPK_CONTENT_ALIGN - 1) == 0 and end - start < VPK_CONTENT_ALIGN:
                del unaccounted[ri]
                continue
            ri += 1
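    # For example, an unaccounted range (0x1f4, 0x200) ends on a 16-byte
    # boundary and is only 12 bytes long, so it is most likely just the zero
    # padding after a live file and gets discarded here.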
    # Dump results
    def dump_vpk_file_region(data, path_format, start):
        # Try to split.
        chunks = vpk_content_split(data)
        if not chunks:
            # Failed to split. Dump in one piece.
            # HACK: Gotta compute path early.
            dump_path = path_format.format(start, len(data))
            log("Splitting heuristic failed, dumping as one chunk ({})".format(dump_path))
            chunks = [data]
        p = start
        for chunk in chunks:
            dump_path = path_format.format(p, len(chunk))
            with open(dump_path, "wb") as dump_file:
                dump_file.write(chunk)
            p += len(chunk)

    def dump_vpk_file_ranges(content_file, path_format, ranges):
        for start, end in ranges:
            # Align start upwards to exclude padding from the previous file
            start = align_up(start, VPK_CONTENT_ALIGN)
            content_file.seek(start)
            dumped_data = content_file.read(end - start)
            dump_vpk_file_region(dumped_data, path_format, start)

    for i in range(content_vpk_count):
        mass = unaccounted_range_sets[i].mass()
        if mass > 0:
            log("{:03}: {:.2f} KB unaccounted for across {} regions".format(
                i, mass / 1024.0, len(unaccounted_range_sets[i])
            ))
            # Dump data if requested
            if dump_directory_path:
                dump_path_format = os.path.join(
                    dump_directory_path,
                    "dump_{:03}".format(i) + "_{:x}_{:x}.bin"
                )
                content_vpk_path = data_vpk_path_format.format(i)
                try:
                    with open(content_vpk_path, "rb") as f:
                        dump_vpk_file_ranges(f, dump_path_format, unaccounted_range_sets[i])
                except FileNotFoundError:
                    log("{} doesn't exist, can't dump.".format(content_vpk_path))
    log("Complete.")


def auto_mode():
    dir_vpk_names = []
    filenames = os.listdir(".")
    for filename in filenames:
        if filename.endswith("_dir.vpk"):
            dir_vpk_names.append(filename)
    base_directory_path = "./vpk_undelete_{}/".format(datetime.datetime.utcnow().timestamp())
    try:
        os.mkdir(base_directory_path)
    except FileExistsError:
        pass
    for dir_vpk_name in dir_vpk_names:
        print("Checking {}...".format(dir_vpk_name))
        dump_directory_path = os.path.join(base_directory_path, dir_vpk_name)
        try:
            os.mkdir(dump_directory_path)
        except FileExistsError:
            pass
        log_text = ""
        def log(s):
            nonlocal log_text
            print(s)
            log_text += "{}\n".format(s)
        vpk_undelete(dir_vpk_name, dump_directory_path, log)
        log_path = os.path.join(dump_directory_path, "log.txt")
        with open(log_path, "w", encoding="utf-8") as f:
            f.write(log_text)


def user_mode():
    vpk_undelete(
        sys.argv[1],
        sys.argv[2] if len(sys.argv) >= 3 else None,
        print
    )


def main():
    if len(sys.argv) not in [1, 2, 3]:
        print("Usage: {} [VPK path] [dump directory path]".format(sys.argv[0]))
        return
    if len(sys.argv) == 1:
        auto_mode()
    else:
        user_mode()


if __name__ == "__main__":
    main()
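
# Example invocations (assuming this gist is saved as vpk_undelete.py):
#   python3 vpk_undelete.py
#       auto mode: scans every *_dir.vpk in the current directory and dumps
#       into a fresh ./vpk_undelete_<timestamp>/ tree with per-VPK log.txt
#   python3 vpk_undelete.py pak01_dir.vpk
#       reports unaccounted-for regions without dumping anything
#   python3 vpk_undelete.py pak01_dir.vpk ./dump
#       also writes recovered chunks into ./dump (which must already exist)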