Skip to content

Instantly share code, notes, and snippets.

@icedraco
Last active July 18, 2020 21:31
Show Gist options
  • Save icedraco/2c7622618a6874f00559aa79352b35ee to your computer and use it in GitHub Desktop.
File Split & Merge Script
"""
IceDragon's Split & Merge (IDSM) Script
"""
from __future__ import annotations
import os
import sys
import struct
import hashlib
from typing import Iterator, BinaryIO, TextIO
KB = 1024
MB = 1024 * KB
GB = 1024 * MB

BUFFER_SIZE = 8 * KB                    # I/O copy buffer size
DEFAULT_CHUNK_SIZE = 20 * MB            # default on-disk size of each chunk file
HASH_SIZE = hashlib.md5().digest_size   # bytes (16 for MD5)
ZERO_HASH = b'\x00' * HASH_SIZE         # placeholder hash written before the real one


# --- Classes --------------------------------------------------------------- #

class ChunkHeader:
    """
    Fixed-size binary header prepended to every chunk file.

    On-disk layout (little-endian):
        2 bytes   magic number b'ID'
        2 bytes   format version b'01'
        2 bytes   chunk index (unsigned short)
        16 bytes  MD5 digest of the chunk (header-with-zeroed-hash + payload)
    """

    FMT_INDEX = '<H'
    MAGIC_NUMBER = b'ID'
    FORMAT_VERSION = b'01'
    INDEX_SIZE = struct.calcsize(FMT_INDEX)  # unsigned short -> 2 bytes
    HEADER_SIZE = len(MAGIC_NUMBER) + len(FORMAT_VERSION) + HASH_SIZE + INDEX_SIZE

    @classmethod
    def read_from(cls, fd: BinaryIO) -> "ChunkHeader":
        """Read and validate a header from *fd*.

        :raises ValueError: on a bad magic number, an unsupported format
            version, or a truncated hash field.
        """
        magic = fd.read(len(cls.MAGIC_NUMBER))
        if magic != cls.MAGIC_NUMBER:
            raise ValueError(f'invalid magic number: {repr(magic)}')
        version = fd.read(len(cls.FORMAT_VERSION))
        if version != cls.FORMAT_VERSION:
            raise ValueError(f'unsupported format version: {repr(version)}')
        index: int = struct.unpack(cls.FMT_INDEX, fd.read(cls.INDEX_SIZE))[0]
        chunk_hash = fd.read(HASH_SIZE)
        if len(chunk_hash) != HASH_SIZE:
            raise ValueError(f'invalid hash size ({len(chunk_hash)} != {HASH_SIZE})')
        return cls(index, chunk_hash)

    def __init__(self, index: int, chunk_hash: bytes = ZERO_HASH):
        # explicit validation instead of `assert` (asserts vanish under -O)
        if not 0 <= index <= 0xFFFF:
            raise ValueError(f'chunk index out of range: {index!r}')
        self.chunk_index = index    # 0-based index of this chunk
        self.chunk_hash = chunk_hash  # MD5 digest (ZERO_HASH until patched)

    def __bytes__(self) -> bytes:
        return self.serialize()

    def write(self, fd: BinaryIO):
        """Write the serialized header to *fd* at its current position."""
        fd.write(self.serialize())

    def serialize(self) -> bytes:
        """Return the header as HEADER_SIZE bytes (see class docstring)."""
        index_bytes = struct.pack(self.FMT_INDEX, self.chunk_index)
        header = self.MAGIC_NUMBER + self.FORMAT_VERSION + index_bytes + self.chunk_hash
        assert len(header) == self.HEADER_SIZE
        return header
class Manifest:
    """
    Metadata describing a split file: source file name, total size, overall
    MD5 hex digest, chunk count, and the cumulative MD5 hex digest recorded
    after each chunk was written.
    """

    REQUIRED_KEYS = {'src_filename', 'file_size', 'file_hash', 'num_chunks'}

    @classmethod
    def load(cls, filename: str) -> "Manifest":
        """Parse a manifest file previously produced by :meth:`write`.

        :raises ValueError: if a key=value line is malformed or a required
            key is missing.
        """
        items = {}
        cum_hashes = []
        with open(filename, 'r') as f:
            flag_read_hashes = False
            for line in (line.strip() for line in f):
                if not line or line.startswith('#'):
                    continue  # blank line or comment
                elif line.startswith('->'):
                    flag_read_hashes = True   # start of cumulative-hash section
                    continue
                elif line.startswith('<-'):
                    flag_read_hashes = False  # end of cumulative-hash section
                    continue
                if flag_read_hashes:
                    cum_hashes.append(line)
                else:
                    # explicit validation instead of `assert` (stripped under -O)
                    if '=' not in line:
                        raise ValueError(f'malformed manifest line: {line!r}')
                    key, value = line.split('=', 1)
                    items[key] = value
        missing_keys = cls.REQUIRED_KEYS - items.keys()
        if missing_keys:
            keys_str = ', '.join(missing_keys)
            raise ValueError(f'missing manifest keys: {keys_str}')
        manifest = Manifest(
            items['src_filename'],
            int(items['file_size']),
            int(items['num_chunks']),
            items['file_hash'])
        for c_hash in cum_hashes:
            manifest.add_cumulative_hash(c_hash)
        return manifest

    def __init__(self, filename: str, file_size: int, num_chunks: int = None, file_hash: str = None):
        self.filename = filename              # source file name (no directory)
        self.file_size = file_size            # total size in bytes
        self.file_hash = file_hash            # MD5 hex digest of the whole file
        self.num_chunks = num_chunks          # expected number of chunk files
        self.cumulative_hashes = []           # MD5 hex digest after each chunk

    def __len__(self) -> int:
        """Number of chunks the source file was split into."""
        return self.num_chunks

    def has_all_cumulative_hashes(self) -> bool:
        return len(self.cumulative_hashes) >= self.num_chunks

    def add_cumulative_hash(self, h: str):
        """Append a cumulative hex digest.

        :raises ValueError: if *h* is not a full-length hex digest.
        """
        # BUG FIX: the original built ValueError(...) without `raise`, so
        # malformed hashes were silently accepted.
        if len(h) != HASH_SIZE * 2:  # hex digest: two chars per byte
            raise ValueError(f"invalid hash: {h} (hash index: {len(self.cumulative_hashes)})")
        self.cumulative_hashes.append(h)

    def write(self, f: TextIO):
        """Serialize this manifest to the given text stream."""
        buffer = [
            '# IDSM Splitter Manifest',
            f'src_filename={self.filename}',
            f'file_size={self.file_size}',
            f'file_hash={self.file_hash}',
            f'num_chunks={self.num_chunks}',
            '',
            '-> cumulative hashes',
            *self.cumulative_hashes,
            '<- cumulative hashes',
        ]
        f.write('\n'.join(buffer + ['']))
# --- MAIN ------------------------------------------------------------------ #
def main() -> int:
    """CLI entry point: dispatch to split / merge / verify based on argv.

    Returns a process exit code (print_usage() returns 1 for bad usage).
    """
    argv = sys.argv
    print('--- IDSM 1.0 ------------------------------------')
    if len(argv) < 2:
        return print_usage()

    mode = argv[1]
    if mode.startswith('s'):  # split
        if len(argv) >= 4:
            src_file = os.path.abspath(argv[2])
            _, src_filename = os.path.split(src_file)
            target_dir = os.path.abspath(argv[3])
            manifest_file = os.path.join(target_dir, src_filename + '.manifest')
            return main_split(src_file, target_dir, manifest_file)
    elif mode.startswith('m'):  # merge
        if len(argv) >= 3:
            manifest_file = os.path.abspath(argv[2])
            # BUG FIX: the original derived target_dir via
            # `_, target_dir = os.path.split(...)`, which yields the manifest
            # *filename*, and then passed the raw argv[2] instead of the
            # absolute path. Pass None and let main_merge default to the
            # manifest's own directory.
            target_dir = os.path.abspath(argv[3]) if len(argv) >= 4 else None
            return main_merge(manifest_file=manifest_file, target_dir=target_dir)
    elif mode.startswith('v'):  # verify
        if len(argv) >= 3:
            manifest_file = os.path.abspath(argv[2])
            return main_verify(manifest_file=manifest_file)

    # unknown mode or wrong argument count
    return print_usage()
