Last active
November 12, 2016 05:25
-
-
Save joachimmetz/58962c679370518d54065f0f4dad685a to your computer and use it in GitHub Desktop.
vshadow-store-read.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# | |
# VSS read test script | |
# | |
# Copyright (c) 2016, Joachim Metz <joachim.metz@gmail.com> | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from __future__ import print_function | |
import argparse | |
import logging | |
import os | |
import sys | |
# from dfvfs.lib import definitions as dfvfs_definitions | |
# from dfvfs.path import factory | |
# from dfvfs.resolver import resolver | |
import pyvshadow | |
class DataRangeFileObject(object):
    """File-like object that exposes a byte range of a file.

    All offsets accepted by seek() and reported by tell()/get_offset() are
    relative to the start of the range, and read() never returns data from
    outside of the range.
    """

    def __init__(self, range_offset, range_size):
        """Initializes the data range file-like object.

        Args:
          range_offset (int): offset of the range, in bytes, relative to the
              start of the underlying file.
          range_size (int): size of the range in bytes.
        """
        super(DataRangeFileObject, self).__init__()
        self._current_offset = 0
        self._file_object = None
        self._range_offset = range_offset
        self._range_size = range_size
        self._range_end = range_offset + range_size

    def close(self):
        """Closes the underlying file.

        Raises:
          IOError: if the file-like object is not opened.
        """
        if not self._file_object:
            raise IOError(u'Not opened')

        self._file_object.close()
        self._file_object = None

    def open(self, path, access_mode='rb'):
        """Opens the underlying file and positions it at the start of the range.

        Args:
          path (str): path of the underlying file.
          access_mode (Optional[str]): mode passed to the built-in open().

        Raises:
          IOError: if the file-like object is already opened.
        """
        if self._file_object:
            raise IOError(u'Already opened')

        file_object = open(path, access_mode)
        try:
            # Without this seek the first read() would return data from the
            # start of the file instead of from the start of the range.
            file_object.seek(self._range_offset, os.SEEK_SET)
        except Exception:
            # Do not leak the file handle when positioning fails.
            file_object.close()
            raise

        self._file_object = file_object
        self._current_offset = 0

    def read(self, size=0):
        """Reads data from the current offset within the range.

        Args:
          size (Optional[int]): maximum number of bytes to read, where None,
              0 or a negative value means all remaining data in the range.

        Returns:
          bytes: data read, which is empty when the current offset is at or
              beyond the end of the range.

        Raises:
          IOError: if the file-like object is not opened.
        """
        if not self._file_object:
            raise IOError(u'Not opened')

        remaining_size = self._range_size - self._current_offset
        if remaining_size <= 0:
            return b''

        # Clamp the read so it never crosses the end of the range. A negative
        # size must be clamped as well, otherwise the underlying read(-1)
        # would return everything up to the end of the file.
        if size is None or size <= 0 or size > remaining_size:
            size = remaining_size

        data = self._file_object.read(size)
        self._current_offset += len(data)
        return data

    def seek(self, offset, whence=os.SEEK_SET):
        """Seeks to an offset within the range.

        Args:
          offset (int): offset to seek to, interpreted according to whence.
          whence (Optional[int]): os.SEEK_SET (relative to the start of the
              range), os.SEEK_CUR (relative to the current offset) or
              os.SEEK_END (relative to the end of the range).

        Raises:
          IOError: if the file-like object is not opened, whence is not
              supported or the resulting offset lies before the range.
        """
        if not self._file_object:
            raise IOError(u'Not opened')

        if whence == os.SEEK_CUR:
            offset += self._current_offset
        elif whence == os.SEEK_END:
            # The offset is range relative, hence the end is the range size
            # and not the absolute end offset within the underlying file.
            offset += self._range_size
        elif whence != os.SEEK_SET:
            raise IOError(u'Unsupported whence')

        if offset < 0:
            raise IOError(u'File offset before range')

        # Seeking at or beyond the end of the range is allowed, but only
        # recorded: read() returns no data there, so the underlying file does
        # not need to be repositioned.
        if offset < self._range_size:
            self._file_object.seek(self._range_offset + offset, os.SEEK_SET)

        self._current_offset = offset

    def get_offset(self):
        """Retrieves the current offset relative to the start of the range.

        Returns:
          int: current offset.

        Raises:
          IOError: if the file-like object is not opened.
        """
        if not self._file_object:
            raise IOError(u'Not opened')

        return self._current_offset

    def tell(self):
        """Retrieves the current offset relative to the start of the range.

        Returns:
          int: current offset.

        Raises:
          IOError: if the file-like object is not opened.
        """
        return self.get_offset()

    def get_size(self):
        """Retrieves the size of the range.

        Returns:
          int: size of the range in bytes.
        """
        return self._range_size
def Main():
    """The main program function.

    Parses the command line, exposes the requested byte range of the source
    through a DataRangeFileObject, opens that range with pyvshadow and reads
    every store that has in-volume data from start to end, going from the
    last store to the first.

    Returns:
      bool: True if successful or False if not, where not successful means
          the source argument is missing or a store returned no data before
          its volume size was reached.
    """
    argument_parser = argparse.ArgumentParser(
        description=u'VSS read test.')

    argument_parser.add_argument(
        u'-o', u'--offset', dest=u'offset', action=u'store', type=int,
        default=0, help=u'offset.')

    argument_parser.add_argument(
        u'-s', u'--size', dest=u'size', action=u'store', type=int,
        default=0, help=u'size.')

    argument_parser.add_argument(
        u'source', nargs=u'?', action=u'store', metavar=u'PATH', type=str,
        default=None, help=u'source.')

    options = argument_parser.parse_args()

    if not options.source:
        print(u'Source value is missing.')
        print(u'')
        argument_parser.print_help()
        print(u'')
        return False

    logging.basicConfig(
        level=logging.INFO, format=u'[%(levelname)s] %(message)s')

    # NOTE: the dfvfs based alternative that DataRangeFileObject replaces:
    #
    # os_path_spec = factory.Factory.NewPathSpec(
    #     dfvfs_definitions.TYPE_INDICATOR_OS, location=options.source)
    # data_range_path_spec = factory.Factory.NewPathSpec(
    #     dfvfs_definitions.TYPE_INDICATOR_DATA_RANGE,
    #     range_offset=options.offset, range_size=options.size,
    #     parent=os_path_spec)
    #
    # file_object = resolver.Resolver.OpenFileObject(data_range_path_spec)

    read_size = 8000
    result = True

    file_object = DataRangeFileObject(options.offset, options.size)
    file_object.open(options.source)

    try:
        vshadow_volume = pyvshadow.volume()
        vshadow_volume.open_file_object(file_object)

        try:
            number_of_stores = vshadow_volume.number_of_stores

            for store_index in range(number_of_stores, 0, -1):
                vshadow_store = vshadow_volume.get_store(store_index - 1)
                if not vshadow_store.has_in_volume_data():
                    logging.info('Skipping store {0:d} out of {1:d}'.format(
                        store_index, number_of_stores))
                    continue

                logging.info('Reading store {0:d} out of {1:d}'.format(
                    store_index, number_of_stores))

                file_offset = 0
                vshadow_store.seek_offset(file_offset, os.SEEK_SET)

                while file_offset < vshadow_store.volume_size:
                    data = vshadow_store.read_buffer(read_size)
                    if not data:
                        # An empty read before the end of the store would
                        # otherwise never advance the offset and loop forever.
                        logging.error(
                            'Unable to read store %d at offset %d',
                            store_index, file_offset)
                        result = False
                        break

                    file_offset += len(data)

        finally:
            # Close the volume before the file object it reads from.
            vshadow_volume.close()

    finally:
        file_object.close()

    return result
if __name__ == '__main__':
    # Map the boolean result of Main() onto the process exit code:
    # 0 on success and 1 on failure.
    sys.exit(0 if Main() else 1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment