#!/usr/bin/env python3

"""Create space-efficient filesystem snapshots.

Each target file is hashed and stored once in a content-addressed blob cache;
every snapshot directory then hardlinks to those blobs, so files that have not
changed between snapshots consume no additional space.
"""

import os
import sys
import shutil
import threading
import signal
from argparse import ArgumentParser
from pathlib import Path
from hashlib import blake2b
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from fnmatch import fnmatch


def printerr(*args, **kwargs):
    kwargs['file'] = sys.stderr
    print(*args, **kwargs)
    sys.stderr.flush()


def die(msg):
    printerr(f"\033[31mERROR:\033[0m {msg}")
    sys.exit(1)


EXECUTABLE_NAME = "snapSHAt"


class Snapshotter:
    BLOB_CACHE_DIR_NAME = f'.{EXECUTABLE_NAME}.d'
    _print_lock = threading.Lock()

    def __init__(self,
                 snapshots_dir: str,
                 snapshot_name: str,
                 exclude_paths: list[str],
                 dry_mode: bool = False,
                 skip_symlink_targets: bool = False,
                 verbose: bool = False):
        self.snapshot_name = snapshot_name
        self.blob_cache_path = Path(f"{snapshots_dir}/{self.BLOB_CACHE_DIR_NAME}").absolute()
        self.snapshot_path = Path(f"{snapshots_dir}/{snapshot_name}").absolute()
        self.exclude_paths = set(exclude_paths)
        # Never descend into the snapshots directory itself; exclude both the
        # argument as given and its absolute form, since targets are walked as
        # absolute paths.
        self.exclude_paths.add(snapshots_dir)
        self.exclude_paths.add(str(Path(snapshots_dir).absolute()))
        self.skip_symlink_targets = skip_symlink_targets
        self.dry_mode = dry_mode
        self.target_hashes = None

        if dry_mode:
            # In dry mode, every per-path operation only prints the path it
            # would have acted on.
            self.snapshot_file = \
                self.snapshot_dir = \
                self.snapshot_symlink = \
                self.snapshot_other = lambda p, d: print(p)

        if verbose:
            self.verbose_print = printerr
        else:
            self.verbose_print = self.print_transitory

        self._match_terminal_width()
        signal.signal(signal.SIGWINCH, self._match_terminal_width)

    def _match_terminal_width(self, *_):
        with __class__._print_lock:
            try:
                terminal_width = os.get_terminal_size()[0] - 2
            except OSError:
                terminal_width = 80
            self._terminal_width = terminal_width

    def print_transitory(self, msg=''):
        if self._string_length_ascii(msg) > self._terminal_width:
            msg = msg[:self._terminal_width - 3] + '...'
        with __class__._print_lock:
            printerr(f"\033[0J{msg}", end='\r')

    @staticmethod
    def get_file_hash(path: Path):
        # Note: the whole file is read into memory before hashing.
        return blake2b(path.read_bytes()).hexdigest()

    @staticmethod
    def get_nested_hash_path(file_hash: str, dir_delimiter: str = '/', prefix_length: int = 3):
        return dir_delimiter.join((file_hash[:prefix_length], file_hash[prefix_length:]))

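    # Illustration (hypothetical digest value): get_nested_hash_path("abc123")
    # returns "abc/123", so blobs are spread across prefix subdirectories
    # instead of piling up in a single flat directory.
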
    @staticmethod
    def _string_length_ascii(s: str):
        # Approximate the printed width of a message by its character count,
        # treating non-ASCII characters as a single column.
        return len(s)

    def recursive_path_iter(self, path: Path):
        if any(any((path.name == x, path.is_relative_to(x), fnmatch(path, x)))
               for x in self.exclude_paths):
            self.print_transitory()
            print(f"Excluding {path}")
            return
        if path.is_dir():
            yield path
            if not path.is_symlink():
                for p in path.iterdir():
                    yield from self.recursive_path_iter(p)
        else:
            yield path

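    # Exclusion matching, for illustration: an entry such as "node_modules"
    # excludes anything with that basename, "/var/cache" excludes that whole
    # subtree, and a glob like "*.pyc" excludes any path matching the pattern.
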
    def get_target_hashes(self, targets: list[Path]):
        target_hashes = {}

        def _get_path_and_hash(file_path):
            file_hash = self.get_file_hash(file_path)
            self.print_transitory(f"{file_path} => {file_hash}")
            return file_path, file_hash

        # Leave one CPU for the main thread, but never ask for fewer than one
        # worker (os.cpu_count() may return 1 or None).
        max_workers = max(1, (os.cpu_count() or 2) - 1)
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            all_targets = (p for t in targets for p in self.recursive_path_iter(t))
            target_files = filter(lambda p: p.is_file(), all_targets)
            hash_results = pool.map(_get_path_and_hash, target_files)
            for file_path, file_hash in hash_results:
                target_hashes[file_path] = file_hash
        self.print_transitory()

        return target_hashes

    def snapshot_file(self, file: Path, destination: Path):
        file_hash = self.target_hashes.get(file)
        if file_hash is None:
            return
        nested_hash_path = self.get_nested_hash_path(file_hash, dir_delimiter=file.anchor)

        blob_path = self.blob_cache_path / nested_hash_path
        blob_path.parent.mkdir(parents=True, exist_ok=True)

        if blob_path.exists():
            verbose_msg = f"Linked {file} to cached blob {file_hash}"
        else:
            shutil.copy2(file, blob_path)
            verbose_msg = f"Linked {file} to {file_hash}"

        destination.parent.mkdir(exist_ok=True, parents=True)

        if destination.exists():
            if destination.samefile(blob_path):
                printerr(f"{file} is already linked to {file_hash} in this snapshot. The most"
                         " likely cause of this is that a symlink captured in this snapshot"
                         " points to this path.")
            else:
                printerr(f"{file} already exists in this snapshot but it does not target the blob"
                         f" in the cache that matches its current hash {file_hash}. Something"
                         " unexpected has occurred; dropping into an interactive debugger.")
                breakpoint()
        else:
            destination.hardlink_to(blob_path)
            self.verbose_print(verbose_msg)
        return

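    # Resulting layout, for illustration (hypothetical file and digest): a file
    # /etc/hosts whose digest starts with "abc" ends up as
    #   <destination>/.snapSHAt.d/abc/<rest of digest>    (the cached blob)
    #   <destination>/<snapshot name>/etc/hosts           (hardlink to that blob)
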
    def snapshot_dir(self, directory: Path, destination: Path):
        destination.mkdir(parents=True, exist_ok=True)
        return

    def snapshot_symlink(self, symlink: Path, destination: Path):
        if destination.exists():
            return
        symlink_destination = symlink.readlink()
        if symlink_destination.is_absolute():
            # Re-root the absolute symlink's target under the snapshot dir.
            snapshot_symlink_destination = Path(f"{self.snapshot_path}/{symlink_destination}")
        else:
            # A relative symlink works as expected under the snapshot dir; just duplicate it.
            snapshot_symlink_destination = symlink_destination
            symlink_destination = (symlink.parent / symlink_destination).resolve()

        self.verbose_print(f"Symlink: {symlink} => {symlink_destination}")
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.symlink_to(snapshot_symlink_destination)

        # Capture the link's target as well so the link resolves inside the
        # snapshot, unless symlink targets were explicitly skipped.
        if not self.skip_symlink_targets:
            self.snapshot_by_type(symlink_destination)
        return

    def snapshot_other(self, path: Path, destination: Path):
        print(f"Unhandled path type for {path}")
        breakpoint()

    def snapshot_by_type(self, path: Path):
        destination = Path(f"{self.snapshot_path}/{path}")

        if path.is_symlink():
            return self.snapshot_symlink(path, destination)
        if path.is_dir():
            return self.snapshot_dir(path, destination)
        if path.is_file():
            return self.snapshot_file(path, destination)
        if path.is_fifo() or path.is_socket():
            return
        return self.snapshot_other(path, destination)

    def snapshot(self, targets: list[Path]):
        self.target_hashes = self.get_target_hashes(targets)
        if not self.dry_mode:
            self.blob_cache_path.mkdir(parents=True, exist_ok=True)
            self.snapshot_path.mkdir(parents=True, exist_ok=False)
        self.verbose_print(f"Beginning snapshot {self.snapshot_name}")
        for p in self.target_hashes.keys():
            self.snapshot_by_type(p)

    def get_snapshot_size(self, path: Path):
        # Total size in bytes of the regular files under the given path.
        return sum(p.stat().st_size for p in self.recursive_path_iter(path)
                   if p.is_file())

    def garbage_collect_blob_cache(self):
        blobs = (p for p in self.blob_cache_path.rglob('*') if p.is_file())
        for blob in blobs:
            # A link count below 2 means no snapshot hardlinks this blob any more.
            if blob.stat().st_nlink < 2:
                self.verbose_print(f"Garbage collecting {blob}")
                blob.unlink()
        self.print_transitory()


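# Programmatic use, for illustration (hypothetical paths; main() below is the
# normal entry point):
#
#     snap = Snapshotter(snapshots_dir='/srv/snapshots',
#                        snapshot_name='demo',
#                        exclude_paths=['*.cache'])
#     snap.snapshot([Path('/etc').absolute()])
#     snap.garbage_collect_blob_cache()

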
def main():
    parser = ArgumentParser()

    parser.add_argument('--destination', '-o',
                        required=True,
                        help="""Path to store filesystem snapshots in. Reuse this destination in
                        the future to reuse the blob cache and prevent file duplication.""")

    parser.add_argument('--targets', '-i',
                        nargs='*',
                        default=[],
                        help="Path(s) to recursively snapshot")

    parser.add_argument('--exclude', '-x',
                        nargs='*',
                        default=[],
                        help="Path(s) to omit from the snapshot")

    parser.add_argument('--snapshot-name', '-n',
                        default=datetime.now().strftime('%F %H:%M:%S'),
                        help="""Name to use for this snapshot's directory. Defaults to the current
                        timestamp with filesystem-naming compatible delimiters.""")

    parser.add_argument('--skip-symlink-targets',
                        action="store_true",
                        help="Do not also snapshot the paths that captured symlinks point to.")

    parser.add_argument('--garbage-collect-cache', '--gc',
                        dest='garbage_collect',
                        action="store_true",
                        help="Run garbage collection in the blob cache")

    parser.add_argument('--dry',
                        action="store_true",
                        help="""Only print what file operations would be performed instead of
                        actually performing them. Useful as a sanity check.""")

    parser.add_argument('--verbose', '-v',
                        action="store_true",
                        help="Output extra information during snapshot operations.")

    args = parser.parse_args()

    if not args.destination:
        die("A non-empty destination directory must be provided for storing snapshots.")

    snapshotter = Snapshotter(snapshots_dir=args.destination,
                              snapshot_name=args.snapshot_name,
                              exclude_paths=args.exclude,
                              dry_mode=args.dry,
                              skip_symlink_targets=args.skip_symlink_targets,
                              verbose=args.verbose)

    target_paths = [Path(t).absolute() for t in args.targets]

    if args.targets:
        snapshotter.snapshot(target_paths)

    if args.garbage_collect:
        snapshotter.garbage_collect_blob_cache()

    return 0


if __name__ == '__main__':
    if os.geteuid() != 0:
        die(f"{__file__} must be executed with root privileges.")
    sys.exit(main())
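
# Example invocation, for illustration (hypothetical paths and script name),
# run as root:
#
#   sudo ./snapSHAt --destination /srv/snapshots \
#                   --targets /etc /home \
#                   --exclude '/home/*/.cache' \
#                   --snapshot-name nightly --verbose
#
# Re-running with the same --destination reuses the blob cache, so unchanged
# files only cost a new hardlink; add --gc to prune blobs that no snapshot
# references any more.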