def main_split(src_file, target_dir, manifest_file, chunk_size=DEFAULT_CHUNK_SIZE) -> int:
    """Validate the split request, ensure the target dir exists, run split().

    Returns a process exit code (0 on success, 2-4 on failure).
    """
    banner = (
        f'source: {src_file}\n'
        f'target dir: {target_dir}\n'
        f'manifest: {manifest_file}\n'
        f'chunk size: {chunk_size} bytes\n'
    )
    print(banner)  # print() supplies the trailing blank line

    # guard: the source must be an existing regular file
    if not os.path.isfile(src_file):
        sys.stderr.write('Error: source file does not exist or is not a file\n')
        return 2

    # create the target directory; an already-existing one is fine
    try:
        os.mkdir(target_dir)
    except FileExistsError:
        pass
    except OSError as ex:
        sys.stderr.write(f'Cannot create target dir: {ex.strerror}\n')
        return 3

    # delegate the actual work; split() signals bad arguments via ValueError
    try:
        return split(src_file, target_dir, manifest_file, chunk_size=chunk_size)
    except ValueError as ex:
        sys.stderr.write(f'Error: {ex}\n')
        return 4
def main_merge(manifest_file: str, target_dir: str = None) -> int:
    """Resolve paths, sanity-check them, then delegate to merge().

    *target_dir* defaults to the directory containing the manifest.
    """
    if not target_dir:
        target_dir = os.path.dirname(os.path.abspath(manifest_file))

    print(f'manifest: {manifest_file}')
    print(f'target dir: {target_dir}')
    print()

    # guard clauses: bail out early with exit code 1 on bad paths
    if not os.path.isfile(manifest_file):
        sys.stderr.write('Error: manifest file not found\n')
        return 1
    if not os.path.isdir(target_dir):
        sys.stderr.write('Error: target directory is missing\n')
        return 1

    return merge(manifest_file, target_dir)
def main_verify(manifest_file: str, target_dir: str = None) -> int:
    """Resolve paths, sanity-check them, then delegate to verify().

    *target_dir* defaults to the directory containing the manifest.
    """
    if not target_dir:
        target_dir = os.path.dirname(os.path.abspath(manifest_file))

    print(f'manifest: {manifest_file}')
    print(f'target dir: {target_dir}')
    print()

    # guard clauses: bail out early with exit code 1 on bad paths
    if not os.path.isfile(manifest_file):
        sys.stderr.write('Error: manifest file not found\n')
        return 1
    if not os.path.isdir(target_dir):
        sys.stderr.write('Error: target directory is missing\n')
        return 1

    return verify(manifest_file)
# --- FUNCTIONS ------------------------------------------------------------- #
def print_usage() -> int:
    """Write command-line syntax to stderr; always returns exit code 1."""
    prog = sys.argv[0]
    sys.stderr.writelines([
        f'Syntax: {prog} split <src-file> <target-dir>\n',
        f'        {prog} merge <manifest-file> [target-dir]\n',
        f'        {prog} verify <manifest-file>\n',
        '\n',
    ])
    return 1
def split(src_file: str, target_dir: str, manifest_file: str, *, chunk_size: int = DEFAULT_CHUNK_SIZE) -> int:
    """Split *src_file* into chunk files of *chunk_size* bytes in *target_dir*.

    Each chunk file starts with a ChunkHeader (index + MD5 over the whole
    chunk); a manifest describing the split is written to *manifest_file*.

    :raises ValueError: on a missing source or a chunk size too small to
        hold the header.
    :raises RuntimeError: on an unexpected I/O state mid-stream.
    :returns: 0 on success (usable as a process exit code).
    """
    src_filename = os.path.basename(src_file)
    manifest_filename = os.path.basename(manifest_file)
    print(f'--- BEGIN SPLIT: {src_filename} -> {target_dir} [manifest: {manifest_filename}]')

    print(' * checking request sanity')
    if not os.path.isfile(src_file):
        raise ValueError(f'not a file: {src_file}')

    # payload bytes available per chunk after the fixed header
    effective_chunk_size = chunk_size - ChunkHeader.HEADER_SIZE
    if effective_chunk_size <= 0:
        raise ValueError(f'chunk size ({chunk_size}) is too low (not enough space for header)')

    print(' * preparing...')
    file_size = os.path.getsize(src_file)
    print(f' > file size: {file_size} bytes')

    # BUG FIX: calculate_num_chunks() subtracts HEADER_SIZE itself, so it
    # must receive the full on-disk chunk size. The original passed
    # effective_chunk_size, double-counting the header and overestimating
    # num_chunks for large files.
    num_chunks = calculate_num_chunks(file_size, chunk_size)
    num_chunk_digits = len(str(num_chunks))
    print(f' > # chunks: {num_chunks}')

    def chunk_path(n: int) -> str:
        # chunk files are named <src>.x1 / <src>.x01 / ... (zero-padded)
        x_value = str(n).zfill(num_chunk_digits)
        return os.path.join(target_dir, f'{src_filename}.x{x_value}')

    def bucket_sizes(total_size: int, bucket_size: int) -> Iterator[int]:
        # Yield the payload size of each chunk. BUG FIX: the final partial
        # bucket is only emitted when non-empty; the original always yielded
        # the remainder, producing a spurious header-only chunk whenever
        # total_size divided evenly (and one phantom chunk for empty files).
        num_full, remaining_size = divmod(total_size, bucket_size)
        yield from (bucket_size for _ in range(num_full))
        if remaining_size:
            yield remaining_size

    manifest = Manifest(src_filename, file_size, num_chunks)

    print(' * opening source file...')
    with open(src_file, 'rb') as f_input:
        h_manifest = hashlib.md5()  # cumulative hash over the whole file
        print(' = writing chunks:')
        for i, current_effective_size in enumerate(bucket_sizes(file_size, effective_chunk_size)):
            print(f' > chunk {i + 1} / {num_chunks}')
            h_chunk = hashlib.md5()
            with open(chunk_path(i + 1), 'wb') as f_output:
                # write a placeholder header first; the real hash is patched
                # in after the payload has been streamed through h_chunk
                header = ChunkHeader(i)
                header.write(f_output)
                h_chunk.update(bytes(header))  # include (zero-hash) header in chunk hash

                pos = 0
                while pos < current_effective_size:
                    buffer = f_input.read(min(BUFFER_SIZE, current_effective_size - pos))
                    if len(buffer) == 0:
                        raise RuntimeError(f'buffer is empty (pos={pos}, eff_sz={current_effective_size})')
                    f_output.write(buffer)
                    h_chunk.update(buffer)
                    h_manifest.update(buffer)
                    pos += len(buffer)

                # verify chunk written in its entirety
                if f_output.tell() != current_effective_size + ChunkHeader.HEADER_SIZE:
                    p = f_output.tell()
                    raise RuntimeError(f'unexpected output file position: {p} != {current_effective_size}')

                # patch the real chunk hash into the header at the front
                header.chunk_hash = h_chunk.digest()
                f_output.seek(0)
                header.write(f_output)

            # done writing chunk; record the cumulative hash so far
            manifest.add_cumulative_hash(h_manifest.hexdigest())

        print(' ! all chunks written')
        print()
        print(' * checking input file position')
        if f_input.tell() != file_size:
            p = f_input.tell()
            raise RuntimeError(f'unexpected input file position: {p} != {file_size}')

    print(' * writing manifest...')
    manifest.file_hash = h_manifest.hexdigest()
    with open(manifest_file, 'w') as f_manifest:
        manifest.write(f_manifest)
    print(' ! manifest written')
    print()
    print('ALL DONE')
    return 0
def verify(manifest_file: str) -> int:
    """Verify the chunk files described by *manifest_file* without merging.

    Checks each chunk's header, per-chunk MD5 and cumulative MD5, and
    finally the whole-file MD5 from the manifest.

    :raises ValueError: if *manifest_file* is not a file.
    :returns: 1 when everything matches (PASS), 0 otherwise (FAIL).
        NOTE: this is NOT a conventional process exit code.
    """
    target_dir, manifest_filename = os.path.split(os.path.abspath(manifest_file))
    print(f'--- VERIFY: {manifest_file}')
    print(f' * target dir: {target_dir}')

    if not os.path.isfile(manifest_file):
        raise ValueError(f'not a file: {manifest_file}')

    manifest = Manifest.load(manifest_file)
    num_chunks = manifest.num_chunks
    num_cumulative_hashes = len(manifest.cumulative_hashes)
    chunk_count_len = len(str(num_chunks))

    def get_filename(chunk_num: int) -> str:
        chunk_str = str(chunk_num).zfill(chunk_count_len)
        return os.path.join(target_dir, f'{manifest.filename}.x{chunk_str}')

    print(' ~ validating chunks:')
    h_full = hashlib.md5()  # cumulative hash across all chunk payloads
    for i in range(manifest.num_chunks):
        issues = []
        cum_hash = 'n/a'
        chunk_file = get_filename(i + 1)
        chunk_filename = os.path.basename(chunk_file)
        try:
            # BUG FIX: getsize() on a missing chunk raised an uncaught
            # OSError and crashed verification; report it as a failure.
            try:
                chunk_file_size = os.path.getsize(chunk_file)
            except OSError as ex:
                issues.append(f'CANNOT STAT CHUNK: {ex}')
                continue
            chunk_effective_size = chunk_file_size - ChunkHeader.HEADER_SIZE
            if chunk_effective_size <= 0:
                issues.append(f'FILE TOO SMALL: sz={chunk_file_size} eff_sz={chunk_effective_size}')
                continue
            with open(chunk_file, 'rb') as f:
                h_chunk = hashlib.md5()
                h_chunk_expected_bytes = None
                try:
                    header = ChunkHeader.read_from(f)
                except Exception as ex:
                    issues.append(f'HEADER FAIL: {ex}')
                else:
                    if header.chunk_index != i:
                        issues.append(f'CHUNK INDEX MISMATCH: idx={header.chunk_index} i={i}')
                    # the stored hash was computed over a header with a
                    # zeroed hash field, so replay it the same way
                    h_chunk_expected_bytes = header.chunk_hash
                    header.chunk_hash = ZERO_HASH
                    h_chunk.update(header.serialize())
                # read data itself
                f.seek(ChunkHeader.HEADER_SIZE)
                pos = f.tell()
                while pos < chunk_file_size:
                    buffer = f.read(min(BUFFER_SIZE, chunk_file_size - pos))
                    if len(buffer) == 0:
                        # BUG FIX: record the short read and stop; the
                        # original kept looping forever on pos += 0
                        issues.append(f'EMPTY BUFFER: pos={pos} / {chunk_file_size}')
                        break
                    h_full.update(buffer)
                    h_chunk.update(buffer)
                    pos += len(buffer)
            # check integrity hash for this chunk
            if h_chunk_expected_bytes != h_chunk.digest():
                exp = repr(h_chunk_expected_bytes)
                act = repr(h_chunk.digest())
                issues.append(f'FILE INTEGRITY FAILED: expected={exp} actual={act}')
            # check cumulative hash for this chunk
            cum_hash = manifest.cumulative_hashes[i] if i < num_cumulative_hashes else None
            if cum_hash:
                if cum_hash != h_full.hexdigest():
                    issues.append(f'C-HASH MISMATCH: expected={cum_hash} actual={h_full.hexdigest()}')
            else:
                issues.append(f'NO CUMULATIVE HASH ({num_cumulative_hashes} c-hashes)')
        finally:
            # always print a status line, even when a `continue` fired above
            status = 'FAIL' if issues else ' OK '
            print(f' > [{status}] chunk {i + 1} / {num_chunks} ({chunk_filename}) [{cum_hash}]')
            for issue in issues:
                print(f' * {issue}')

    print(' ~ checking file md5:')
    print(f' > expected: {manifest.file_hash}')
    print(f' > final: {h_full.hexdigest()}')
    verdict = 'PASS' if manifest.file_hash == h_full.hexdigest() else 'FAIL'
    print(f' > verdict: {verdict}')
    return int(verdict == 'PASS')
def merge(manifest_file: str, target_dir: str = None) -> int:
    """Reassemble the original file from the chunks next to *manifest_file*.

    The merged file is written to *target_dir* (default: the manifest's
    directory). Each chunk's header, per-chunk MD5 and cumulative MD5 are
    checked while the data is copied.

    :raises ValueError: if *manifest_file* is not a file.
    :returns: 1 (PASS), 0 (final hash FAIL), or 2 (chunk-level failure).
        NOTE: a partial target file may be left behind on failure.
    """
    source_dir, manifest_filename = os.path.split(os.path.abspath(manifest_file))
    if not target_dir:
        target_dir = source_dir

    print(f'--- MERGE: {manifest_file}')
    print(f' * source dir: {source_dir}')
    print(f' * target dir: {target_dir}')

    if not os.path.isfile(manifest_file):
        raise ValueError(f'not a file: {manifest_file}')

    manifest = Manifest.load(manifest_file)
    target_file = os.path.join(target_dir, manifest.filename)
    print(f' * target file: {target_file}')
    print()

    num_chunks = manifest.num_chunks
    num_cumulative_hashes = len(manifest.cumulative_hashes)
    chunk_count_len = len(str(num_chunks))

    def get_filename(chunk_num: int) -> str:
        chunk_str = str(chunk_num).zfill(chunk_count_len)
        return os.path.join(source_dir, f'{manifest.filename}.x{chunk_str}')

    print(' ~ merging chunks:')
    h_full = hashlib.md5()  # cumulative hash across all chunk payloads
    with open(target_file, 'wb') as f_master:
        for i in range(manifest.num_chunks):
            issues = []
            cum_hash = 'n/a'
            chunk_file = get_filename(i + 1)
            chunk_filename = os.path.basename(chunk_file)
            try:
                # BUG FIX: getsize() on a missing chunk raised an uncaught
                # OSError and crashed the merge; report it as a failure.
                try:
                    chunk_file_size = os.path.getsize(chunk_file)
                except OSError as ex:
                    issues.append(f'CANNOT STAT CHUNK: {ex}')
                    continue
                chunk_effective_size = chunk_file_size - ChunkHeader.HEADER_SIZE
                if chunk_effective_size <= 0:
                    issues.append(f'FILE TOO SMALL: sz={chunk_file_size} eff_sz={chunk_effective_size}')
                    continue
                with open(chunk_file, 'rb') as f:
                    h_chunk = hashlib.md5()
                    try:
                        header = ChunkHeader.read_from(f)
                    except Exception as ex:
                        issues.append(f'HEADER FAIL: {ex}')
                        continue
                    else:
                        if header.chunk_index != i:
                            issues.append(f'CHUNK INDEX MISMATCH: idx={header.chunk_index} i={i}')
                            continue
                        # the stored hash was computed over a header with a
                        # zeroed hash field, so replay it the same way
                        h_chunk_expected_bytes = header.chunk_hash
                        header.chunk_hash = ZERO_HASH
                        h_chunk.update(header.serialize())

                    # transfer data
                    f.seek(ChunkHeader.HEADER_SIZE)
                    pos = f.tell()
                    while pos < chunk_file_size:
                        buffer = f.read(min(BUFFER_SIZE, chunk_file_size - pos))
                        if len(buffer) == 0:
                            # BUG FIX: record the short read and stop; the
                            # original kept looping forever on pos += 0
                            issues.append(f'EMPTY BUFFER: pos={pos} / {chunk_file_size}')
                            break
                        try:
                            f_master.write(buffer)
                        except IOError as ex:
                            issues.append(f'I/O ERROR: {ex.strerror}')
                            raise
                        h_full.update(buffer)
                        h_chunk.update(buffer)
                        pos += len(buffer)

                # check integrity hash for this chunk
                if h_chunk_expected_bytes != h_chunk.digest():
                    exp = repr(h_chunk_expected_bytes)
                    act = repr(h_chunk.digest())
                    issues.append(f'FILE INTEGRITY FAILED: expected={exp} actual={act}')

                # check cumulative hash for this chunk
                cum_hash = manifest.cumulative_hashes[i] if i < num_cumulative_hashes else None
                if cum_hash:
                    if cum_hash != h_full.hexdigest():
                        issues.append(f'C-HASH MISMATCH: expected={cum_hash} actual={h_full.hexdigest()}')
                else:
                    issues.append(f'NO CUMULATIVE HASH ({num_cumulative_hashes} c-hashes)')
            finally:
                # always print a status line, even when a `continue` fired
                status = 'FAIL' if issues else ' OK '
                print(f' > [{status}] chunk {i + 1} / {num_chunks} ({chunk_filename}) [{cum_hash}]')
                for issue in issues:
                    print(f' * {issue}')
            if issues:
                return 2

    print(' ~ checking file md5:')
    print(f' > expected: {manifest.file_hash}')
    print(f' > final: {h_full.hexdigest()}')
    verdict = 'PASS' if manifest.file_hash == h_full.hexdigest() else 'FAIL'
    print(f' > verdict: {verdict}')
    return int(verdict == 'PASS')
def calculate_num_chunks(file_size: int, chunk_size: int) -> int:
    """Return the number of chunk files needed for *file_size* bytes.

    *chunk_size* is the full on-disk chunk size; the fixed header overhead
    is subtracted here to obtain the per-chunk payload capacity.
    """
    payload_capacity = chunk_size - ChunkHeader.HEADER_SIZE
    whole, leftover = divmod(file_size, payload_capacity)
    if leftover:
        whole += 1
    return whole
if __name__ == '__main__':
    # Run the CLI and propagate main()'s return value as the process exit code.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment