# Parse the $O file ($ObjId:$O NTFS index).
# Copyright Matthew Seyer 2018
# Apache License Version 2
#
# decode_objfile.py FILE [OUTPUT_TEMPLATE]
#
# Examples:
#   Output JSON lines:
#     python .\decode_objfile.py '$O'
#
#   Output via an output template:
#     python .\decode_objfile.py '$O' "{mft_reference[entry]},{object_id[uuid]},{object_id[timestamp]}"
#
import sys
import json
import struct
import codecs
import logging
import binascii
import datetime

# Only report errors by default; raise to DEBUG to trace page/entry parsing.
logging.basicConfig(
    level=logging.ERROR
)
class InvalidIndxPageHeader(Exception):
    """Raised when a page does not start with a valid INDX signature."""

    def __init__(self, message):
        super().__init__(message)
class FileTime(datetime.datetime):
    """datetime.datetime object is immutable, so we will create a class to inherit
    datetime.datetime so we can set a custom nanosecond.

    datetime only resolves to microseconds; the extra sub-second precision is
    carried separately in ``nanoseconds`` and rendered by ``__str__``.
    """

    # Class-level default so __str__ works even for instances that were not
    # built via from_dt_object() (fixes AttributeError on direct construction).
    nanoseconds = 0

    def __new__(cls, *args, **kwargs):
        return datetime.datetime.__new__(cls, *args, **kwargs)

    @staticmethod
    def from_dt_object(dt_object, nanoseconds=0):
        """Copy *dt_object* into a FileTime and attach *nanoseconds*.

        ``nanoseconds`` may be an int or a pre-formatted string; it is printed
        verbatim as the fractional-second part by ``__str__``.
        """
        ft = FileTime(
            dt_object.year,
            dt_object.month,
            dt_object.day,
            dt_object.hour,
            dt_object.minute,
            dt_object.second,
            dt_object.microsecond
        )
        ft.nanoseconds = nanoseconds
        return ft

    def __str__(self):
        return "{0.year}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}.{0.nanoseconds}".format(self)
class NtfsReference(object):
    """An 8-byte NTFS file reference: 48-bit MFT entry + 16-bit sequence."""

    def __init__(self, buf):
        # buf: at least 8 bytes, little-endian file reference.
        self._buffer = buf

    @property
    def reference(self):
        """The full 64-bit reference value."""
        return struct.unpack("<Q", self._buffer[0:8])[0]

    @property
    def entry(self):
        """The 48-bit MFT entry number.

        Fix: the previous code unpacked "<IH" and kept only element [0],
        silently discarding the upper 16 bits of the entry number.
        """
        low, high = struct.unpack("<IH", self._buffer[0:6])
        return low | (high << 32)

    @property
    def sequence(self):
        """The 16-bit sequence (reuse) number."""
        return struct.unpack("<H", self._buffer[6:8])[0]

    def as_dict(self):
        return {
            "reference": self.reference,
            "entry": self.entry,
            "sequence": self.sequence
        }
class ObjectId(object):
    """A 16-byte NTFS object identifier (a GUID).

    Per the on-disk GUID layout (see __str__): bytes 0-7 hold the three
    timestamp fields little-endian; bytes 8-15 (clock sequence + node) are
    taken in byte order.
    """

    def __init__(self, buf):
        # buf: at least 16 bytes of raw GUID data.
        self._buffer = buf

    def raw(self):
        """Return the raw 16 GUID bytes."""
        return bytes(self._buffer)

    @property
    def timestamp(self):
        """Decode the creation time as a FileTime.

        http://computerforensics.parsonage.co.uk/downloads/TheMeaningofLIFE.pdf
        The file ObjectID is a time based version which means it is created
        using a system time. The time is a 60 bit time value, a count of 100
        nanosecond intervals of UTC since midnight at the start of
        15th October 1582.
        """
        # Get le uint64
        le_timestamp = struct.unpack("<Q", self._buffer[0:8])[0]
        # remove first 4 bits used for version
        le_timestamp = le_timestamp - (le_timestamp & 0xf000000000000000)
        # Rebase from the Gregorian epoch (1582-10-15) to 1601-01-01:
        # 5748192000000000 hundred-nanosecond intervals (6653 days).
        le_timestamp = le_timestamp - 5748192000000000
        # Fix: use floor division. Float division (/ 10) loses precision for
        # realistic timestamps (~1e17 exceeds float53 exactness), shifting
        # decoded times by microseconds.
        dt_object = datetime.datetime(1601, 1, 1) + datetime.timedelta(
            microseconds=le_timestamp // 10
        )
        # filetime is 100 nanosecond resolution
        nanoseconds = str(le_timestamp % 10000000).zfill(7) + '00'
        filetime = FileTime.from_dt_object(
            dt_object, nanoseconds=nanoseconds
        )
        return filetime

    @property
    def timestamp_uint64(self):
        """The raw 60-bit timestamp (version nibble stripped) as an int."""
        le_timestamp = struct.unpack("<Q", self._buffer[0:8])[0]
        le_timestamp = le_timestamp - (le_timestamp & 0xf000000000000000)
        return le_timestamp

    @property
    def version(self):
        """UUID version number (1 for time-based IDs).

        Fix: the version is the high nibble of the little-endian
        time_hi_and_version field (bytes 6-8) — the same four bits masked off
        by ``timestamp`` via 0xf000000000000000. The previous big-endian
        unpack with ``& 0x000f`` read the wrong nibble.
        """
        time_hi_and_version = struct.unpack("<H", self._buffer[6:8])[0]
        return (time_hi_and_version >> 12) & 0x000f

    @property
    def variant(self):
        """Top two bits of the clock-sequence field (byte 8)."""
        field = struct.unpack(">H", self._buffer[8:10])[0]
        return field >> 14

    @property
    def sequence(self):
        """Low 14 bits of the clock-sequence field."""
        field = struct.unpack(">H", self._buffer[8:10])[0]
        return field & 0x3FFF

    @property
    def mac(self):
        """The node (MAC address) field, bytes 10-15."""
        return bytes(self._buffer[10:16])

    def as_dict(self):
        return {
            "uuid": str(self),
            "hex": self.raw(),
            "timestamp": str(self.timestamp),
            "timestamp_uint64": self.timestamp_uint64,
            "version": self.version,
            "variant": self.variant,
            "sequence": self.sequence,
            "mac": self.mac
        }

    def __str__(self):
        # Canonical GUID text form: first three fields little-endian, the
        # remaining eight bytes in storage order.
        return "{:08x}-{:04x}-{:04x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format(
            struct.unpack("<L", self._buffer[0:4])[0],
            struct.unpack("<H", self._buffer[4:6])[0],
            struct.unpack("<H", self._buffer[6:8])[0],
            struct.unpack("<B", self._buffer[8:9])[0],
            struct.unpack("<B", self._buffer[9:10])[0],
            struct.unpack("<B", self._buffer[10:11])[0],
            struct.unpack("<B", self._buffer[11:12])[0],
            struct.unpack("<B", self._buffer[12:13])[0],
            struct.unpack("<B", self._buffer[13:14])[0],
            struct.unpack("<B", self._buffer[14:15])[0],
            struct.unpack("<B", self._buffer[15:16])[0]
        )
