Recovering deleted files from Valve VPKs
# SPDX-License-Identifier: MIT
# Copyright 2020 Linus S. (aka PistonMiner)

import sys
import os
import struct
import bisect
import datetime
import hashlib
class RangeSet:
    def __init__(self):
        self.ranges = []

    @classmethod
    def full_range(cls, size):
        result = cls()
        result.add(0, size)
        return result

    def invert(self, start, end):
        # Return the gaps within [start, end); the asserts below require that
        # [start, end) spans every stored range.
        new = RangeSet()
        # Empty range
        if len(self.ranges) == 0:
            new.add(start, end)
            return new
        # Start
        first_start, _ = self.ranges[0]
        assert(start <= first_start)
        if first_start > start:
            new.add(start, first_start)
        # Gaps
        for i in range(len(self.ranges) - 1):
            _, lhs_end = self.ranges[i]
            rhs_start, _ = self.ranges[i + 1]
            new.add(lhs_end, rhs_start)
        # End
        _, last_end = self.ranges[-1]
        assert(end >= last_end)
        if end > last_end:
            new.add(last_end, end)
        return new

    def add(self, start, end=None):
        if end is None:
            end = start + 1
        # Empty range, ignore
        if end <= start:
            return
        # Find spot to insert
        new_index = bisect.bisect_right(self.ranges, (start, end))
        insert_start_index = new_index
        insert_end_index = new_index
        # Not inserting at start, check for overlap with previous
        if new_index > 0:
            prev_index = new_index - 1
            prev_start, prev_end = self.ranges[prev_index]
            # Do we overlap or touch the preceding range?
            if prev_end >= start:
                # Does it contain us?
                if prev_end >= end:
                    # We're already present.
                    return
                # Remove existing entry in place of us
                insert_start_index = prev_index
                start = prev_start
        # Overlap with following ranges
        while insert_end_index < len(self.ranges):
            next_start, next_end = self.ranges[insert_end_index]
            if next_end > end:
                # Partial overlap?
                if next_start <= end:
                    insert_end_index += 1
                    end = next_end
                break
            # Fully contained
            insert_end_index += 1
        self.ranges[insert_start_index:insert_end_index] = [(start, end)]
        assert(self.ranges == sorted(self.ranges))

    def mass(self):
        return sum([end - start for start, end in self.ranges])

    def __repr__(self):
        return repr(self.ranges)

    def __len__(self):
        return len(self.ranges)

    def __getitem__(self, key):
        return self.ranges[key]

    def __delitem__(self, key):
        del self.ranges[key]

    def copy(self):
        new = RangeSet()
        new.ranges = self.ranges[:]
        return new
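# Illustrative RangeSet behavior (values hypothetical, not part of the tool):
#   rs = RangeSet()
#   rs.add(0, 16); rs.add(8, 32)    # overlapping adds coalesce
#   rs.add(48, 64)                  # disjoint ranges stay separate
#   rs.ranges                       # -> [(0, 32), (48, 64)]
#   rs.invert(0, 64).ranges         # the gaps: [(32, 48)]
#   rs.mass()                       # bytes covered: 48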
def align_up(val, align):
    return (val + align - 1) & ~(align - 1)

VPK_CONTENT_ALIGN = 16
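# Example: align_up(0x31, 16) == 0x40 and align_up(0x40, 16) == 0x40. The bit
# trick only works because the alignment is a power of two, which
# VPK_CONTENT_ALIGN is.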
def vpk_content_split(data):
    # Try to identify splits in the data based on a common resource format:
    # Source 2 resources start with the file size as the first field, so we
    # can use that to walk from one file to the next.
    chunks = []
    p = 0
    while p < len(data):
        # Not enough bytes left for a size field; heuristic fails.
        if len(data) - p < 4:
            return None
        possibly_file_size = struct.unpack_from("<l", data, p)[0]
        # Sanity check this field. It could be borked.
        if possibly_file_size < 4 or possibly_file_size > len(data[p:]):
            return None
        # Check that padding is zero.
        chunk_size = align_up(possibly_file_size, VPK_CONTENT_ALIGN)
        if not all(b == 0 for b in data[p + possibly_file_size:p + chunk_size]):
            return None
        # TODO: We include padding with the chunk data right now. Not sure if
        # that's a good idea.
        chunks.append(data[p:p + chunk_size])
        p += chunk_size
    return chunks
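# Sketch of the layout this heuristic assumes (Source 2 resource files, sizes
# hypothetical):
#
#   offset 0x000: 34 02 00 00 ...   <- file A, file-size field = 0x234
#   offset 0x234: 00 00 00 00 ...   <- zero padding up to 16-byte alignment
#   offset 0x240: 58 01 00 00 ...   <- file B, file-size field = 0x158
#
# If any supposed size field is implausible or the padding is non-zero, the
# caller falls back to dumping the region as a single chunk.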
def vpk_undelete(dir_vpk_path, dump_directory_path, log):
    # Load directory VPK
    vpk_dir, dir_vpk_name = os.path.split(dir_vpk_path)
    dir_vpk_base, dir_vpk_ext = os.path.splitext(dir_vpk_name)
    dir_vpk_suffix = "_dir"
    if not dir_vpk_base.endswith(dir_vpk_suffix):
        log("Unexpected VPK file name format")
        return
    vpk_prefix = dir_vpk_base[:-len(dir_vpk_suffix)]
    data_vpk_name_format = vpk_prefix + "_{:03}" + dir_vpk_ext
    # TODO: This seems slightly dangerous considering what we're passing in
    # isn't really a path
    data_vpk_path_format = os.path.join(vpk_dir, data_vpk_name_format)

    with open(dir_vpk_path, "rb") as f:
        dir_vpk_data = f.read()

    # AUTO: Print hash
    m = hashlib.sha256()
    m.update(dir_vpk_data)
    log("Info: path={}, size={}, sha256={}".format(dir_vpk_path, len(dir_vpk_data), m.hexdigest()))

    # Parse directory VPK
    # Header
    if len(dir_vpk_data) < 0x1c:
        log("File too small to be a VPK")
        return
    signature = struct.unpack_from("<L", dir_vpk_data, 0x0)[0]
    if signature != 0x55aa1234:
        log("Unexpected VPK signature (not a VPK?)")
        return
    version = struct.unpack_from("<L", dir_vpk_data, 0x4)[0]
    if version != 2:
        log("Unsupported VPK version {}".format(version))
        return

    def read_string(data, offset, max_len=None):
        result = bytearray()
        i = 0
        while max_len is None or i < max_len:
            char = data[offset + i]
            if char == 0:
                break
            result.append(char)
            i += 1
        return result.decode("utf-8")
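    # VPK v2 directory layout, as consumed below: three nested levels of
    # null-terminated string lists, each level ending with an empty string.
    #   extension
    #     directory path
    #       file name + 0x12-byte entry:
    #         u32 crc, u16 preload_bytes, u16 archive_index,
    #         u32 entry_offset, u32 entry_length, u16 terminator (0xffff)
    #       ... followed by preload_bytes of inline preload data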
    p = 0x1c
    # Types
    file_types = []
    while dir_vpk_data[p]:
        extension = read_string(dir_vpk_data, p)
        p += len(extension) + 1
        # Directories
        directories = []
        while dir_vpk_data[p]:
            directory_path = read_string(dir_vpk_data, p)
            p += len(directory_path) + 1
            # Files
            files = []
            while dir_vpk_data[p]:
                file_name = read_string(dir_vpk_data, p)
                p += len(file_name) + 1
                crc = struct.unpack_from("<L", dir_vpk_data, p + 0x00)[0]
                preload_bytes = struct.unpack_from("<H", dir_vpk_data, p + 0x04)[0]
                archive_index = struct.unpack_from("<H", dir_vpk_data, p + 0x06)[0]
                entry_offset = struct.unpack_from("<L", dir_vpk_data, p + 0x08)[0]
                entry_length = struct.unpack_from("<L", dir_vpk_data, p + 0x0c)[0]
                terminator = struct.unpack_from("<H", dir_vpk_data, p + 0x10)[0]
                p += 0x12
                #assert(preload_bytes == 0)
                p += preload_bytes
                if archive_index == 0x7fff:
                    log("{} stored in dir, skipping".format(file_name))
                    continue
                assert(entry_length != 0)
                assert(terminator == 0xffff)
                file = {
                    "name": file_name,
                    "crc": crc,
                    "preload_bytes": preload_bytes,
                    "archive_index": archive_index,
                    "entry_offset": entry_offset,
                    "entry_length": entry_length,
                }
                files.append(file)
            # Skip terminator
            p += 1
            directory = {
                "directory_path": directory_path,
                "files": files,
            }
            directories.append(directory)
        # Skip terminator
        p += 1
        file_type = {
            "extension": extension,
            "directories": directories,
        }
        file_types.append(file_type)
    # Skip terminator
    p += 1
    # Find highest index content VPK
    present_content_vpks = RangeSet()
    for t in file_types:
        for d in t["directories"]:
            for f in d["files"]:
                present_content_vpks.add(f["archive_index"])
    content_vpk_count = present_content_vpks.ranges[-1][1]
    log("Found {} referenced VPKs".format(content_vpk_count))

    # Find accounted-for ranges
    accounted_range_sets = [RangeSet() for i in range(content_vpk_count)]
    for t in file_types:
        for d in t["directories"]:
            for f in d["files"]:
                archive_index = f["archive_index"]
                start = f["entry_offset"]
                end = start + f["entry_length"]
                accounted_range_sets[archive_index].add(start, end)

    # Find file sizes
    content_vpk_sizes = []
    for i in range(content_vpk_count):
        content_vpk_path = data_vpk_path_format.format(i)
        try:
            content_vpk_size = os.path.getsize(content_vpk_path)
        except FileNotFoundError:
            # Archive missing on disk; estimate its size as the end of its
            # last accounted-for range (zero if nothing references it).
            ranges = accounted_range_sets[i]
            content_vpk_size = ranges[-1][1] if len(ranges) else 0
        content_vpk_sizes.append(content_vpk_size)

    # Find areas not accounted for
    unaccounted_range_sets = [
        r.invert(0, content_vpk_sizes[i]) for i, r in enumerate(accounted_range_sets)
    ]
    # Prune gaps that can be attributed to alignment padding: they end on an
    # alignment boundary and are smaller than the alignment itself.
    for i in range(content_vpk_count):
        unaccounted = unaccounted_range_sets[i]
        ri = 0
        while ri < len(unaccounted):
            start, end = unaccounted[ri]
            if end & (VPK_CONTENT_ALIGN - 1) == 0 and end - start < VPK_CONTENT_ALIGN:
                del unaccounted[ri]
                continue
            ri += 1
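    # Example (hypothetical offsets): if a file ends at 0x1234 and the next
    # entry starts at 0x1240, the 12-byte gap [0x1234, 0x1240) ends on a
    # 16-byte boundary and is shorter than VPK_CONTENT_ALIGN, so it is treated
    # as padding rather than a deleted file.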
    # Dump results
    def dump_vpk_file_region(data, path_format, start):
        # Try to split.
        chunks = vpk_content_split(data)
        if not chunks:
            # Failed to split. Dump in one piece.
            # HACK: Gotta compute path early.
            dump_path = path_format.format(start, len(data))
            log("Splitting heuristic failed, dumping as one chunk ({})".format(dump_path))
            chunks = [data]
        p = start
        for chunk in chunks:
            dump_path = path_format.format(p, len(chunk))
            with open(dump_path, "wb") as dump_file:
                dump_file.write(chunk)
            p += len(chunk)

    def dump_vpk_file_ranges(content_file, path_format, ranges):
        for start, end in ranges:
            # Align start upwards to exclude padding from the previous file
            start = align_up(start, VPK_CONTENT_ALIGN)
            content_file.seek(start)
            dumped_data = content_file.read(end - start)
            dump_vpk_file_region(dumped_data, path_format, start)

    for i in range(content_vpk_count):
        mass = unaccounted_range_sets[i].mass()
        if mass > 0:
            log("{:03}: {:.2f} KB unaccounted for across {} regions".format(
                i, mass / 1024.0, len(unaccounted_range_sets[i])
            ))
            # Dump data if requested
            if dump_directory_path:
                dump_path_format = os.path.join(
                    dump_directory_path,
                    "dump_{:03}".format(i) + "_{:x}_{:x}.bin"
                )
                content_vpk_path = data_vpk_path_format.format(i)
                try:
                    with open(content_vpk_path, "rb") as f:
                        dump_vpk_file_ranges(f, dump_path_format, unaccounted_range_sets[i])
                except FileNotFoundError:
                    log("{} doesn't exist, can't dump.".format(content_vpk_path))
    log("Complete.")
def auto_mode():
    dir_vpk_names = []
    filenames = os.listdir(".")
    for filename in filenames:
        if filename.endswith("_dir.vpk"):
            dir_vpk_names.append(filename)

    # Timestamped output directory (UTC epoch seconds)
    base_directory_path = "./vpk_undelete_{}/".format(
        datetime.datetime.now(datetime.timezone.utc).timestamp())
    try:
        os.mkdir(base_directory_path)
    except FileExistsError:
        pass

    for dir_vpk_name in dir_vpk_names:
        print("Checking {}...".format(dir_vpk_name))
        dump_directory_path = os.path.join(base_directory_path, dir_vpk_name)
        try:
            os.mkdir(dump_directory_path)
        except FileExistsError:
            pass

        log_text = ""
        def log(s):
            nonlocal log_text
            print(s)
            log_text += "{}\n".format(s)

        vpk_undelete(dir_vpk_name, dump_directory_path, log)

        log_path = os.path.join(dump_directory_path, "log.txt")
        with open(log_path, "w", encoding="utf-8") as f:
            f.write(log_text)
def user_mode():
    # vpk_undelete requires a log callback; plain print fills that role here.
    vpk_undelete(
        sys.argv[1],
        sys.argv[2] if len(sys.argv) >= 3 else None,
        print,
    )
def main():
    if len(sys.argv) not in [1, 2, 3]:
        print("Usage: {} [VPK path] [dump directory path]".format(sys.argv[0]))
        return
    if len(sys.argv) == 1:
        auto_mode()
    else:
        user_mode()

if __name__ == "__main__":
    main()
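# Invocation, per the usage string above: run with no arguments to scan the
# current directory for *_dir.vpk files and dump into a timestamped folder, or
# name things explicitly (script and paths hypothetical):
#   python3 vpk_undelete.py pak01_dir.vpk ./recovered/
# Omitting the dump directory only reports unaccounted-for regions.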