Instantly share code, notes, and snippets.

Embed
What would you like to do?
Parse the $O file.
# Parse $O File
# Copyright Matthew Seyer 2018
# Apache License Version 2
#
# decode_objfile.py FILE [OUTPUT_TEMPLATE]
#
# Examples:
# Output JSON lines:
# python .\decode_objfile.py '$O'
#
# Output via an output template
# python .\decode_objfile.py '$O' "{mft_reference[entry]},{object_id[uuid]},{object_id[timestamp]}"
#
import sys
import json
import struct
import codecs
import logging
import binascii
import datetime
# Only errors are reported by default; lower the level to see parser traces.
logging.basicConfig(level=logging.ERROR)
class InvalidIndxPageHeader(Exception):
    """Raised when an index page does not start with the ``INDX`` signature."""

    def __init__(self, message):
        super().__init__(message)
class FileTime(datetime.datetime):
    """A ``datetime.datetime`` that also carries a sub-second ``nanoseconds`` value.

    ``datetime.datetime`` is immutable and only resolves microseconds, so this
    subclass stores the finer-grained fraction as an extra attribute and uses
    it when rendering the timestamp as a string.
    """

    # Default so that instances created without from_dt_object() (direct
    # construction, datetime arithmetic, replace(), ...) can still be
    # formatted instead of raising AttributeError in __str__.
    nanoseconds = 0

    @classmethod
    def from_dt_object(cls, dt_object, nanoseconds=0):
        """Build a FileTime from an existing datetime.

        Args:
            dt_object: datetime whose year..microsecond fields are copied.
            nanoseconds: value printed after the decimal point by ``__str__``
                (callers in this file pass a zero-padded digit string).

        Returns:
            A new FileTime with ``nanoseconds`` attached.
        """
        ft = cls(
            dt_object.year,
            dt_object.month,
            dt_object.day,
            dt_object.hour,
            dt_object.minute,
            dt_object.second,
            dt_object.microsecond,
        )
        ft.nanoseconds = nanoseconds
        return ft

    def __str__(self):
        # The fractional part comes from ``nanoseconds``, not ``microsecond``.
        return "{0.year}-{0.month:02}-{0.day:02} {0.hour:02}:{0.minute:02}:{0.second:02}.{0.nanoseconds}".format(self)
class NtfsReference(object):
    """An 8-byte NTFS file reference.

    The value is a little-endian 64-bit integer whose low 48 bits are the MFT
    entry number and whose high 16 bits are the sequence number.
    """

    def __init__(self, buf):
        self._buffer = buf

    @property
    def reference(self):
        """The complete 64-bit reference value."""
        return struct.unpack("<Q", self._buffer[0:8])[0]

    @property
    def entry(self):
        """The MFT entry number (all 48 bits, not just the low 32)."""
        low, high = struct.unpack("<IH", self._buffer[0:6])
        # The entry number spans a 32-bit low part and a 16-bit high part.
        return (high << 32) | low

    @property
    def sequence(self):
        """The 16-bit sequence number stored in bytes 6-7."""
        return struct.unpack("<H", self._buffer[6:8])[0]

    def as_dict(self):
        """Return the reference, entry and sequence values as a dict."""
        return {
            "reference": self.reference,
            "entry": self.entry,
            "sequence": self.sequence,
        }
class ObjectId(object):
    """A 16-byte object ID (GUID) taken from an ``$O`` index entry.

    The first three fields (4 + 2 + 2 bytes) are read little-endian and the
    remaining eight bytes are read in storage order, as done by ``__str__``.
    The time/version/sequence/mac accessors interpret the value as a
    time-based (version 1) UUID.
    """

    # 100-nanosecond intervals between 1582-10-15 (UUID epoch) and 1601-01-01
    # (FILETIME epoch).
    # see http://computerforensics.parsonage.co.uk/downloads/TheMeaningofLIFE.pdf
    _UUID_TO_FILETIME_OFFSET = 5748192000000000

    def __init__(self, buf):
        self._buffer = buf

    def raw(self):
        """Return the 16 raw bytes."""
        return bytes(self._buffer)

    @property
    def timestamp(self):
        """The embedded creation time as a FileTime.

        http://computerforensics.parsonage.co.uk/downloads/TheMeaningofLIFE.pdf
        The file ObjectID is a time based version which means it is created
        using a system time. The time is a 60 bit time value, a count of 100
        nanosecond intervals of UTC since midnight at the start of
        15th October 1582.
        """
        intervals = self.timestamp_uint64 - self._UUID_TO_FILETIME_OFFSET
        # Integer division: a float quotient exceeds 2**53 for current dates
        # and would round the microseconds (occasionally into the next second).
        dt_object = datetime.datetime(1601, 1, 1) + datetime.timedelta(
            microseconds=intervals // 10
        )
        # filetime is 100 nanosecond resolution: 7 digits, padded to 9 (ns)
        nanoseconds = str(intervals % 10000000).zfill(7) + '00'
        return FileTime.from_dt_object(dt_object, nanoseconds=nanoseconds)

    @property
    def timestamp_uint64(self):
        """The 60-bit timestamp (100 ns intervals since 1582-10-15)."""
        le_timestamp = struct.unpack("<Q", self._buffer[0:8])[0]
        # remove first 4 bits used for version
        return le_timestamp & 0x0fffffffffffffff

    @property
    def version(self):
        """The UUID version: the top 4 bits of the third (little-endian) field.

        These are the same 4 bits that ``timestamp_uint64`` strips off.
        """
        time_hi_and_version = struct.unpack("<H", self._buffer[6:8])[0]
        return time_hi_and_version >> 12

    @property
    def variant(self):
        """The top 2 bits of the clock sequence field (bytes 8-9)."""
        field = struct.unpack(">H", self._buffer[8:10])[0]
        return field >> 14

    @property
    def sequence(self):
        """The low 14 bits of the clock sequence field (bytes 8-9)."""
        field = struct.unpack(">H", self._buffer[8:10])[0]
        return field & 0x3FFF

    @property
    def mac(self):
        """The trailing 6 node bytes (bytes 10-15)."""
        return bytes(self._buffer[10:16])

    def as_dict(self):
        """Return every decoded field; ``hex`` and ``mac`` are raw bytes."""
        return {
            "uuid": str(self),
            "hex": self.raw(),
            "timestamp": str(self.timestamp),
            "timestamp_uint64": self.timestamp_uint64,
            "version": self.version,
            "variant": self.variant,
            "sequence": self.sequence,
            "mac": self.mac
        }

    def __str__(self):
        # One unpack for the whole GUID: uint32, uint16, uint16, 8 single bytes.
        fields = struct.unpack("<LHH8B", self._buffer[0:16])
        return "{:08x}-{:04x}-{:04x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}".format(
            *fields
        )
class IndexHeader(object):
    """Header of an ``INDX`` page, including its update sequence array.

    Only the bytes up to the end of the update sequence array are kept (as far
    as the supplied buffer provides them).
    """

    def __init__(self, buf):
        update_seq_off = struct.unpack("<H", buf[4:6])[0]
        update_seq_size = struct.unpack("<H", buf[6:8])[0]
        self._buffer = bytearray(buf[0:update_seq_off + update_seq_size * 2])

    def block_size(self):
        """The block size of the index will be the update sequence size - 1 * 512.
        The update sequence array gets applied every 512 bytes (the first 2 bytes is the value)
        """
        return (self.update_sequence_size - 1) * 512

    def get_fixup_array(self):
        """Return the update sequence array as a list of 2 bytes each.
        """
        start = self.update_sequence_offset
        end = start + self.update_sequence_size * 2
        raw_buf = self._buffer[start:end]
        return [raw_buf[i:i + 2] for i in range(0, len(raw_buf), 2)]

    @property
    def signature(self):
        """The first four bytes of the page."""
        return bytes(self._buffer[0:4])

    @property
    def update_sequence_offset(self):
        """Offset of the update sequence array from the start of the page."""
        return struct.unpack("<H", self._buffer[4:6])[0]

    @property
    def update_sequence_size(self):
        """Number of 2-byte items in the update sequence array."""
        return struct.unpack("<H", self._buffer[6:8])[0]

    @property
    def logfile_sequence_number(self):
        return struct.unpack("<Q", self._buffer[8:16])[0]

    @property
    def vcn(self):
        return struct.unpack("<Q", self._buffer[16:24])[0]

    @property
    def index_entry_offset(self):
        return struct.unpack("<I", self._buffer[24:28])[0]

    @property
    def index_entry_size(self):
        return struct.unpack("<I", self._buffer[28:32])[0]

    @property
    def allocated_index_entry_size(self):
        return struct.unpack("<I", self._buffer[32:36])[0]

    @property
    def leaf_node(self):
        """The raw flag byte at offset 36."""
        return struct.unpack("<B", self._buffer[36:37])[0]

    @property
    def update_sequence(self):
        """Hex of the update sequence value (first item of the array).

        Located via ``update_sequence_offset`` rather than a fixed offset of
        40, so headers that place the array elsewhere are read correctly.
        """
        start = self.update_sequence_offset
        return binascii.b2a_hex(
            self._buffer[start:start + 2]
        )
class IndexOEntry(object):
    """A single entry of the ``$O`` index.

    Args:
        buf: bytes starting at the first byte of the entry (may run past it).
        offset: position of the entry, kept only for reporting.
    """

    # Size of the fixed entry header (everything before the key at byte 16).
    _HEADER_SIZE = 16

    def __init__(self, buf, offset=None):
        self._offset = offset
        logging.debug("Index Entry at Offset: {}".format(self._offset))
        data_offset = struct.unpack("<H", buf[0:2])[0]
        data_size = struct.unpack("<H", buf[2:4])[0]
        # Always keep the fixed header so that flags/entry_size remain
        # readable for an entry that carries no data (offset and size of 0).
        end = max(data_offset + data_size, self._HEADER_SIZE)
        self._buffer = buf[0:end]

    def get_offset(self):
        """Return the offset given at construction time."""
        return self._offset

    @property
    def data_offset(self):
        """This should be 32"""
        return struct.unpack("<H", self._buffer[0:2])[0]

    @property
    def data_size(self):
        """This should be 56"""
        return struct.unpack("<H", self._buffer[2:4])[0]

    @property
    def entry_size(self):
        """This should be 88"""
        return struct.unpack("<H", self._buffer[8:10])[0]

    @property
    def key_size(self):
        """This should be 16"""
        return struct.unpack("<H", self._buffer[10:12])[0]

    @property
    def flags(self):
        """1 = Entry has subnodes; 2 = Last Entry"""
        return struct.unpack("<H", self._buffer[12:14])[0]

    @property
    def object_id(self):
        """The key: the object ID in bytes 16-31."""
        return ObjectId(self._buffer[16:32])

    @property
    def mft_reference(self):
        """The file reference in bytes 32-39."""
        return NtfsReference(
            self._buffer[32:40]
        )

    @property
    def birth_volume(self):
        return ObjectId(self._buffer[40:56])

    @property
    def birth_object(self):
        return ObjectId(self._buffer[56:72])

    @property
    def birth_domain(self):
        return ObjectId(self._buffer[72:88])

    def as_dict(self):
        """Return the entry's offset, flags and decoded IDs as a nested dict."""
        return {
            "offset": self._offset,
            "flags": self.flags,
            "object_id": self.object_id.as_dict(),
            "mft_reference": self.mft_reference.as_dict(),
            "birth_volume": self.birth_volume.as_dict(),
            "birth_object": self.birth_object.as_dict(),
            "birth_domain": self.birth_domain.as_dict()
        }
class IndexPage(object):
    """One ``INDX`` page read from the current position of a file handle.

    Args:
        file_handle: binary file object positioned at the start of the page.
        offset: file offset of the page, used for entry offsets and messages.

    Raises:
        InvalidIndxPageHeader: if the signature is not ``INDX`` or the page
            described by the header cannot be read completely.
    """

    _PROBE_SIZE = 64          # bytes read up front to decode the header
    _SECTOR_SIZE = 512        # the update sequence is applied per 512 bytes
    _NODE_HEADER_OFFSET = 24  # entry offsets/sizes are relative to this byte
    _ENTRY_HEADER_SIZE = 16   # fixed part of every index entry
    _FLAG_LAST_ENTRY = 0x02   # terminating entry, carries no data

    def __init__(self, file_handle, offset):
        self._offset = offset
        logging.debug("Parsing Index Page at offset: {}".format(self._offset))
        raw_buffer = file_handle.read(self._PROBE_SIZE)
        if not bytes(raw_buffer[0:4]) == b"INDX":
            raise InvalidIndxPageHeader(
                "Invalid Page Header Signature [{}] at offset: {}".format(
                    bytes(raw_buffer[0:4]),
                    self._offset
                )
            )
        if len(raw_buffer) < self._PROBE_SIZE:
            raise InvalidIndxPageHeader(
                "Truncated Page Header at offset: {}".format(self._offset)
            )
        self.header = IndexHeader(
            raw_buffer
        )
        block_size = self.header.block_size()
        # A size below the probe would make the read() argument negative,
        # which reads the whole rest of the file.
        if block_size < self._PROBE_SIZE:
            raise InvalidIndxPageHeader(
                "Invalid Page Size [{}] at offset: {}".format(
                    block_size, self._offset
                )
            )
        self._index_block_buf = bytearray(
            raw_buffer + file_handle.read(
                block_size - self._PROBE_SIZE
            )
        )
        if len(self._index_block_buf) < block_size:
            raise InvalidIndxPageHeader(
                "Truncated Page at offset: {}".format(self._offset)
            )
        self._fix_raw_block()

    def get_page_size(self):
        """Return the page size in bytes as derived from the header."""
        return self.header.block_size()

    def _fix_raw_block(self):
        """Apply the update sequence array to their respected offsets.

        The array is taken from the full page buffer, so it is complete even
        when it extends past the bytes that were read to decode the header.
        """
        buf = self._index_block_buf
        usa_offset = self.header.update_sequence_offset
        usa_count = self.header.update_sequence_size
        if usa_offset + usa_count * 2 > len(buf):
            raise InvalidIndxPageHeader(
                "Update sequence array exceeds page at offset: {}".format(
                    self._offset
                )
            )
        # first item in array is the update sequence value
        update_sequence = buf[usa_offset:usa_offset + 2]
        for sector in range(1, usa_count):
            tail = sector * self._SECTOR_SIZE - 2
            if buf[tail:tail + 2] != update_sequence:
                logging.warning(
                    "Update sequence mismatch in sector {} of page at offset: {}".format(
                        sector - 1, self._offset
                    )
                )
            saved = usa_offset + sector * 2
            buf[tail:tail + 2] = buf[saved:saved + 2]

    def iter_entries(self):
        """Yield an IndexOEntry for every entry before the terminating one.

        Iteration stops at the entry flagged as last, at the end of the used
        entry area, or at an entry whose size could not advance the cursor.
        """
        buf = self._index_block_buf
        pointer = self.header.index_entry_offset + self._NODE_HEADER_OFFSET
        end = min(
            self.header.index_entry_size + self._NODE_HEADER_OFFSET,
            len(buf)
        )
        while pointer + self._ENTRY_HEADER_SIZE <= end:
            entry_size, _key_size, flags = struct.unpack_from(
                "<HHH", buf, pointer + 8
            )
            if flags & self._FLAG_LAST_ENTRY:
                break
            if entry_size < self._ENTRY_HEADER_SIZE:
                # Would never advance (or walk backwards into the header).
                logging.error(
                    "Invalid Index Entry size [{}] at offset: {}".format(
                        entry_size, self._offset + pointer
                    )
                )
                break
            yield IndexOEntry(
                buf[pointer:],
                offset=self._offset + pointer
            )
            pointer += entry_size
class ObjectIdFile(object):
    """Iterates the index pages of an ``$O`` file.

    Args:
        file_handle: seekable binary file object; it is rewound to the start.
    """

    def __init__(self, file_handle):
        self._file_handle = file_handle
        self._offset = 0
        # Seek to the end to learn the file size, then rewind.
        self._file_handle.seek(0, 2)
        self._file_size = self._file_handle.tell()
        self._file_handle.seek(0, 0)

    def iter_index_pages(self):
        """Yield each IndexPage in file order.

        An invalid first page raises InvalidIndxPageHeader; an invalid later
        page is logged and ends the iteration. Iteration also ends when the
        offset reaches or passes the end of the file, or when a page reports
        a size that would not advance the offset.
        """
        first_page = True
        while first_page or self._offset < self._file_size:
            self._file_handle.seek(
                self._offset
            )
            try:
                index = IndexPage(
                    self._file_handle,
                    offset=self._offset
                )
            except InvalidIndxPageHeader as error:
                if first_page:
                    raise
                logging.error(error)
                break
            first_page = False
            page_size = index.get_page_size()
            if page_size <= 0:
                # Without this guard the same page would be parsed forever.
                logging.error(
                    "Invalid page size [{}] at offset: {}".format(
                        page_size, self._offset
                    )
                )
                break
            self._offset += page_size
            yield index
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that writes binary values as lowercase hex strings."""

    def default(self, obj):
        """Hex-encode bytes-like objects; defer everything else to the base.

        ``bytearray`` and ``memoryview`` are accepted alongside ``bytes``
        because the parser's buffers are bytearrays.
        """
        if isinstance(obj, (bytes, bytearray, memoryview)):
            return obj.hex()
        return json.JSONEncoder.default(self, obj)
def main(argv=None):
    """Decode an ``$O`` file and print one line per index entry.

    Args:
        argv: argument list ``[FILE, [OUTPUT_TEMPLATE]]``; defaults to
            ``sys.argv[1:]``. Without a template each entry is printed as a
            JSON object, otherwise the template is filled with the entry's
            fields via ``str.format``.

    Raises:
        SystemExit: with code 2 when FILE is missing.
    """
    if argv is None:
        argv = sys.argv[1:]
    if not argv:
        sys.stderr.write("usage: decode_objfile.py FILE [OUTPUT_TEMPLATE]\n")
        raise SystemExit(2)

    filename = argv[0]
    out_template = argv[1] if len(argv) > 1 else None

    with open(filename, 'rb') as fh:
        obj_id_file = ObjectIdFile(
            fh
        )
        for index_page in obj_id_file.iter_index_pages():
            for entry in index_page.iter_entries():
                if out_template:
                    print(
                        out_template.format(
                            **entry.as_dict()
                        )
                    )
                else:
                    print(
                        json.dumps(entry.as_dict(), cls=ComplexEncoder)
                    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment