Skip to content

Instantly share code, notes, and snippets.

@TerrorBite
Created December 15, 2016 09:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TerrorBite/c1c0be9a450df361dbfd4cb72dd56fb4 to your computer and use it in GitHub Desktop.
Save TerrorBite/c1c0be9a450df361dbfd4cb72dd56fb4 to your computer and use it in GitHub Desktop.
This file was a work in progress
import errno
import struct
import sys

import crc32c
import gw2util
# Headers
# All layouts below are little-endian ("<") with no padding.
# First 16 bytes of the file: uint8, 3-byte string, 3 x uint32. Datfile
# unpacks them as (version, identifier, headersize, unknown1, chunksize).
gw2_header1 = struct.Struct("<B3sIII")
# Rest of the GW2 header: uint32, uint32, uint64, uint32, uint32. Datfile
# unpacks them as (crc, unknown2, mft_offset, mft_size, flags).
gw2_header2 = struct.Struct("<IIQII")
# GW1 header
# Used instead of gw2_header2 when version < 52: uint64, uint32, uint32,
# unpacked as (mft_offset, mft_size, flags).
gw1_header2 = struct.Struct("<QII")
# Master File Table
# 24-byte table header: 4-byte identifier (checked against 'Mft\x1a'),
# uint64, uint32 entry count, uint64. The two 64-bit fields are unpacked as
# unknown1/unknown2 and not used.
mft_header = struct.Struct("<4sQIQ")
# 24-byte table entry: uint64 offset, uint32 size, uint16 compression flags,
# uint16 flags, uint32 counter, uint32 crc.
mft_entry = struct.Struct("<QIHHII")
# Miscellaneous
# Two uint32 values; used for File ID table records (file_id, entry_index)
# and for the first 8 bytes of a compressed entry (identifier, full size).
intpair = struct.Struct("<II")
# Three uint32 values; not referenced by the code visible in this file.
intthree = struct.Struct("<III")
# Not referenced by the code visible in this file.
# NOTE(review): presumably the index of the first regular file entry in the
# MFT -- confirm before relying on it.
MFT_FILE_OFFSET = 16
# Path of the archive opened when this module is run as a script; the
# commented-out lines are alternative locations.
#filepath = r"H:\Guild Wars 2\Gw2.dat"
filepath = r"C:\Users\TerrorBite\AppData\Roaming\Guild Wars 2\Local.dat"
#filepath = r"E:\SteamLibrary\SteamApps\common\Guild Wars\Gw.dat"
def interpreter():
    """
    Start an interactive console that sees the caller's local variables.

    Debugging aid: call it anywhere to inspect the state of the calling
    function from a prompt.
    """
    import code
    import inspect

    # f_back of this function's own frame is the frame of whoever called us.
    caller_frame = inspect.currentframe().f_back
    code.interact(local=caller_frame.f_locals)
class Datfile(object):
    """
    Reader for an ArenaNet ``.dat`` archive.

    Constructing a Datfile opens the file, parses and CRC-checks the header,
    then loads the Master File Table (``self.mft``) and the File ID table
    (``self.fit``). Progress and diagnostics are printed to stdout.
    """

    def __init__(self, filepath):
        """
        Open ``filepath`` and parse its header and tables.

        The process exits (``SystemExit``) when access to the file is denied
        or when the file does not carry the ArenaNet identifier. Any other
        ``IOError`` raised while opening the file is propagated.
        """
        # Open file
        try:
            self.f = open(filepath, 'rb')
        except IOError as e:
            if e.errno != errno.EACCES:
                # Errors other than "access denied" (e.g. a missing file)
                # must not be swallowed: self.f would stay unset and the
                # header read below would fail with an unrelated
                # AttributeError.
                raise
            print("Cannot open file: Access denied (is Guild Wars 2 using the file?)")
            sys.exit()

        header1 = self.f.read(16)
        version, identifier, headersize, unknown1, chunksize = gw2_header1.unpack(header1)
        is_gw1 = version < 52
        if is_gw1:
            # Guild Wars 1 compat: the fourth and fifth header fields are
            # read as chunk size and CRC, and only the first 12 header bytes
            # are fed to the CRC check below.
            crc = chunksize
            chunksize = unknown1
            header1 = header1[:12]

        # b'...' is a plain str on Python 2, so this compares exactly as
        # before while staying correct for bytes read from a binary file.
        if identifier != b'\x41\x4e\x1a':
            print("File is not a Guild Wars 2 data file")
            self.close()
            sys.exit()
        print("ArenaNet data file version {0} detected".format(version))
        if is_gw1:
            print("Warning: This appears to be a Guild Wars 1 data file.\r\n This format is currently unsupported.")

        print("This file uses {0} byte chunks".format(chunksize))
        header2 = self.f.read(headersize-16)
        if is_gw1:
            # Guild Wars 1 compat
            mft_offset, mft_size, flags = gw1_header2.unpack(header2)
        else:
            crc, unknown2, mft_offset, mft_size, flags = gw2_header2.unpack(header2)
            # unknown2 only exists in the GW2 layout, so report it here
            # rather than after the branch.
            print("UK1={0:x} UK2={1:x} CRC={2:x}".format(unknown1, unknown2, crc))

        computed_crc = crc32c.crc(header1)
        #print "CRCs: {0:x} should equal {1:x}".format(crc, computed_crc)
        if computed_crc != crc:
            print("WARNING: Header CRC mismatch, is the dat file corrupt?")

        # Read master file table
        print("Master File Table is located at 0x{0:x}, size {1} bytes".format(mft_offset, mft_size))
        self.mft = MasterFileTable(self.f, mft_offset, mft_size)
        # Read file ID table
        self.fit = FileIdTable(self.mft)
        #for x in xrange(16, len(self.mft)):
        #    if self.mft[x].in_use and not self.mft[x].compressed:
        #        print x, repr(self.mft.peek(x)[0:32])
        #    if x>1024: break

    def read_file(self, fileid):
        """
        Return the contents of the file with the given ID.

        The ID is resolved to an MFT entry index through the File ID table
        and the entry's data is fetched from the Master File Table. Raises
        IndexError (from the File ID table) when the ID is unknown.
        """
        return self.mft.data(self.fit.entry_from_fileid(fileid))

    def close(self):
        """Close the underlying file. Calling it again is a no-op."""
        if self.f is not None:
            self.f.close()
            self.f = None
class MftEntry(object):
    """
    One 24-byte record of the Master File Table.

    Attributes set from the record: ``offset`` and ``size`` (location of the
    entry's data in the archive), ``flags``, ``counter`` and ``crc``.
    ``compressed`` is bit 0x08 of the compression flags, ``in_use`` is bit
    0x01 of ``flags``. ``crc_valid`` is None until check_crc() has run.
    """

    def __init__(self, data):
        """Unpack a raw 24-byte MFT record."""
        (self.offset, self.size, compression_flags,
         self.flags, self.counter, self.crc) = mft_entry.unpack(data)
        self.compressed = bool(compression_flags & 0x08)
        self.in_use = bool(self.flags & 0x01)
        self.crc_valid = None

    def read(self, f, size=None, offset=0):
        """
        Read this entry's data from the open archive file ``f``.

        Reading starts ``offset`` bytes into the entry and returns at most
        ``size`` bytes (everything up to the end of the entry when ``size``
        is None). The request is clamped to the entry, so an offset at or
        past the end, or a negative size, yields an empty result.
        """
        remaining = max(self.size - offset, 0)
        if size is None:
            length = remaining
        else:
            # A negative length would make f.read() return everything up to
            # the end of the file, i.e. data that is not part of this entry.
            length = max(0, min(size, remaining))
        f.seek(self.offset + offset)
        return f.read(length)

    def check_crc(self, f):
        """
        Compare the CRC of the whole entry's data with the stored CRC.

        The result is cached in ``crc_valid`` and returned.
        """
        self.crc_valid = (crc32c.crc(self.read(f)) == self.crc)
        return self.crc_valid

    def __len__(self):
        return self.size

    def __repr__(self):
        if self.crc_valid is None:
            crc_state = 'unchecked'
        elif self.crc_valid:
            crc_state = 'valid'
        else:
            crc_state = 'INVALID'
        return "<MftEntry: {0} bytes {3}at 0x{1:x}, flags={2}, crc=0x{4:x} ({5})>".format(
            self.size, self.offset, self.flags,
            'compressed ' if self.compressed else '', self.crc, crc_state)
class MftHeaderEntry(MftEntry):
    """
    Placeholder entry built from an all-zero 24-byte record.

    MasterFileTable stores one of these at index 0 of its entry list.
    """

    def __init__(self):
        blank_record = '\x00' * 24
        super(MftHeaderEntry, self).__init__(blank_record)

    def __repr__(self):
        return "<MftHeaderEntry>"
class MasterFileTable(object):
    """
    In-memory copy of the archive's Master File Table (MFT).

    Index 0 of the table is a placeholder MftHeaderEntry; the entries read
    from the file follow it. ``len(table)`` is the entry count taken from
    the MFT header.
    """

    def __init__(self, f, offset, size):
        """
        Read the MFT that starts at ``offset`` in the open file ``f``.

        ``size`` is the table size reported by the file header; it is stored
        but not otherwise used here. The process exits (``SystemExit``) if
        the table does not start with the MFT identifier, or if the debug
        dump finds a CRC mismatch in one of the first entries.
        """
        self.size = size
        self.offset = offset
        self.f = f
        # Written without a newline so the result is reported on the same
        # line once the table has been read.
        sys.stdout.write("Reading Master File Table... ")
        f.seek(offset)
        identifier, unknown1, self._entry_count, unknown2 = mft_header.unpack(f.read(24))
        if identifier != b'Mft\x1a':
            print("Invalid MFT identifier!")
            sys.exit()
        comp_count, file_count = 0, 0
        # read entries
        self._entries = [MftHeaderEntry()]
        for i in xrange(len(self)):
            entry = MftEntry(self.f.read(24))
            if i > 16 and entry.in_use:
                file_count += 1
                if entry.compressed:
                    comp_count += 1
            self._entries.append(entry)
        print("found {0} files ({1} compressed) in {2} entries".format(file_count, comp_count, len(self)))
        self._debug_dump()

    def _debug_dump(self, limit=64):
        """Print the first ``limit`` entries and CRC-check the compressed ones."""
        #DEBUG
        print("First 64 entries:")
        # Never index past the entries that were actually read.
        for x in xrange(min(limit, len(self._entries))):
            entry = self._entries[x]
            #entry.check_crc(self.f)
            print("{0} {1}".format(x, repr(entry)))
            size = entry.size
            # A compressed entry is inspected via an 8-byte
            # (identifier, full size) header plus a trailing 4-byte CRC,
            # so anything shorter than 8 bytes cannot be examined.
            if not entry.compressed or size < 8:
                continue
            identifier, fullsize = intpair.unpack(entry.read(self.f, 8))
            # Guard against a reported uncompressed size of zero.
            ratio = 1.0 - (float(size)/fullsize) if fullsize else 0.0
            # The CRC checked here covers at most the first 65532 bytes and
            # is stored in the 4 bytes that follow them.
            crc_pos = 65532 if size > 65536 else size-4
            ccrc = crc32c.crc(entry.read(self.f, crc_pos))
            crc = struct.unpack("<I", entry.read(self.f, 4, crc_pos))[0]
            print("{0} -- Uncompressed size: {1} bytes, {2:.0%} compressed, crc={3:x}".format(x, fullsize, ratio, crc))
            if ccrc == crc:
                print("{0} -- CRC Validated :)".format(x))
            else:
                print("{0} -- !!CRC INVALID!!".format(x))
                sys.exit()
        #dbg = {}
        #for e in self._entries:
        #    dbg[e.crc] = (1+dbg[e.crc]) if e.crc in dbg else 1
        #sorted_dbg = sorted(dbg.items(), key=lambda x:x[1], reverse=True)
        #for k, v in sorted_dbg:
        #    print "{0:x} ({1}), ".format(k, v),
        #print
        #END DEBUG

    def raw_data(self, eindex):
        """
        Returns the raw contents of the file located at given entry index.
        Data may be in a compressed form.
        """
        return self._entries[eindex].read(self.f)

    def data(self, eindex):
        """
        Returns the contents of the file located at the given entry index.
        Decompression is not implemented yet, so the data is returned
        exactly as stored, even for compressed entries.
        """
        #TODO: Decompress data
        return self.raw_data(eindex)

    def peek(self, entry):
        """Return up to the first 512 bytes of the entry at index ``entry``."""
        return self._entries[entry].read(self.f, 512)

    def __len__(self):
        return self._entry_count

    def __getitem__(self, entry):
        return self._entries[entry]
class IdEntry(object):
    """
    Pair of IDs (base ID and file ID) attached to one MFT entry.

    Both IDs start at 0, which means "not set". Once both are set the
    smaller one is kept as ``base_id`` and the larger one as ``file_id``.
    """

    def __init__(self):
        self.base_id = self.file_id = 0

    def is_full(self):
        """Return True when both IDs have been set."""
        return not (self.base_id <= 0 or self.file_id <= 0)

    def fill(self, fid):
        """
        Store ``fid`` in the first free slot (base ID first, then file ID).

        A third ID is ignored. After the second ID arrives the pair is
        ordered so that ``base_id`` is the smaller value.
        """
        if self.base_id != 0:
            if self.file_id == 0:
                self.file_id = fid
        else:
            self.base_id = fid

        if not self.is_full():
            return
        if self.file_id < self.base_id:
            self.base_id, self.file_id = self.file_id, self.base_id

    def __repr__(self):
        template = "<IdEntry(base_id={0}, file_id={1})>"
        return template.format(self.base_id, self.file_id)
class FileIdTable(object):
    """
    Mapping between File/Base IDs and Master File Table entry indexes.

    The table is read from MFT entry 2 as a sequence of 8-byte
    (file_id, entry_index) records. Each MFT entry collects up to two IDs
    in an IdEntry; lookups are available in both directions.
    """

    def __init__(self, mft):
        """Build the ID mappings from the given Master File Table."""
        # Written without a newline so the entry count lands on the same line.
        sys.stdout.write("Reading File ID table... ")
        # Retrieve file ID table data via master file table
        data = mft.data(2)
        # Unpack file ID table to a list of tuples
        table = [intpair.unpack(data[i:i+8]) for i in xrange(0, len(data), 8)]
        print("{0} entries".format(len(table)))
        # Create blank entry_to_id array
        self._entries = [IdEntry() for x in xrange(len(mft))]
        # Build list of (base_id, file_id) pairs from table
        for file_id, entry_index in table:
            if file_id > 0 and entry_index > 0:  # Ignore zero entries
                self._entries[entry_index].fill(file_id)
        print("Entry to BaseID/FileID mapping created successfully")
        # Build reverse lookups: ID -> (entry index, the entry's other ID)
        bases = {}
        files = {}
        for i, entry in enumerate(self._entries):
            if entry.base_id != 0:
                bases[entry.base_id] = (i, entry.file_id)
            if entry.file_id != 0:
                files[entry.file_id] = (i, entry.base_id)
        print("BaseID, FileID to Entry mappings created successfully")
        self._bases, self._files = bases, files
        #DEBUG
        # Never index past the end of a table with fewer than 64 entries.
        for x in xrange(min(64, len(self._entries))):
            print("{0:03}: {1}".format(x, self._entries[x]))
        print(sorted(bases.items())[:64])
        print(sorted(files.items())[:64])
        #END DEBUG

    def _id_entry(self, eindex):
        """Return the IdEntry at ``eindex`` or raise IndexError."""
        if eindex < len(self._entries):
            return self._entries[eindex]
        raise IndexError("The index given exceeds the size of the MFT.")

    def fileid_from_entry(self, eindex):
        """
        Given an MFT entry index, returns a File ID.
        If the associated file only has a Base ID, will return the Base ID.
        """
        entry = self._id_entry(eindex)
        if entry.file_id == 0:
            return entry.base_id
        return entry.file_id

    def baseid_from_entry(self, eindex):
        """
        Given an MFT entry index, returns a Base ID.
        """
        # IdEntry fills base_id first and keeps it as the smaller of the two
        # IDs, so it is the Base ID whenever the entry has any ID at all.
        return self._id_entry(eindex).base_id

    def entry_from_fileid(self, fileid):
        """
        Returns the MFT entry index for a given File ID.
        If no File ID matches, the value is looked up as a Base ID instead.
        Raises IndexError when neither lookup succeeds.
        """
        if fileid in self._files:
            return self._files[fileid][0]
        # If file ID is not found, try as a base ID
        return self.entry_from_baseid(fileid)

    def entry_from_baseid(self, baseid):
        """
        Returns the MFT entry index for a given Base ID.
        Raises IndexError when the ID is unknown.
        """
        if baseid in self._bases:
            return self._bases[baseid][0]
        raise IndexError("The File ID provided does not exist.")

    def __getitem__(self, key):
        """Return the IdEntry stored for MFT entry index ``key``."""
        return self._entries[key]
# Script entry point: open the archive at `filepath`, dump the root manifest
# to a local file and print a few diagnostics about ID 2.
if __name__=='__main__':
    datfile = Datfile(filepath)
    try:
        two = datfile.read_file(2)
        root_manifest = datfile.read_file(4101)
        # "with" makes sure the output file is flushed and closed.
        with open('root_manifest', 'wb') as manifest_out:
            manifest_out.write(gw2util.decompress(root_manifest, 0))
        print(datfile.mft[datfile.fit.entry_from_baseid(2)])
        print(datfile.mft[datfile.fit.entry_from_baseid(12)])
        print("Length of baseid(2): {0}".format(len(two)))
        print(repr(two))
        a, b = intpair.unpack(two[0:8])
        print("{0} {1}".format(a, b))
        #open('map_complete.webp', 'wb').write(datfile.read_file(528724))
    finally:
        # Release the archive even if one of the steps above fails.
        datfile.close()
    print("Exiting.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment