Skip to content

Instantly share code, notes, and snippets.

@m-bartlett
Last active January 19, 2024 22:35
Show Gist options
  • Save m-bartlett/a54a9449b10b7745015e096abd54c063 to your computer and use it in GitHub Desktop.
Save m-bartlett/a54a9449b10b7745015e096abd54c063 to your computer and use it in GitHub Desktop.
git-like filesystem snapshotting utility which hashes file contents for uniqueness and hardlinks unchanged files to existing hashed files to avoid redundancy

SnapSHAt

This tool uses the existing Python standard library as of 3.11, no external dependencies are needed.

Note

When I initially began writing this tool I was using SHA256 to compute file hashes, but I later found empirically that BLAKE2 was considerably faster, and since filesystems can easily exceed millions of files to hash it was a significant speedup to change the hashing algorithm. However, I still feel this pun encapsulates the essence of the tool and couldn't think of a better name.

Features

  • Stores unique file contents in a "blob cache" where the content files are renamed after the hash of their contents (this is similar to how git stores files).
  • Hardlinks files to these hashed blobs. The usefulness of this comes with subsequent snapshots, where presumably the majority of your filesystem is unchanged. Unchanged files will produce the same hash, and therefore can be hardlinked once again to the same blob which reuses the storage in the blob instead of creating a redundant copy of the file.
  • Supports garbage collecting the blob cache. It is quite easy to determine if a given blob file is being hardlinked against, so this option will prompt the tool to remove any orphaned blobs which don't have any hardlinks to them.
  • Versatile include/exclude options for controlling precisely which files make it into the snapshot. Supports globbing, base file names, and complete path exclusions. Inclusions are assumed to be directories and will be recursively descended into.
  • Graceful handling of symlinks. The tool will preserve "absolute" symlinks by re-linking the absolute path's root to the snapshot directory. It will not modify relative symlinks since those will work as expected within the snapshot.

Sample Usage

$ snapSHAt.py \
    --destination /mnt/snapshot-hdd/Snapshots \
    --garbage-collect-cache \
    --targets \
        /boot \
        /cfg \
        /etc \
        /home \
        /opt \
        /usr/local \
    --exclude \
        /run \
        /proc \
        /sys \
        /usr/include/ \
        /usr/share/ \
        ~/Downloads \
        /etc/localtime \
        /etc/mtab \
        WebStorage \
        CacheStorage \
        IndexedDB \
        .git \
        .cache \
        cache \

See the --help output for more usage details.

#!/usr/bin/env python3
import os
import sys
import shutil
import threading
import signal
from argparse import ArgumentParser
from pathlib import Path
from hashlib import blake2b
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from fnmatch import fnmatch
def printerr(*args, **kwargs):
    """Print to stderr — overriding any caller-supplied ``file`` — and flush.

    Flushing immediately matters because the tool uses carriage-return
    "transitory" status lines that must appear in real time.
    """
    stderr_kwargs = {**kwargs, 'file': sys.stderr}
    print(*args, **stderr_kwargs)
    sys.stderr.flush()
def die(msg):
    """Report *msg* as a fatal error on stderr and terminate with exit code 1."""
    error_banner = f"\033[31mERROR:\033[0m {msg}"
    printerr(error_banner)
    sys.exit(1)
# Program name used in user-facing strings and to name the hidden blob-cache
# directory (see Snapshotter.BLOB_CACHE_DIR_NAME).
EXECUTABLE_NAME = "snapSHAt"
class Snapshotter():
    """git-like filesystem snapshotter.

    Unique file contents are stored once in a content-addressed "blob cache"
    (files named after the BLAKE2b hash of their contents, similar to git's
    object store); snapshot entries are hardlinks into that cache, so files
    unchanged between snapshots share storage instead of being copied again.
    """

    # Hidden cache directory created inside the snapshots destination.
    BLOB_CACHE_DIR_NAME = f'.{EXECUTABLE_NAME}.d'
    # Serializes terminal writes across the hashing worker threads.
    _print_lock = threading.Lock()

    def __init__(self,
                 snapshots_dir: str,
                 snapshot_name: str,
                 exclude_paths: list[str],
                 dry_mode: bool = False,
                 skip_symlink_targets: bool = False,
                 verbose: bool = False):
        """
        Args:
            snapshots_dir: root directory holding all snapshots and the blob cache.
            snapshot_name: name of this snapshot's subdirectory under snapshots_dir.
            exclude_paths: basenames, path prefixes, or glob patterns to omit.
            dry_mode: print planned operations instead of performing them.
            skip_symlink_targets: do not recursively snapshot the resolved
                targets of symlinks encountered during snapshotting.
            verbose: emit persistent progress lines instead of transitory ones.
        """
        self.snapshot_name = snapshot_name
        self.blob_cache_path = Path(f"{snapshots_dir}/{self.BLOB_CACHE_DIR_NAME}").absolute()
        self.snapshot_path = Path(f"{snapshots_dir}/{snapshot_name}").absolute()
        self.exclude_paths = set(exclude_paths)
        # Never snapshot the snapshots destination itself.
        self.exclude_paths.add(snapshots_dir)
        self.skip_symlink_targets = skip_symlink_targets
        self.target_hashes = None  # populated by get_target_hashes()
        if dry_mode:
            # Shadow the per-type snapshot methods with a printer so the full
            # traversal logic runs but no filesystem mutation happens.
            self.snapshot_file = \
                self.snapshot_dir = \
                self.snapshot_symlink = \
                self.snapshot_other = lambda p, d: print(p)
        if verbose:
            self.verbose_print = printerr
        else:
            self.verbose_print = self.print_transitory
        self._match_terminal_width()
        # Track terminal resizes so transitory lines truncate correctly.
        # NOTE: SIGWINCH is POSIX-only, which is fine — the tool already
        # requires root on a Unix-like system.
        signal.signal(signal.SIGWINCH, self._match_terminal_width)

    def _match_terminal_width(self, *_):
        # Signal-handler compatible signature; handler arguments are ignored.
        with __class__._print_lock:
            try:
                terminal_width = os.get_terminal_size()[0] - 2
            except OSError:
                # Not attached to a terminal (e.g. output piped to a file).
                terminal_width = 80
            self._terminal_width = terminal_width

    def print_transitory(self, msg=''):
        """Print *msg* on a single self-overwriting terminal line."""
        if self._string_length_ascii(msg) > self._terminal_width:
            msg = msg[:self._terminal_width - 3] + '...'
        with __class__._print_lock:
            # \033[0J clears from the cursor to end of screen; the trailing
            # \r leaves the cursor at column 0 ready to be overwritten.
            printerr(f"\033[0J{msg}", end='\r')

    @staticmethod
    def get_file_hash(path: Path, prefix_length: int = 3):
        """Return the BLAKE2b hex digest of *path*'s contents.

        BUGFIX: reads in fixed-size chunks instead of ``path.read_bytes()``,
        so arbitrarily large files are no longer loaded into memory whole.
        ``prefix_length`` is accepted for signature compatibility but unused
        (blob fan-out is handled by get_nested_hash_path).
        """
        hasher = blake2b()
        with path.open('rb') as f:
            while chunk := f.read(1 << 20):  # 1 MiB chunks
                hasher.update(chunk)
        return hasher.digest().hex()

    @staticmethod
    def get_nested_hash_path(file_hash: str, dir_delimiter: str = '/', prefix_length=3):
        """Split *file_hash* into ``<prefix><delimiter><rest>`` so blobs fan
        out into subdirectories (mirrors git's object-store layout)."""
        return dir_delimiter.join((file_hash[:prefix_length], file_hash[prefix_length:]))

    @staticmethod
    def _string_length_ascii(s: str):
        # Effectively len(s): the original substituted non-ASCII characters
        # with spaces before counting, which never changed the count.  Kept
        # as a named helper in case a real display-width computation (e.g.
        # for wide characters) is wanted later.
        return len(s)

    def recursive_path_iter(self, path: Path):
        """Yield *path* and, for non-symlink directories, all descendants,
        honoring the exclusion list.

        A path is excluded when its basename equals an exclusion entry, it
        lies under an excluded path, or it matches an exclusion glob pattern.
        """
        if any(path.name == pattern
               or path.is_relative_to(pattern)
               or fnmatch(path, pattern)
               for pattern in self.exclude_paths):
            self.print_transitory()
            print(f"Excluding {path}")
            return
        if path.is_dir():
            yield path
            # Do not descend through directory symlinks; they are yielded
            # above and handled as symlinks by snapshot_by_type.
            if not path.is_symlink():
                for child in path.iterdir():
                    yield from self.recursive_path_iter(child)
        else:
            yield path

    def get_target_hashes(self, targets: list[Path]):
        """Concurrently hash every regular file found under *targets*.

        Returns a dict mapping each file Path to its content hash.  Hashing
        is I/O bound, so a thread pool provides genuine parallelism despite
        the GIL.
        """
        target_hashes = {}

        def _get_path_and_hash(file_path):
            file_hash = self.get_file_hash(file_path)
            self.print_transitory(f"{file_path} => {file_hash}")
            return file_path, file_hash

        # BUGFIX: os.cpu_count() may return None or 1; never request < 1
        # worker (ThreadPoolExecutor raises ValueError for max_workers <= 0).
        worker_count = max(1, (os.cpu_count() or 2) - 1)
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            all_targets = (p for t in targets for p in self.recursive_path_iter(t))
            target_files = filter(lambda p: p.is_file(), all_targets)
            for file_path, file_hash in pool.map(_get_path_and_hash, target_files):
                target_hashes[file_path] = file_hash
        self.print_transitory()
        return target_hashes

    def snapshot_file(self, file: Path, destination: Path):
        """Hardlink *file* into the snapshot via its content blob.

        The blob is materialized (copied) on first sight of a content hash;
        subsequent files with identical content hardlink to that same blob.
        """
        file_hash = self.target_hashes.get(file)
        if file_hash is None:
            # Not hashed (e.g. reached via a symlink target outside targets).
            return
        nested_hash_path = self.get_nested_hash_path(file_hash, dir_delimiter=file.anchor)
        blob_path = self.blob_cache_path / nested_hash_path
        blob_path.parent.mkdir(parents=True, exist_ok=True)
        if blob_path.exists():
            verbose_msg = f"Linked cached {file}"
        else:
            # First occurrence of this content: copy2 preserves metadata.
            shutil.copy2(file, blob_path)
            verbose_msg = f"Linked {file} to {file_hash}"
        destination.parent.mkdir(exist_ok=True, parents=True)
        if destination.exists():
            if destination.samefile(blob_path):
                printerr(f"{file} is already linked to {file_hash} in this snapshot. The most"
                         " likely cause of this is that a symlink captured in this snapshot points"
                         " to this path.")
            else:
                printerr(f"{file} already exists in this snapshot but it does not target the blob"
                         f" in the cache that matches its current hash {file_hash}. Something"
                         " unexpected has occurred, dropping into an interactive debugger.")
                breakpoint()
        else:
            destination.hardlink_to(blob_path)
        self.verbose_print(verbose_msg)
        return

    def snapshot_dir(self, directory: Path, destination: Path):
        """Recreate *directory* (and any missing parents) in the snapshot."""
        destination.mkdir(parents=True, exist_ok=True)
        return

    def snapshot_symlink(self, symlink: Path, destination: Path):
        """Recreate *symlink* in the snapshot.

        Absolute symlink targets are re-rooted under the snapshot directory;
        relative targets are duplicated verbatim since they already resolve
        correctly inside the snapshot tree.
        """
        if destination.exists():
            return
        symlink_destination = symlink.readlink()
        if symlink_destination.is_absolute():
            # re-root the absolute symlink's target inside the snapshot
            snapshot_symlink_destination = Path(f"{self.snapshot_path}/{symlink_destination}")
        else:
            # relative symlink will work as expected under the snapshot dir,
            # just duplicate it
            snapshot_symlink_destination = symlink_destination
        symlink_destination = (symlink.parent / symlink_destination).resolve()
        self.verbose_print(f"Symlink: {symlink} => {symlink_destination}")
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.symlink_to(snapshot_symlink_destination)
        # BUGFIX: honor skip_symlink_targets, which __init__ stored but the
        # original code never consulted — the resolved target was always
        # snapshotted regardless of the flag.
        if not self.skip_symlink_targets:
            self.snapshot_by_type(symlink_destination)
        return

    def snapshot_other(self, path: Path, destination: Path):
        # Unknown filesystem object type: stop for manual inspection.
        print(f"Unhandled path type for {path}")
        breakpoint()

    def snapshot_by_type(self, path: Path):
        """Dispatch *path* to the appropriate snapshot_* handler.

        FIFOs and sockets are deliberately skipped: they carry no
        snapshottable content.
        """
        destination = Path(f"{self.snapshot_path}/{path}")
        if path.is_symlink():
            return self.snapshot_symlink(path, destination)
        if path.is_dir():
            return self.snapshot_dir(path, destination)
        if path.is_file():
            return self.snapshot_file(path, destination)
        if path.is_fifo():
            return
        if path.is_socket():
            return
        return self.snapshot_other(path, destination)

    def snapshot(self, targets: list[Path]):
        """Hash all targets, then build the snapshot directory tree."""
        self.target_hashes = self.get_target_hashes(targets)
        self.blob_cache_path.mkdir(parents=True, exist_ok=True)
        # exist_ok=False: refuse to clobber an existing snapshot of this name.
        self.snapshot_path.mkdir(parents=True, exist_ok=False)
        self.verbose_print(f"Beginning snapshot {self.snapshot_name}")
        for path in self.target_hashes:
            self.snapshot_by_type(path)

    def get_snapshot_size(self, path: Path):
        # NOTE(review): this walks the tree through snapshot_by_type and
        # never accumulates a size; it appears unfinished (see the
        # commented-out st_size line below).  Behavior preserved as-is.
        for p in self.recursive_path_iter(path):
            self.snapshot_by_type(p)
        # path.stat().st_size / 1_000_000_000

    def garbage_collect_blob_cache(self):
        """Delete cache blobs that no snapshot hardlinks to anymore.

        A blob with st_nlink < 2 is referenced only by the cache itself and
        is therefore orphaned.
        """
        blobs = (p for p in self.blob_cache_path.rglob('*') if p.is_file())
        for blob in blobs:
            if blob.stat().st_nlink < 2:
                self.verbose_print(f"Garbage collecting {blob}")
                blob.unlink()
        self.print_transitory()
def main():
    """Parse CLI arguments, then run the snapshot and/or cache garbage
    collection.  Returns 0 on success (used as the process exit code)."""
    parser = ArgumentParser()
    parser.add_argument('--destination', '-o',
                        required=True,
                        help="""Path to store filesystem snapshots in. Reuse this destination in
                        the future to reuse the blob cache and prevent file duplication.""")
    parser.add_argument('--targets', '-i',
                        nargs='*',
                        default=[],
                        help="Path(s) to recursively snapshot")
    parser.add_argument('--exclude', '-x',
                        nargs='*',
                        default=[],
                        help="Path(s) to omit from the snapshot")
    parser.add_argument('--snapshot-name', '-n',
                        default=datetime.now().strftime('%F %H:%M:%S'),
                        help="""Name to use for this snapshot's directory. Defaults to the current
                        timestamp with filesystem-naming compatible delimiters.""")
    # BUGFIX: this option was never declared, yet args.skip_symlink_targets
    # was read below — every invocation died with AttributeError.
    parser.add_argument('--skip-symlink-targets',
                        action="store_true",
                        help="""Do not recursively snapshot the resolved targets of symlinks
                        encountered while snapshotting.""")
    parser.add_argument('--garbage-collect-cache', '--gc',
                        dest='garbage_collect',
                        action="store_true",
                        help="Run garbage collection in the blob cache")
    parser.add_argument('--dry',
                        action="store_true",
                        help="""Only print what file operations would be performed instead of
                        actually performing them. Useful as a sanity check.""")
    parser.add_argument('--verbose', '-v',
                        action="store_true",
                        help="Output extra information during snapshot operations.")
    args = parser.parse_args()
    # BUGFIX: validate the destination BEFORE handing it to the Snapshotter;
    # the original constructed the Snapshotter first and checked afterwards.
    if not args.destination:
        die("A non-empty destination directory must be provided for storing snapshots.")
    snapshotter = Snapshotter(snapshots_dir=args.destination,
                              snapshot_name=args.snapshot_name,
                              exclude_paths=args.exclude,
                              dry_mode=args.dry,
                              skip_symlink_targets=args.skip_symlink_targets,
                              verbose=args.verbose)
    if args.targets:
        # Lazily absolutize targets; consumed once by snapshot().
        target_paths = (Path(t).absolute() for t in args.targets)
        snapshotter.snapshot(target_paths)
    if args.garbage_collect:
        snapshotter.garbage_collect_blob_cache()
    return 0
if __name__ == '__main__':
    # Snapshotting arbitrary system paths (and hardlinking into a shared blob
    # cache) requires root; refuse to run otherwise.
    if os.geteuid() != 0:
        die(f"{__file__} must be executed with root privileges.")
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment