# Source: GitHub gist kk7ds/8290e29bcde4ecb1cfb28b3327cdfabd (last active July 30, 2020 22:54).
# Hosting-page notice: this file may contain bidirectional Unicode text; review it in an
# editor that reveals hidden Unicode characters.
import json
import logging
import re
import struct
import subprocess
import sys
# Number of bytes in one mebibyte.
Mi = 1 << 20

# Logger for module-level messages, named after this module.
LOG = logging.getLogger(__name__)
class FileInspector(object):
    """Streaming inspector for raw disk images.

    The caller feeds the file to the inspector piece by piece, via
    eat_chunk(), while it is being read. This base implementation only
    keeps a running byte count; it is subclassed for more complex image
    types, which retain just enough data to determine the attributes
    they report.
    """

    def __init__(self):
        # Running total of bytes presented through eat_chunk().
        self._total_count = 0
        # Per-format logger, named after the format string.
        self._log = logging.getLogger(str(self))

    def __str__(self):
        """Return the short name of the format handled by this class."""
        return 'raw'

    def eat_chunk(self, chunk):
        """Account for the next chunk of the file being streamed."""
        self._total_count = self._total_count + len(chunk)

    @property
    def format_match(self):
        """True when the data looks like this format (always, for raw)."""
        return True

    @property
    def actual_size(self):
        """Number of bytes of the file seen so far.

        This is usually smaller than virtual_size for non-raw formats.
        """
        return self._total_count

    @property
    def virtual_size(self):
        """Virtual size of the disk image, or zero if unknown.

        For a raw image this is simply the number of bytes seen so far.
        """
        return self._total_count

    @property
    def context_info(self):
        """Report how much data is held in memory, for auditing.

        The result is a dict mapping a region name to the number of
        bytes buffered for it; the raw inspector buffers nothing.
        """
        return {}
class QcowInspector(FileInspector):
    """Stream-based inspector for qcow2 images.

    Only the first 32 bytes of the file are retained. They are parsed as
    big-endian fields: magic (u32), version (u32), backing file offset
    (u64), backing file size (u32), cluster bits (u32) and the virtual
    disk size in bytes (u64).
    """

    # Number of leading bytes needed to read the magic and virtual size.
    _HEADER_LENGTH = 32
    # Expected value of the magic field.
    _MAGIC = 0x514649FB

    def __init__(self):
        super(QcowInspector, self).__init__()
        self._header_buffer = b''

    def eat_chunk(self, chunk):
        """Record the chunk, keeping only bytes that complete the header."""
        super(QcowInspector, self).eat_chunk(chunk)
        missing = self._HEADER_LENGTH - len(self._header_buffer)
        if missing > 0:
            # Never hold more than the header itself, regardless of how
            # large the caller's read size is.
            self._header_buffer += chunk[:missing]

    def _header_complete(self):
        """Return True once all header bytes have been buffered."""
        return len(self._header_buffer) >= self._HEADER_LENGTH

    def _qcow_header_data(self):
        """Return (magic, size) parsed from the buffered header.

        The caller must ensure the full header has been buffered.
        """
        magic, _version, _bf_offset, _bf_sz, _cluster_bits, size = (
            struct.unpack('>IIQIIQ',
                          self._header_buffer[:self._HEADER_LENGTH]))
        return magic, size

    @property
    def virtual_size(self):
        """Virtual size from the header, or zero until it is buffered."""
        if not self._header_complete():
            return 0
        _magic, size = self._qcow_header_data()
        return size

    @property
    def format_match(self):
        """True when the buffered header carries the qcow magic."""
        if not self._header_complete():
            return False
        magic, _size = self._qcow_header_data()
        return magic == self._MAGIC

    def __str__(self):
        return 'qcow2'

    @property
    def context_info(self):
        """Report the number of header bytes held in memory."""
        return {'qcowheader': len(self._header_buffer)}
class VHDInspector(FileInspector):
    """Stream-based inspector for VHD images.

    The first 512 bytes of the file are retained. The format is
    recognised by the b'conectix' cookie at the start of that block and
    the size is read as a big-endian u64 at byte offset 40.
    """

    # Number of leading bytes retained for inspection.
    _HEADER_LENGTH = 512

    def __init__(self):
        super(VHDInspector, self).__init__()
        self._header_buffer = b''

    def eat_chunk(self, chunk):
        """Record the chunk, keeping only bytes that complete the header."""
        super(VHDInspector, self).eat_chunk(chunk)
        missing = self._HEADER_LENGTH - len(self._header_buffer)
        if missing > 0:
            # Take exactly what is still missing so that several small
            # chunks cannot push the buffer past the header length.
            self._header_buffer += chunk[:missing]

    @property
    def format_match(self):
        """True when the buffered data starts with the VHD cookie."""
        return self._header_buffer.startswith(b'conectix')

    @property
    def virtual_size(self):
        """Size read from the header, or zero until it is fully buffered."""
        if len(self._header_buffer) < self._HEADER_LENGTH:
            return 0
        # NOTE(review): bytes 40-48 look like the "original size" field of
        # the VHD footer, with "current size" following at 48-56 - confirm
        # against the VHD specification which one is intended here.
        return struct.unpack('>Q', self._header_buffer[40:48])[0]

    def __str__(self):
        return 'vhd'

    @property
    def context_info(self):
        """Report the number of header bytes held in memory."""
        return {'header': len(self._header_buffer)}
class VHDXInspector(FileInspector):
    """Stream-based inspector for VHDX images.

    Inspection happens in two stages:

    1. The first 256KiB of the file are buffered. The region table at
       192KiB within that block is searched for the metadata region
       entry, which gives the file offset of the metadata region.
    2. Data is captured from the start of the metadata region until the
       metadata table entry for the virtual disk size item is found and
       its 8-byte payload has been buffered.

    All multi-byte fields are decoded as little-endian.
    """

    # GUID of the region table entry describing the metadata region.
    METAREGION = '8B7CA206-4790-4B9A-B8FE-575F050F886E'
    # GUID of the metadata item holding the virtual disk size.
    VIRTUAL_DISK_SIZE = '2FA54224-CD1B-4876-B211-5DBED83BF4B8'

    # File offset of the region table.
    _REGION_TABLE_OFFSET = 192 * 1024
    # Number of leading bytes buffered; this covers the region table.
    _HEADER_LENGTH = 256 * 1024

    def __init__(self):
        super(VHDXInspector, self).__init__()
        self._header_buffer = b''
        self._metadata_buffer = b''
        # File offset of the metadata region: None until the header has
        # been examined, 0 if the region could not be located.
        self._metadata_start = None
        # File offset after which no more metadata needs to be captured.
        self._metadata_end = None
        # Offset of the size payload within the metadata region.
        self._vds_offset = 0
        # Set when the metadata table is known to be unusable, so that we
        # stop buffering data that can never yield a size.
        self._metadata_failed = False

    def _collect_metadata(self, chunk):
        # Unused placeholder, kept so the class interface is unchanged.
        pass

    def _in_meta_region(self, chunk_len):
        """Return True if the chunk just counted overlaps needed metadata.

        :param chunk_len: Length of the most recently counted chunk.
        """
        if not self._metadata_start or self._metadata_failed:
            return False
        if self._total_count <= self._metadata_start:
            # We have not yet read up to the start of the region.
            return False
        if not self._metadata_end:
            # We do not know yet how much metadata is needed; keep going.
            return True
        read_start = self._total_count - chunk_len
        return read_start < self._metadata_end

    def eat_chunk(self, chunk):
        """Present the next chunk of the file to the inspector."""
        super(VHDXInspector, self).eat_chunk(chunk)

        missing = self._HEADER_LENGTH - len(self._header_buffer)
        if missing > 0:
            # Keep only what is needed to cover the region table.
            self._header_buffer += chunk[:missing]

        if (self._metadata_start is None and
                len(self._header_buffer) >= self._HEADER_LENGTH):
            # Resolve the metadata location as soon as the header is
            # complete. Doing it on this same chunk matters for large
            # reads, where the chunk completing the header may already
            # contain the start of the metadata region.
            found = self._find_meta_region()
            # Normalise "not found" to 0 so we neither retry on every
            # chunk nor try to format None below.
            self._metadata_start = found or 0
            if self._metadata_start:
                self._log.debug('Found metadata region offset at %x',
                                self._metadata_start)

        if not self._in_meta_region(len(chunk)):
            return

        # We're reading across the metadata region, so save this in our
        # buffer. Be accurate on the start, but not so much on the end,
        # since we start capturing before we know how much we need.
        self._log.debug('In meta at %x', self._total_count)
        read_start = self._total_count - len(chunk)
        # If this chunk includes the start of the region, skip the
        # leading part so that the buffer starts aligned with the region.
        lead_gap = max(self._metadata_start - read_start, 0)
        self._metadata_buffer += chunk[lead_gap:]

        offset, length = self._find_meta_entry_offset_length(
            self.VIRTUAL_DISK_SIZE)
        if offset:
            # We've read enough of the table to have found the virtual
            # disk size item. Record its offset and mark the point at
            # which we can stop capturing metadata.
            self._vds_offset = offset
            self._log.debug('Found end of needed metadata: %x',
                            offset + length)
            self._metadata_end = self._metadata_start + offset + length

    @property
    def format_match(self):
        """True when the buffered data starts with the VHDX signature."""
        return self._header_buffer.startswith(b'vhdxfile')

    @staticmethod
    def _guid(buf):
        """Format a MSFT GUID from the 16-byte input buffer."""
        # Decode explicitly as little-endian rather than in the byte
        # order of whatever host we happen to run on.
        guid_format = '<IHHBBBBBBBB'
        return '%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X' % (
            struct.unpack(guid_format, buf))

    def _find_meta_region(self):
        """Locate the metadata region using the buffered region table.

        :returns: The file offset of the metadata region, 0 if the region
                  table signature is wrong, or None if the table has no
                  metadata region entry.
        """
        table_start = self._REGION_TABLE_OFFSET
        # The region table entries start after a 16-byte table header.
        entries_start = table_start + 16

        # Parse the region table header to find the number of regions.
        regi, _cksum, count, _reserved = struct.unpack(
            '<IIII', self._header_buffer[table_start:entries_start])
        if regi != 0x69676572:
            self._log.warning('Region signature not found at %x',
                              table_start)
            return 0

        self._log.debug('Region entry first is %x', entries_start)
        self._log.debug('Region entries %i', count)
        # Walk the 32-byte entries until we find the metadata one.
        for i in range(count):
            entry_start = entries_start + (i * 32)
            entry = self._header_buffer[entry_start:entry_start + 32]
            if len(entry) < 32:
                # The count comes from the file and may point past what
                # we buffered; stop rather than parse a short entry.
                break
            self._log.debug('Entry offset is %x', entry_start)
            # GUID is the first 16 bytes of the entry.
            if self._guid(entry[:16]) == self.METAREGION:
                meta_offset, _meta_len, _meta_req = struct.unpack(
                    '<QII', entry[16:])
                self._log.debug('Meta entry %i specifies offset: %x',
                                i, meta_offset)
                return meta_offset

        self._log.warning('Did not find metadata region')
        return None

    def _find_meta_entry_offset_length(self, desired_guid):
        """Look up an item in the buffered metadata table.

        Marks the metadata as unusable (which stops further buffering)
        when the table signature is wrong or when the fully buffered
        table has no entry for the requested item.

        :param desired_guid: Formatted GUID of the item to look for.
        :returns: (offset, length) of the item relative to the start of
                  the metadata region, or (0, 0) if it is not (yet)
                  available.
        """
        if len(self._metadata_buffer) < 32:
            # Not enough data yet for the full table header.
            return 0, 0

        # Make sure we found the metadata region by checking the
        # signature. The buffer holds bytes, so compare against bytes.
        sig, _reserved, count = struct.unpack('<8sHH',
                                              self._metadata_buffer[:12])
        if sig != b'metadata':
            self._log.warning('Invalid signature for metadata region: %r',
                              sig)
            self._metadata_failed = True
            return 0, 0

        entries_size = 32 + (count * 32)
        if len(self._metadata_buffer) < entries_size:
            # Not enough data yet for all metadata entries. We could
            # process what we have, but the size payload comes after the
            # last entry, so there is no point until we have them all.
            return 0, 0

        for i in range(count):
            entry_offset = 32 + (i * 32)
            guid = self._guid(
                self._metadata_buffer[entry_offset:entry_offset + 16])
            if guid == desired_guid:
                # Found the item we are looking for by id.
                item_offset, item_length, _flags, _unused = struct.unpack(
                    '<IIII',
                    self._metadata_buffer[entry_offset + 16:
                                          entry_offset + 32])
                return item_offset, item_length

        self._log.warning('Did not find guid %s', desired_guid)
        self._metadata_failed = True
        return 0, 0

    @property
    def virtual_size(self):
        """Virtual disk size in bytes, or zero while it is unknown."""
        # Until we have found the offset and have enough metadata
        # buffered to read the payload, return "unknown".
        if self._vds_offset == 0:
            return 0
        payload_end = self._vds_offset + 8
        if payload_end > len(self._metadata_buffer):
            return 0
        self._log.debug('Meta buffer is %i', len(self._metadata_buffer))
        self._log.debug('Offset of VDS is %i->%i',
                        self._vds_offset, payload_end)
        size, = struct.unpack(
            '<Q', self._metadata_buffer[self._vds_offset:payload_end])
        return size

    def __str__(self):
        return 'vhdx'

    @property
    def context_info(self):
        """Report the number of bytes held in each in-memory buffer."""
        return {'headerbuffer': len(self._header_buffer),
                'metabuffer': len(self._metadata_buffer)}
class VMDKInspector(FileInspector):
    """Stream-based inspector for VMDK images.

    The first 44 bytes are parsed as little-endian fields: signature
    (u32), version (u32), flags (u32), capacity in sectors (u64), grain
    size (u64), descriptor start sector (u64) and descriptor length in
    sectors (u64). The embedded text descriptor is then buffered so that
    its createType can be checked; only 'monolithicSparse' images are
    given a size.
    """

    _SECTOR_SIZE = 512
    # Length of the fixed header fields parsed by this inspector.
    _FIXED_HEADER_LENGTH = 44
    # Upper bound on how much of the file we agree to buffer. The
    # descriptor location comes from the (untrusted) image itself.
    _MAX_HEADER_LENGTH = 1024 * 1024

    def __init__(self):
        super(VMDKInspector, self).__init__()
        self._header_buffer = b''
        self._header_needed_length = 512
        # Whether the fixed header has been examined to size the buffer.
        self._header_sized = False

    def eat_chunk(self, chunk):
        """Record the chunk, buffering it while the header is incomplete."""
        super(VMDKInspector, self).eat_chunk(chunk)
        if len(self._header_buffer) >= self._header_needed_length:
            return
        self._header_buffer += chunk

        if (not self._header_sized and
                len(self._header_buffer) >= self._FIXED_HEADER_LENGTH):
            # Work out how far the descriptor extends as soon as the
            # fixed header is available, so that buffering does not
            # depend on virtual_size being polled between chunks.
            self._header_sized = True
            layout = self._descriptor_layout()
            if layout is not None:
                self._header_needed_length = max(
                    self._header_needed_length, layout[1])

        if len(self._header_buffer) > self._header_needed_length:
            # Drop whatever the last chunk carried beyond what we need.
            self._header_buffer = (
                self._header_buffer[:self._header_needed_length])

    def _descriptor_layout(self):
        """Parse the fixed header and locate the embedded descriptor.

        The caller must have buffered at least the fixed header.

        :returns: (descriptor_start, descriptor_end, sectors) with the
                  descriptor bounds as byte offsets into the file, or
                  None if the header is not usable.
        """
        sig, ver, _flags, sectors, _grain, desc_sec, desc_num = (
            struct.unpack(
                '<IIIQQQQ',
                self._header_buffer[:self._FIXED_HEADER_LENGTH]))
        if sig != 0x564d444b:
            self._log.warning('Signature KDMV not found: %x', sig)
            return None
        if ver not in (1, 2, 3):
            self._log.warning('Unsupported version %i', ver)
            return None

        descriptor_start = desc_sec * self._SECTOR_SIZE
        descriptor_end = descriptor_start + (desc_num * self._SECTOR_SIZE)
        if descriptor_end > self._MAX_HEADER_LENGTH:
            self._log.warning('Descriptor end %i exceeds limit of %i',
                              descriptor_end, self._MAX_HEADER_LENGTH)
            return None
        return descriptor_start, descriptor_end, sectors

    @property
    def format_match(self):
        """True when the buffered data starts with the VMDK signature."""
        return self._header_buffer.startswith(b'KDMV')

    @property
    def virtual_size(self):
        """Virtual size in bytes, or zero if unknown or unsupported."""
        if len(self._header_buffer) < self._FIXED_HEADER_LENGTH:
            # Not enough data yet
            return 0
        layout = self._descriptor_layout()
        if layout is None:
            return 0
        descriptor_start, descriptor_end, sectors = layout
        if len(self._header_buffer) < descriptor_end:
            # The descriptor has not been fully buffered yet
            return 0

        # The buffer holds bytes, so search it with bytes patterns.
        descriptor = self._header_buffer[descriptor_start:descriptor_end]
        marker = b'createType="'
        marker_idx = descriptor.find(marker)
        type_idx = marker_idx + len(marker)
        type_end = descriptor.find(b'"', type_idx) if marker_idx >= 0 else -1
        if type_end < 0:
            self._log.warning('createType not found in descriptor')
            return 0

        vmdktype = descriptor[type_idx:type_end]
        if vmdktype != b'monolithicSparse':
            self._log.warning('Unsupported VMDK format %s',
                              vmdktype.decode('ascii', 'replace'))
            return 0
        return sectors * self._SECTOR_SIZE

    @property
    def context_info(self):
        """Report the number of header bytes held in memory."""
        return {'header': len(self._header_buffer)}

    def __str__(self):
        return 'vmdk'
class VDIInspector(FileInspector):
    """Stream-based inspector for VDI images.

    The first 512 bytes of the file are retained. The format is
    recognised by a little-endian u32 signature at offset 0x40, and the
    size is read as a little-endian u64 at offset 0x170 (followed by a
    u32 that is only logged).
    """

    # Number of leading bytes retained for inspection.
    _HEADER_LENGTH = 0x200
    # Byte range of the signature within the header.
    _SIGNATURE_START = 0x40
    _SIGNATURE_END = 0x44

    def __init__(self):
        super(VDIInspector, self).__init__()
        self._header_buffer = b''

    def eat_chunk(self, chunk):
        """Record the chunk, keeping only bytes that complete the header."""
        super(VDIInspector, self).eat_chunk(chunk)
        missing = self._HEADER_LENGTH - len(self._header_buffer)
        if missing > 0:
            # Never hold more than the header itself.
            self._header_buffer += chunk[:missing]

    @property
    def format_match(self):
        """True when the buffered header carries the VDI signature."""
        # The signature occupies bytes 0x40-0x43, so all four of them
        # must be present before it can be unpacked.
        if len(self._header_buffer) < self._SIGNATURE_END:
            return False
        signature, = struct.unpack(
            '<I',
            self._header_buffer[self._SIGNATURE_START:self._SIGNATURE_END])
        return signature == 0xbeda107f

    @property
    def virtual_size(self):
        """Size read from the header, or zero until it is fully buffered."""
        if len(self._header_buffer) < self._HEADER_LENGTH:
            return 0
        size, sector = struct.unpack('<QI', self._header_buffer[0x170:0x17C])
        self._log.debug('Size: %i (%x) Sector %i', size, size, sector)
        return size

    def __str__(self):
        return 'vdi'

    @property
    def context_info(self):
        """Report the number of header bytes held in memory."""
        return {'header': len(self._header_buffer)}
class InfoWrapper(object):
    """Pass-through reader that shows every chunk to a format inspector.

    :param source: Object with a read(size) method supplying the data.
    :param fmt: Inspector whose eat_chunk() is given each chunk read.
    """

    def __init__(self, source, fmt):
        self._source = source
        self._format = fmt
        # Becomes True after the inspector fails once; from then on the
        # inspector is no longer called.
        self._error = False

    def read(self, size):
        """Read from the source, feeding the result to the inspector.

        The data read is always returned, whether or not the inspector
        could process it.
        """
        data = self._source.read(size)
        if self._error:
            return data
        try:
            self._format.eat_chunk(data)
        except Exception as exc:
            # The inspector must never be allowed to break the streaming
            # of the image: log the failure, stop trying, and carry on.
            LOG.error('Format inspector failed, aborting: %s' % exc)
            self._error = True
        return data
def get_size_from_qemu_img(filename):
    """Return the virtual size in bytes that qemu-img reports for a file.

    :param filename: Path of the image to pass to ``qemu-img info``.
    :returns: The virtual size as an integer number of bytes.
    :raises subprocess.CalledProcessError: If qemu-img exits non-zero.
    :raises Exception: If no virtual size is present in the output.
    """
    # Pass the arguments as a list instead of through a shell so that
    # spaces or shell metacharacters in the filename are never
    # interpreted. Request text output so it can be parsed as a string.
    output = subprocess.check_output(
        ['qemu-img', 'info', '--output=json', filename],
        universal_newlines=True)
    try:
        # The JSON form is meant for machine consumption, unlike the
        # human-readable text whose layout can vary between versions.
        return int(json.loads(output)['virtual-size'])
    except (ValueError, KeyError, TypeError):
        print(output)
        raise Exception('Could not find virtual size with qemu-img')
if __name__ == '__main__':
    # Command-line driver: stream a file (or stdin) through one of the
    # inspectors and report what it determined.
    import argparse

    formats = {
        'raw': FileInspector,
        'qcow': QcowInspector,
        'vhd': VHDInspector,
        'vhdx': VHDXInspector,
        'vmdk': VMDKInspector,
        'vdi': VDIInspector,
    }
    format_names = sorted(formats)

    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--debug', action='store_true')
    # Restrict to known names so that a typo yields a usage error rather
    # than a KeyError traceback.
    parser.add_argument('-f', '--format', default='raw',
                        choices=format_names,
                        help='Format (%s)' % ','.join(format_names))
    parser.add_argument('-b', '--block-size', default=65536, type=int,
                        help='Block read size')
    parser.add_argument('--context-limit', default=(1 * 1024), type=int,
                        help='Maximum memory footprint (KiB)')
    parser.add_argument('-i', '--input', default=None,
                        help='Input file. Defaults to stdin')
    parser.add_argument('-v', '--verify', action='store_true',
                        help=('Verify our number with qemu-img '
                              '(requires --input)'))
    args = parser.parse_args()

    if args.verify and not args.input:
        # Verification needs a path to hand to qemu-img; say so instead
        # of silently skipping it.
        parser.error('--verify requires --input')

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO)

    fmt = formats[args.format]()

    if args.input:
        input_stream = open(args.input, 'rb')
    else:
        # The inspectors work on bytes. On Python 3 the binary stream is
        # sys.stdin.buffer; on Python 2 sys.stdin itself yields bytes.
        input_stream = getattr(sys.stdin, 'buffer', sys.stdin)

    stream = InfoWrapper(input_stream, fmt)
    count = 0
    found_size = False
    try:
        while True:
            chunk = stream.read(args.block_size)
            if not chunk:
                break
            count += len(chunk)
            if (args.format != 'raw' and not found_size and
                    fmt.virtual_size != 0):
                # Print the point at which we've seen enough of the file
                # to know what the virtual size is. This is almost
                # always less than the raw size.
                print('Determined virtual size at byte %i' % count)
                found_size = True
    finally:
        # Only close what we opened ourselves; leave stdin alone.
        if args.input:
            input_stream.close()

    virtual_size = fmt.virtual_size
    actual_size = fmt.actual_size
    context_info = fmt.context_info

    if fmt.format_match:
        print('Source was %s file, virtual size %i MiB (%i bytes)' % (
            fmt, virtual_size // Mi, virtual_size))
    else:
        print('*** Format inspector did not detect file as %s' % args.format)

    print('Raw size %i MiB (%i bytes)' % (actual_size // Mi, actual_size))
    print('Required context: %s' % str(context_info))

    # To make sure we're not storing the whole image, complain if the
    # format inspector stored more than context_limit data
    if sum(context_info.values()) > args.context_limit * 1024:
        print('*** ERROR: Memory footprint exceeded!')

    if args.verify:
        size = get_size_from_qemu_img(args.input)
        if size != virtual_size:
            print('*** QEMU disagrees with our size of %i: %i' % (
                virtual_size, size))
        else:
            print('Confirmed size with qemu-img')
# End of gist. (The hosting page's sign-up / sign-in prompt for commenting followed here.)