class IndexHeader(object):
    """Header of an INDX page: signature, fixup array, and node metadata.

    Only the bytes up to the end of the update sequence array are retained.
    """

    def __init__(self, buf):
        fixup_offset = struct.unpack_from("<H", buf, 4)[0]
        fixup_count = struct.unpack_from("<H", buf, 6)[0]
        # Keep the header through the full update sequence array (2 bytes per slot).
        self._buffer = bytearray(buf[0:fixup_offset + fixup_count * 2])

    def block_size(self):
        """The block size of the index will be the update sequence size - 1 * 512.
        The update sequence array gets applied every 512 bytes (the first 2 bytes is the value)
        """
        return (self.update_sequence_size - 1) * 512

    def get_fixup_array(self):
        """Return the update sequence array as a list of 2 bytes each.
        """
        start = self.update_sequence_offset
        stop = start + self.update_sequence_size * 2
        raw = self._buffer[start:stop]
        return [raw[pos:pos + 2] for pos in range(0, len(raw), 2)]

    @property
    def signature(self):
        # b"INDX" for a valid page.
        return bytes(self._buffer[0:4])

    @property
    def update_sequence_offset(self):
        return struct.unpack_from("<H", self._buffer, 4)[0]

    @property
    def update_sequence_size(self):
        return struct.unpack_from("<H", self._buffer, 6)[0]

    @property
    def logfile_sequence_number(self):
        return struct.unpack_from("<Q", self._buffer, 8)[0]

    @property
    def vcn(self):
        return struct.unpack_from("<Q", self._buffer, 16)[0]

    @property
    def index_entry_offset(self):
        return struct.unpack_from("<I", self._buffer, 24)[0]

    @property
    def index_entry_size(self):
        return struct.unpack_from("<I", self._buffer, 28)[0]

    @property
    def allocated_index_entry_size(self):
        return struct.unpack_from("<I", self._buffer, 32)[0]

    @property
    def leaf_node(self):
        return struct.unpack_from("<B", self._buffer, 36)[0]

    @property
    def update_sequence(self):
        # Hex rendering of the 2-byte update sequence value.
        return binascii.b2a_hex(
            self._buffer[40:42]
        )
class IndexOEntry(object):
    """A single $O index entry: an object-id key plus its MFT/birth data."""

    def __init__(self, buf, offset=None):
        self._offset = offset
        logging.debug("Index Entry at Offset: {}".format(self._offset))
        data_off = struct.unpack_from("<H", buf, 0)[0]
        data_len = struct.unpack_from("<H", buf, 2)[0]
        # Trim to this entry only: header + key up to the data, then the data.
        self._buffer = buf[0:data_off + data_len]

    def get_offset(self):
        """File offset this entry was parsed from (None if unknown)."""
        return self._offset

    @property
    def data_offset(self):
        """This should be 32"""
        return struct.unpack_from("<H", self._buffer, 0)[0]

    @property
    def data_size(self):
        """This should be 56"""
        return struct.unpack_from("<H", self._buffer, 2)[0]

    @property
    def entry_size(self):
        """This should be 88"""
        return struct.unpack_from("<H", self._buffer, 8)[0]

    @property
    def key_size(self):
        """This should be 16"""
        return struct.unpack_from("<H", self._buffer, 10)[0]

    @property
    def flags(self):
        """1 = Entry has subnodes; 2 = Last Entry"""
        return struct.unpack_from("<H", self._buffer, 12)[0]

    @property
    def object_id(self):
        # The 16-byte key.
        return ObjectId(self._buffer[16:32])

    @property
    def mft_reference(self):
        # File the object id belongs to.
        return NtfsReference(
            self._buffer[32:40]
        )

    @property
    def birth_volume(self):
        return ObjectId(self._buffer[40:56])

    @property
    def birth_object(self):
        return ObjectId(self._buffer[56:72])

    @property
    def birth_domain(self):
        return ObjectId(self._buffer[72:88])

    def as_dict(self):
        # Key order is preserved in the JSON output.
        return {
            "offset": self._offset,
            "flags": self.flags,
            "object_id": self.object_id.as_dict(),
            "mft_reference": self.mft_reference.as_dict(),
            "birth_volume": self.birth_volume.as_dict(),
            "birth_object": self.birth_object.as_dict(),
            "birth_domain": self.birth_domain.as_dict()
        }
class IndexPage(object):
    """One INDX block read from the $O file, with fixups applied."""

    def __init__(self, file_handle, offset):
        self._offset = offset
        logging.debug("Parsing Index Page at offset: {}".format(self._offset))
        # Read just the header first to validate and size the block.
        header_bytes = file_handle.read(64)
        signature = bytes(header_bytes[0:4])
        if signature != b"INDX":
            raise InvalidIndxPageHeader(
                "Invalid Page Header Signature [{}] at offset: {}".format(
                    signature,
                    self._offset
                )
            )
        self.header = IndexHeader(
            header_bytes
        )
        # Pull in the rest of the block as determined by the fixup count.
        remaining = self.header.block_size() - 64
        self._index_block_buf = bytearray(
            header_bytes + file_handle.read(remaining)
        )
        self._fix_raw_block()

    def get_page_size(self):
        """Size in bytes of this page on disk."""
        return self.header.block_size()

    def _fix_raw_block(self):
        """Apply the update sequence array to their respected offsets.
        """
        fixups = self.header.get_fixup_array()
        # fixups[0] is the update sequence value itself; each following pair
        # holds the original last two bytes of one 512-byte sector.
        for sector, original in enumerate(fixups[1:]):
            end = sector * 512 + 510
            self._index_block_buf[end] = original[0]
            self._index_block_buf[end + 1] = original[1]

    def iter_entries(self):
        """Yield each IndexOEntry in this page, walking by entry size."""
        cursor = self.header.index_entry_offset + 24
        while True:
            entry = IndexOEntry(
                self._index_block_buf[cursor:],
                offset=self._offset + cursor
            )
            cursor += entry.entry_size
            yield entry
            if cursor >= self.header.index_entry_size:
                break
class ObjectIdFile(object):
    """Walks the INDX pages of an NTFS $ObjId:$O index file."""

    def __init__(self, file_handle):
        self._file_handle = file_handle
        self._offset = 0
        # Determine the file size by seeking to the end, then rewind.
        self._file_handle.seek(0, 2)
        self._file_size = self._file_handle.tell()
        self._file_handle.seek(0, 0)

    def _load_page(self):
        # Parse the page at the current offset and advance past it.
        page = IndexPage(
            self._file_handle,
            offset=self._offset
        )
        self._offset += page.get_page_size()
        return page

    def iter_index_pages(self):
        """Yield IndexPage objects until EOF or an invalid page header."""
        page = self._load_page()
        while True:
            yield page
            if self._offset == self._file_size:
                break
            self._file_handle.seek(
                self._offset
            )
            try:
                page = self._load_page()
            except InvalidIndxPageHeader as error:
                # Best effort: log the bad page and stop iterating.
                logging.error(error)
                break
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that renders bytes values as hex strings."""

    def default(self, obj):
        if isinstance(obj, bytes):
            return obj.hex()
        return super(ComplexEncoder, self).default(obj)
def main():
    """Decode the $O index file named on the command line.

    Usage: decode_objfile.py FILE [OUTPUT_TEMPLATE]

    Each index entry is printed either as a JSON line (default) or via a
    user-supplied str.format template such as
    "{mft_reference[entry]},{object_id[uuid]},{object_id[timestamp]}".
    """
    # Robustness fix: exit with a usage message instead of an IndexError
    # traceback when no file argument is supplied.
    if len(sys.argv) < 2:
        sys.exit("usage: decode_objfile.py FILE [OUTPUT_TEMPLATE]")
    filename = sys.argv[1]
    out_template = sys.argv[2] if len(sys.argv) > 2 else None
    with open(filename, 'rb') as fh:
        obj_id_file = ObjectIdFile(fh)
        for index_page in obj_id_file.iter_index_pages():
            for entry in index_page.iter_entries():
                record = entry.as_dict()
                if out_template:
                    print(out_template.format(**record))
                else:
                    print(json.dumps(record, cls=ComplexEncoder))


if __name__ == "__main__":
    main()