Skip to content

Instantly share code, notes, and snippets.

@joachimmetz
Last active November 12, 2016 05:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joachimmetz/58962c679370518d54065f0f4dad685a to your computer and use it in GitHub Desktop.
Save joachimmetz/58962c679370518d54065f0f4dad685a to your computer and use it in GitHub Desktop.
vshadow-store-read.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# VSS read test script
#
# Copyright (c) 2016, Joachim Metz <joachim.metz@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import argparse
import logging
import os
import sys
# from dfvfs.lib import definitions as dfvfs_definitions
# from dfvfs.path import factory
# from dfvfs.resolver import resolver
import pyvshadow
class DataRangeFileObject(object):
    """File-like object that exposes a byte range of a file as a whole file.

    All offsets passed to and returned by this object are relative to the
    start of the data range, not to the start of the underlying file.
    """

    def __init__(self, range_offset, range_size):
        """Initializes the data range file-like object.

        Args:
            range_offset (int): offset of the data range, in bytes, relative
                to the start of the underlying file.
            range_size (int): size of the data range in bytes.
        """
        super(DataRangeFileObject, self).__init__()
        # Current read position, relative to the start of the data range.
        self._current_offset = 0
        # Underlying file object, None while not opened.
        self._file_object = None
        self._range_offset = range_offset
        self._range_size = range_size
        # Offset of the first byte after the range, relative to the start of
        # the underlying file.
        self._range_end = range_offset + range_size

    def _check_opened(self):
        """Raises IOError if the file-like object has not been opened."""
        if self._file_object is None:
            raise IOError(u'Not opened')

    def close(self):
        """Closes the underlying file.

        Raises:
            IOError: if the file-like object was not opened.
        """
        self._check_opened()
        self._file_object.close()
        self._file_object = None

    def open(self, path, access_mode='rb'):
        """Opens the underlying file and positions it at the range start.

        Args:
            path (str): path of the underlying file.
            access_mode (Optional[str]): mode passed to the built-in open().

        Raises:
            IOError: if the file-like object was already opened.
        """
        if self._file_object is not None:
            raise IOError(u'Already opened')

        file_object = open(path, access_mode)
        try:
            # Without this seek the first read would return data from the
            # start of the file instead of from the start of the data range.
            file_object.seek(self._range_offset, os.SEEK_SET)
        except Exception:
            file_object.close()
            raise

        self._current_offset = 0
        self._file_object = file_object

    def read(self, size=0):
        """Reads data from the current offset without leaving the range.

        Args:
            size (Optional[int]): maximum number of bytes to read, where
                None, 0 or a negative value means all remaining data in the
                range.

        Returns:
            bytes: data read, which is empty at or beyond the end of the
                range.

        Raises:
            IOError: if the file-like object was not opened.
        """
        self._check_opened()

        remaining_size = self._range_size - self._current_offset
        if remaining_size <= 0:
            return b''

        # A negative size would make the underlying read() continue up to
        # the end of the file, hence it is treated as "read the remainder".
        if size is None or size <= 0 or size > remaining_size:
            size = remaining_size

        data = self._file_object.read(size)
        self._current_offset += len(data)
        return data

    def seek(self, offset, whence=os.SEEK_SET):
        """Moves the current offset within the data range.

        Args:
            offset (int): offset to seek to, relative to whence.
            whence (Optional[int]): os.SEEK_SET (start of the range),
                os.SEEK_CUR (current offset) or os.SEEK_END (end of the
                range).

        Raises:
            IOError: if the file-like object was not opened, whence is not
                supported or the resulting offset lies before the range.
        """
        self._check_opened()

        if whence == os.SEEK_CUR:
            offset += self._current_offset
        elif whence == os.SEEK_END:
            # Offsets are range relative, so the end of the range is at
            # the range size and not at the absolute end offset in the file.
            offset += self._range_size
        elif whence != os.SEEK_SET:
            raise IOError(u'Unsupported whence')

        file_offset = self._range_offset + offset
        if file_offset < self._range_offset:
            raise IOError(u'File offset before range')

        # Seeking at or beyond the end of the range is allowed; the
        # underlying file is left untouched since read() returns no data
        # for such an offset.
        if file_offset < self._range_end:
            self._file_object.seek(file_offset, os.SEEK_SET)
        self._current_offset = offset

    def get_offset(self):
        """Retrieves the current offset, equivalent to tell().

        Returns:
            int: current offset relative to the start of the data range.

        Raises:
            IOError: if the file-like object was not opened.
        """
        return self.tell()

    def tell(self):
        """Retrieves the current offset.

        Returns:
            int: current offset relative to the start of the data range.

        Raises:
            IOError: if the file-like object was not opened.
        """
        self._check_opened()
        return self._current_offset

    def get_size(self):
        """Retrieves the size of the data range.

        Returns:
            int: size of the data range in bytes.
        """
        return self._range_size
def Main():
    """Reads the data of every in-volume store of a VSS volume.

    The source is accessed through a DataRangeFileObject so that a volume
    stored inside a larger image can be selected with --offset and --size.
    A size of 0 (the default) selects everything from the offset up to the
    end of the source. The stores are read from the last to the first and
    the data is discarded, since only the ability to read it is tested.

    Returns:
        bool: True if successful or False if not.
    """
    argument_parser = argparse.ArgumentParser(
        description=u'VSS read test.')

    argument_parser.add_argument(
        u'-o', u'--offset', dest=u'offset', action=u'store', type=int,
        default=0, help=u'offset.')

    argument_parser.add_argument(
        u'-s', u'--size', dest=u'size', action=u'store', type=int,
        default=0, help=u'size.')

    argument_parser.add_argument(
        u'source', nargs=u'?', action=u'store', metavar=u'PATH', type=str,
        default=None, help=u'source.')

    options = argument_parser.parse_args()

    if not options.source:
        print(u'Source value is missing.')
        print(u'')
        argument_parser.print_help()
        print(u'')
        return False

    logging.basicConfig(
        level=logging.INFO, format=u'[%(levelname)s] %(message)s')

    range_size = options.size
    if not range_size:
        # A range size of 0 would expose an empty file object in which no
        # volume can be found, so fall back to the remainder of the source.
        with open(options.source, 'rb') as source_file:
            source_file.seek(0, os.SEEK_END)
            range_size = max(0, source_file.tell() - options.offset)

    # The dfVFS equivalent of the data range file object created below is:
    #
    # os_path_spec = factory.Factory.NewPathSpec(
    #     dfvfs_definitions.TYPE_INDICATOR_OS, location=options.source)
    # data_range_path_spec = factory.Factory.NewPathSpec(
    #     dfvfs_definitions.TYPE_INDICATOR_DATA_RANGE,
    #     range_offset=options.offset, range_size=options.size,
    #     parent=os_path_spec)
    #
    # file_object = resolver.Resolver.OpenFileObject(data_range_path_spec)
    file_object = DataRangeFileObject(options.offset, range_size)
    file_object.open(options.source)

    read_size = 8000
    result = True
    try:
        vshadow_volume = pyvshadow.volume()
        vshadow_volume.open_file_object(file_object)
        try:
            number_of_stores = vshadow_volume.number_of_stores

            # Store indexes are 0-based, the logged store numbers 1-based.
            for store_index in range(number_of_stores, 0, -1):
                vshadow_store = vshadow_volume.get_store(store_index - 1)

                if not vshadow_store.has_in_volume_data():
                    logging.info('Skipping store {0:d} out of {1:d}'.format(
                        store_index, number_of_stores))
                    continue

                logging.info('Reading store {0:d} out of {1:d}'.format(
                    store_index, number_of_stores))

                volume_size = vshadow_store.volume_size
                file_offset = 0
                vshadow_store.seek_offset(file_offset, os.SEEK_SET)

                while file_offset < volume_size:
                    data = vshadow_store.read_buffer(read_size)
                    if not data:
                        # Without this check a premature end of data would
                        # keep the loop spinning forever.
                        logging.error((
                            'Unable to read store {0:d} beyond offset '
                            '{1:d} of {2:d}').format(
                                store_index, file_offset, volume_size))
                        result = False
                        break

                    file_offset += len(data)

        finally:
            # Release the volume before the file object it reads from.
            vshadow_volume.close()

    finally:
        file_object.close()

    return result
if __name__ == '__main__':
    # Map the boolean result of Main() onto the process exit status:
    # 0 on success, 1 on failure.
    exit_code = 0 if Main() else 1
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment