Skip to content

Instantly share code, notes, and snippets.

@kk7ds
Last active July 30, 2020 22:54
Show Gist options
  • Save kk7ds/8290e29bcde4ecb1cfb28b3327cdfabd to your computer and use it in GitHub Desktop.
import logging
import re
import struct
import subprocess
import sys
Mi = 1024 * 1024
LOG = logging.getLogger(__name__)
class FileInspector(object):
    """A stream-based disk image inspector.

    This base class works on raw images and is subclassed for more
    complex formats. Present the file one chunk at a time via
    ``eat_chunk`` during read processing; only as much data as is
    necessary to determine the required attributes is retained.
    """

    def __init__(self):
        # Running count of every byte presented to the inspector.
        self._total_count = 0
        self._log = logging.getLogger(str(self))

    def eat_chunk(self, chunk):
        """Present the next chunk of the file to the inspector."""
        self._total_count = self._total_count + len(chunk)

    @property
    def format_match(self):
        """True if the file appears to be this format (raw always matches)."""
        return True

    @property
    def virtual_size(self):
        """The virtual size of the disk image, or zero if unknown."""
        return self._total_count

    @property
    def actual_size(self):
        """Total bytes of the file seen so far (usually <= virtual_size)."""
        return self._total_count

    def __str__(self):
        """The string name of this file format."""
        return 'raw'

    @property
    def context_info(self):
        """Dict of region-name:size-in-bytes held in memory, for auditing."""
        return {}
class QcowInspector(FileInspector):
    """Inspector for QCOW2 disk images.

    Only the first 32 bytes of the stream (the fixed portion of the
    qcow2 header) are needed to report the magic and virtual size.
    """

    def __init__(self):
        super(QcowInspector, self).__init__()
        self._header_buffer = b''

    def eat_chunk(self, chunk):
        super(QcowInspector, self).eat_chunk(chunk)
        if len(self._header_buffer) < 32:
            # Only 32 bytes are ever examined; cap what we retain per
            # chunk so a large read block does not inflate our memory
            # footprint (consistent with the other inspectors).
            self._header_buffer += chunk[:32]

    def _qcow_header_data(self):
        """Unpack (magic, virtual_size) from the fixed qcow2 header."""
        magic, version, bf_offset, bf_sz, cluster_bits, size = (
            struct.unpack('>IIQIIQ', self._header_buffer[:32]))
        return magic, size

    @property
    def virtual_size(self):
        # Not enough header buffered yet to know the size.
        if len(self._header_buffer) < 32:
            return 0
        magic, size = self._qcow_header_data()
        return size

    @property
    def format_match(self):
        if len(self._header_buffer) < 32:
            return False
        magic, size = self._qcow_header_data()
        # 'QFI\xfb' magic identifies a qcow family image.
        return magic == 0x514649FB

    def __str__(self):
        return 'qcow2'

    @property
    def context_info(self):
        return {'qcowheader': len(self._header_buffer)}
class VHDInspector(FileInspector):
    """Inspector for VHD disk images.

    Buffers the first 512 bytes (the VHD footer copy at the front of
    dynamic images) and reads the current-size field from it.
    """

    def __init__(self):
        super(VHDInspector, self).__init__()
        self._header_buffer = b''

    def eat_chunk(self, chunk):
        super(VHDInspector, self).eat_chunk(chunk)
        if len(self._header_buffer) >= 512:
            return
        # Accumulate at most 512 bytes per chunk until we have enough.
        self._header_buffer += chunk[:512]

    @property
    def format_match(self):
        # 'conectix' is the VHD cookie at offset zero.
        return self._header_buffer.startswith(b'conectix')

    @property
    def virtual_size(self):
        if len(self._header_buffer) < 512:
            # Header not fully buffered yet.
            return 0
        size, = struct.unpack('>Q', self._header_buffer[40:48])
        return size

    def __str__(self):
        return 'vhd'

    @property
    def context_info(self):
        return {'header': len(self._header_buffer)}
class VHDXInspector(FileInspector):
    """Inspector for VHDX disk images.

    Buffers the first 256KiB (which includes the region table at offset
    192KiB), locates the metadata region from it, then buffers enough of
    that region to read the virtual-disk-size metadata item.
    """
    METAREGION = '8B7CA206-4790-4B9A-B8FE-575F050F886E'
    VIRTUAL_DISK_SIZE = '2FA54224-CD1B-4876-B211-5DBED83BF4B8'

    def __init__(self):
        super(VHDXInspector, self).__init__()
        self._header_buffer = b''
        self._metadata_buffer = b''
        self._metadata_start = None
        self._metadata_end = None
        self._vds_offset = 0

    def _collect_metadata(self, chunk):
        # Placeholder; metadata collection is handled inline in eat_chunk.
        pass

    def _in_meta_region(self, chunk_len):
        """Whether the most recently eaten chunk overlaps the metadata region."""
        read_start = self._total_count - chunk_len
        return (self._metadata_start and
                self._total_count > self._metadata_start and
                (not self._metadata_end or
                 (read_start < self._metadata_end)))

    def eat_chunk(self, chunk):
        super(VHDXInspector, self).eat_chunk(chunk)
        if len(self._header_buffer) < (256 * 1024):
            # We need to store the first 256KiB, which covers the image
            # header and the region table at offset 192KiB.
            self._header_buffer += chunk
        elif self._metadata_start is None:
            # We must have enough data to figure out the metadata location
            self._metadata_start = self._find_meta_region()
            # BUG FIX: _find_meta_region() may return None; '%x' % None
            # would raise, aborting inspection. Only log when found.
            if self._metadata_start:
                self._log.debug('Found metadata region offset at %x' % (
                    self._metadata_start))
        if self._in_meta_region(len(chunk)):
            # We're reading across the metadata region, so save this
            # in our buffer. Be accurate on the start, but not so
            # much on the end. We will start reading it before we
            # know how much we need.
            self._log.debug('In meta at %x' % self._total_count)
            read_start = self._total_count - len(chunk)
            if self._metadata_start > read_start:
                # If this chunk includes the start of the region,
                # skip the leading part so we start aligned
                lead_gap = self._metadata_start - read_start
            else:
                lead_gap = 0
            self._metadata_buffer += chunk[lead_gap:]
            offset, length = self._find_meta_entry_offset_length(
                self.VIRTUAL_DISK_SIZE)
            if offset:
                # We've read enough of the records to have found the
                # virtual disk size metadata item. Record its offset
                # and set our marker for when we can stop capturing
                # metadata
                self._vds_offset = offset
                self._log.debug('Found end of needed metadata: %x' % (
                    offset + length))
                self._metadata_end = self._metadata_start + offset + length

    @property
    def format_match(self):
        # 'vhdxfile' is the file-type identifier at offset zero.
        return self._header_buffer.startswith(b'vhdxfile')

    @staticmethod
    def _guid(buf):
        """Format a MSFT GUID from the 16-byte input buffer.

        The first three fields are little-endian on disk, so force '<'
        instead of relying on native byte order.
        """
        guid_format = '<IHHBBBBBBBB'
        return '%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X' % (
            struct.unpack(guid_format, buf))

    def _find_meta_region(self):
        """Return the file offset of the metadata region, 0 or None on failure."""
        # The region table starts at offset 192k in the header/file
        region_table_offset = 192 * 1024
        # The region table entries start after a 16-byte table header
        region_entry_first = region_table_offset + 16
        # Parse the region table header to find the number of regions
        regi, cksum, count, reserved = struct.unpack(
            '<IIII',
            self._header_buffer[region_table_offset:region_table_offset + 16])
        # 0x69676572 is 'regi' in little-endian.
        if regi != 0x69676572:
            self._log.warning('Region signature not found at %x' % (
                region_table_offset))
            return 0
        # Process the regions until we find the metadata one; grab the
        # offset and return
        self._log.debug('Region entry first is %x' % region_entry_first)
        self._log.debug('Region entries %i' % count)
        for i in range(0, count):
            entry_start = region_entry_first + (i * 32)
            entry_end = entry_start + 32
            entry = self._header_buffer[entry_start:entry_end]
            self._log.debug('Entry offset is %x' % entry_start)
            # GUID is the first 16 bytes
            guid = self._guid(entry[:16])
            if guid == self.METAREGION:
                # This entry is the metadata region entry
                meta_offset, meta_len, meta_req = struct.unpack(
                    '<QII', entry[16:])
                self._log.debug('Meta entry %i specifies offset: %x' % (
                    i, meta_offset))
                return meta_offset
        self._log.warning('Did not find metadata region')
        return None

    def _find_meta_entry_offset_length(self, desired_guid):
        """Return (offset, length) of the metadata item with desired_guid.

        Returns (0, 0) until enough of the metadata region is buffered,
        or if the item cannot be found.
        """
        if len(self._metadata_buffer) < 32:
            # Not enough data yet for full header
            return 0, 0
        # Make sure we found the metadata region by checking the signature
        sig, reserved, count = struct.unpack('<8sHH',
                                             self._metadata_buffer[:12])
        # BUG FIX: struct gives us bytes; comparing against the str
        # 'metadata' is always unequal on Python 3, which made this
        # method always fail.
        if sig != b'metadata':
            self._log.warning('Invalid signature for metadata region: %r' % (
                sig))
            return 0, 0
        entries_size = 32 + (count * 32)
        if len(self._metadata_buffer) < entries_size:
            # Not enough data yet for all metadata entries. This is not
            # strictly necessary as we could process whatever we have until
            # we find the V-D-S one, but the actual size payload will come
            # after the last entry, so there is no point in doing the work
            # until we find all of them.
            return 0, 0
        for i in range(0, count):
            entry_offset = 32 + (i * 32)
            guid = self._guid(self._metadata_buffer[entry_offset:
                                                    entry_offset + 16])
            if guid == desired_guid:
                # Found the item we are looking for by id.
                # Return the size and offset
                item_offset, item_length, foo, bar = struct.unpack(
                    '<IIII',
                    self._metadata_buffer[entry_offset + 16:entry_offset + 32])
                return item_offset, item_length
        self._log.warning('Did not find guid %s' % desired_guid)
        return 0, 0

    @property
    def virtual_size(self):
        # Until we have found the offset and have enough metadata buffered
        # to read it, return "unknown"
        if self._vds_offset == 0:
            return 0
        if self._vds_offset + 8 > len(self._metadata_buffer):
            return 0
        self._log.debug('Meta buffer is %i' % len(self._metadata_buffer))
        self._log.debug('Offset of VDS is %i->%i' % (self._vds_offset,
                                                     self._vds_offset + 8))
        size, = struct.unpack('<Q', self._metadata_buffer[self._vds_offset:
                                                          self._vds_offset + 8])
        return size

    def __str__(self):
        return 'vhdx'

    @property
    def context_info(self):
        return {'headerbuffer': len(self._header_buffer),
                'metabuffer': len(self._metadata_buffer)}
class VMDKInspector(FileInspector):
    """Inspector for VMDK disk images.

    Reads the sparse-extent header from the first 512 bytes, then grows
    the buffer as needed to capture the embedded text descriptor so the
    createType can be validated.
    """

    def __init__(self):
        super(VMDKInspector, self).__init__()
        self._header_buffer = b''
        # Grows once we know where the descriptor ends.
        self._header_needed_length = 512

    def eat_chunk(self, chunk):
        super(VMDKInspector, self).eat_chunk(chunk)
        if len(self._header_buffer) < self._header_needed_length:
            self._header_buffer += chunk

    @property
    def format_match(self):
        return self._header_buffer.startswith(b'KDMV')

    @property
    def virtual_size(self):
        if len(self._header_buffer) < 44:
            # Not enough data yet
            return 0
        sig, ver, flags, sectors, grain, desc_sec, desc_num = struct.unpack(
            '<IIIQQQQ', self._header_buffer[:44])
        if sig != 0x564d444b:
            self._log.warning('Signature KDMV not found: %x' % sig)
            return 0
        if ver not in (1, 2, 3):
            self._log.warning('Unsupported version %i' % ver)
            return 0
        # Descriptor location is given in 512-byte sectors.
        descriptor_start = desc_sec * 512
        descriptor_end = descriptor_start + (desc_num * 512)
        if (self._header_needed_length == 512 or
                len(self._header_buffer) < self._header_needed_length):
            # Now that we know where the descriptor ends, keep buffering
            # until we have all of it.
            self._header_needed_length = descriptor_end
            return 0
        descriptor = self._header_buffer[descriptor_start:descriptor_end]
        # BUG FIX: descriptor is bytes; searching/comparing with str
        # literals raises TypeError (index) or never matches on Python 3.
        # Use find() so an absent createType is handled instead of raising.
        marker = b'createType="'
        type_idx = descriptor.find(marker)
        if type_idx == -1:
            self._log.warning('Did not find createType in descriptor')
            return 0
        type_idx += len(marker)
        type_end = descriptor.find(b'"', type_idx)
        vmdktype = descriptor[type_idx:type_end]
        if vmdktype != b'monolithicSparse':
            self._log.warning('Unsupported VMDK format %s' % vmdktype)
            return 0
        return sectors * 512

    @property
    def context_info(self):
        return {'header': len(self._header_buffer)}

    def __str__(self):
        return 'vmdk'
class VDIInspector(FileInspector):
    """Inspector for VDI (VirtualBox) disk images.

    Buffers the first 512 bytes and reads the signature at 0x40 and the
    disk size / sector size at 0x170.
    """

    def __init__(self):
        super(VDIInspector, self).__init__()
        self._header_buffer = b''

    def eat_chunk(self, chunk):
        super(VDIInspector, self).eat_chunk(chunk)
        if len(self._header_buffer) < 512:
            self._header_buffer += chunk

    @property
    def format_match(self):
        # BUG FIX: the signature occupies bytes 0x40:0x44, so we need
        # 0x44 bytes buffered; the old < 0x40 guard let a 64-67 byte
        # buffer through and struct.unpack raised on the short slice.
        if len(self._header_buffer) < 0x44:
            return False
        signature, = struct.unpack('<I', self._header_buffer[0x40:0x44])
        return signature == 0xbeda107f

    @property
    def virtual_size(self):
        if len(self._header_buffer) < 0x200:
            # Header not fully buffered yet.
            return 0
        size, sector = struct.unpack('<QI', self._header_buffer[0x170:0x17C])
        self._log.debug('Size: %i (%x) Sector %i' % (size, size, sector))
        return size

    def __str__(self):
        return 'vdi'

    @property
    def context_info(self):
        return {'header': len(self._header_buffer)}
class InfoWrapper(object):
    """Wrap a readable stream, feeding every chunk to a format inspector.

    Inspection failures are logged and disable further inspection; they
    never interrupt the underlying read stream.
    """

    def __init__(self, source, fmt):
        self._source = source
        self._format = fmt
        self._error = False

    def read(self, size):
        """Read from the wrapped source, passing the chunk to the inspector."""
        data = self._source.read(size)
        if self._error:
            return data
        try:
            self._format.eat_chunk(data)
        except Exception as e:
            # Absolutely do not allow the format inspector to break
            # our streaming of the image. If we failed, just stop
            # trying, log and keep going.
            LOG.error('Format inspector failed, aborting: %s' % e)
            self._error = True
        return data
def get_size_from_qemu_img(filename):
    """Return the virtual size in bytes reported by ``qemu-img info``.

    :param filename: path of the image file to inspect
    :raises Exception: if the virtual size cannot be parsed from the output
    """
    # BUG FIX: pass an argument list with shell=False so the filename is
    # never interpreted by a shell (injection-safe), and request text
    # output so splitting on '\n' works on Python 3 (check_output
    # otherwise returns bytes).
    output = subprocess.check_output(['qemu-img', 'info', filename],
                                     universal_newlines=True)
    for line in output.split('\n'):
        m = re.search('^virtual size: .* .([0-9]+) bytes', line.strip())
        if m:
            return int(m.group(1))
    print(output)
    raise Exception('Could not find virtual size with qemu-img')
if __name__ == '__main__':
    import argparse

    # Map of user-selectable format names to their inspector classes.
    formats = {
        'raw': FileInspector,
        'qcow': QcowInspector,
        'vhd': VHDInspector,
        'vhdx': VHDXInspector,
        'vmdk': VMDKInspector,
        'vdi': VDIInspector,
    }
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--debug', action='store_true')
    parser.add_argument('-f', '--format', default='raw',
                        help='Format (%s)' % ','.join(sorted(formats.keys())))
    parser.add_argument('-b', '--block-size', default=65536, type=int,
                        help='Block read size')
    parser.add_argument('--context-limit', default=(1 * 1024), type=int,
                        help='Maximum memory footprint (KiB)')
    parser.add_argument('-i', '--input', default=None,
                        help='Input file. Defaults to stdin')
    parser.add_argument('-v', '--verify', action='store_true',
                        help=('Verify our number with qemu-img '
                              '(requires --input)'))
    args = parser.parse_args()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)
    fmt = formats[args.format]()
    if args.input:
        input_stream = open(args.input, 'rb')
    else:
        # BUG FIX: sys.stdin is a text stream on Python 3 but the
        # inspectors require bytes; use the underlying binary buffer
        # (fall back to sys.stdin itself on Python 2, which has none).
        input_stream = getattr(sys.stdin, 'buffer', sys.stdin)
    stream = InfoWrapper(input_stream, fmt)
    count = 0
    found_size = False
    while True:
        chunk = stream.read(int(args.block_size))
        if not chunk:
            break
        count += len(chunk)
        if args.format != 'raw' and not found_size and fmt.virtual_size != 0:
            # Print the point at which we've seen enough of the file to
            # know what the virtual size is. This is almost always less
            # than the raw size.
            print('Determined virtual size at byte %i' % count)
            found_size = True
    if fmt.format_match:
        print('Source was %s file, virtual size %i MiB (%i bytes)' % (
            fmt, fmt.virtual_size / Mi, fmt.virtual_size))
    else:
        print('*** Format inspector did not detect file as %s' % args.format)
    print('Raw size %i MiB (%i bytes)' % (fmt.actual_size / Mi,
                                          fmt.actual_size))
    print('Required context: %s' % str(fmt.context_info))
    # To make sure we're not storing the whole image, complain if the
    # format inspector stored more than context_limit data
    if sum(fmt.context_info.values()) > args.context_limit * 1024:
        print('*** ERROR: Memory footprint exceeded!')
    if args.verify and args.input:
        size = get_size_from_qemu_img(args.input)
        if size != fmt.virtual_size:
            print('*** QEMU disagrees with our size of %i: %i' % (
                fmt.virtual_size, size))
        else:
            print('Confirmed size with qemu-img')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment