Skip to content

Instantly share code, notes, and snippets.

@luis261
Last active April 2, 2024 14:39
Show Gist options
  • Save luis261/48111222a4341e8f981669c2a20f7ce0 to your computer and use it in GitHub Desktop.
Save luis261/48111222a4341e8f981669c2a20f7ce0 to your computer and use it in GitHub Desktop.
import hashlib
import os
import shutil
import warnings
# Some useful default comparison functions (usable as `diff_criterion`).
def diff_hash(bytez):
    """Return the hexadecimal MD5 digest of ``bytez``.

    Meant purely for change detection, not for security purposes, hence
    ``usedforsecurity=False``.
    """
    return hashlib.md5(bytez, usedforsecurity=False).hexdigest()


def diff_size(bytez):
    """Return the length of ``bytez`` as a string (a coarser, size-only criterion)."""
    return str(len(bytez))
class DirObserver():
    """
    Context-manageable class for keeping tabs on a directory.

    NOT intended for security purposes, only for simple validation!

    Takes in a target path to watch and, once attached, caches a digest
    of the initial state of the files directly inside the given directory
    (subdirectories are skipped). On exit, emits information about any
    detected changes via ``print``.

    You can restrict the observation to files with a certain
    extension via `suffix_criterion` (anything accepted by
    ``str.endswith``; a falsy value disables the filter).

    By default, MD5 hashes are used to detect changes. In some cases,
    you might want an even fuzzier comparison via file size, or even
    your own, custom comparison function. `diff_criterion` is called
    with the complete content of each file as ``bytes``.

    If `managed_archive` is truthy, in addition to the digest which is
    cached in memory, a full snapshot of the initial file state is
    persisted to a managed subdirectory (``dirobsrv_auto_archive``)
    of the target directory.

    `archiving_fn` lets you specify a custom function for moving the
    found files to the archive subdirectory; it is called as
    ``archiving_fn(source_path, destination_path)``. You'll be looking
    for a callee which works regardless of whether or not the destination
    already exists (thus performing overrides if needed). On Windows,
    such scenarios seem to be especially flimsy. Judging from the
    original author's testing, `shutil.copy` is recommended as a base
    setup and `os.replace` if you want the files moved to the archive
    instead of copied (`shutil.move` is not recommended for that).

    The public attribute ``changes_detected`` starts out as ``False`` and
    is set to ``True`` by `evaluate` as soon as a difference is found;
    it is never reset afterwards.
    """

    def __init__(
        self, target_path,
        suffix_criterion=None,
        diff_criterion=diff_hash,
        managed_archive=False,
        archiving_fn=shutil.copy
    ):
        self._suffix_criterion = suffix_criterion
        self._target = target_path
        # Callables are stored as-is: functions kept in *instance* attributes
        # are never bound to the instance, so no `staticmethod` wrapper is
        # needed (and `staticmethod` objects are not callable before 3.10).
        self._diff_fn = diff_criterion
        self._archive_path = (
            os.path.join(target_path, "dirobsrv_auto_archive")
            if managed_archive else None
        )
        self._archiving_fn = archiving_fn
        self.changes_detected = False

    def __enter__(self):
        """Attach to the target directory and return this observer."""
        return self.attach()

    def attach(self):
        """Cache the baseline digest (and archive the files, if enabled).

        Returns the observer itself so the call can be chained.
        """
        print(f"Starting up dir observation for {self._target!r}.")
        self._base_digest = self.digest_dir(
            self._target, self._diff_fn, self._suffix_criterion
        )
        if self._archive_path:
            self._archive_current_files()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Detach on context exit; exceptions from the body are not suppressed."""
        self.detach()

    def detach(self):
        """Run a final evaluation and drop the cached baseline digest."""
        print(f"Shutting down dir observation for {self._target!r} ...")
        self.evaluate(purge=True)

    def evaluate(self, purge=False):
        """Compare the current directory state against the baseline.

        Prints a report of added, modified and missing files. If the observer
        has no baseline yet, a ``RuntimeWarning`` is issued and `attach` is
        called implicitly first.

        With `purge` truthy the baseline is deleted afterwards; otherwise the
        freshly computed digest becomes the new baseline.

        Returns ``self.changes_detected``, i.e. ``True`` if this or any
        earlier evaluation of this observer found a difference.
        """
        if not hasattr(self, "_base_digest"):
            warnings.warn("Detected out of place evaluation call, proceeding with adhoc, implicit attachment", RuntimeWarning)
            self.attach()

        new_digest = self.digest_dir(
            self._target, self._diff_fn, self._suffix_criterion
        )
        if self._base_digest != new_digest:
            self.changes_detected = True
            print("Detected changes compared to baseline:")
            reports = (
                (self._extract_added(new_digest), "these files were added"),
                (self._extract_changed(new_digest), "these files were modified"),
                (self._extract_missing(new_digest), "these files went missing"),
            )
            for diff, message in reports:
                self._empty_else_emit(diff, message)
        else:
            print("No changes detected.")

        if purge:
            del self._base_digest
        else:
            self._base_digest = new_digest
        return self.changes_detected

    @staticmethod
    def _list_files(target, suffix_criterion):
        """Return the non-directory entries of `target` matching the suffix filter.

        The listing is materialised inside a ``with`` block so the scandir
        iterator is closed promptly and callers may modify the directory
        while working through the result.
        """
        with os.scandir(target) as entries:
            return [
                entry for entry in entries
                if not entry.is_dir()
                and (not suffix_criterion or entry.name.endswith(suffix_criterion))
            ]

    @staticmethod
    def digest_dir(target, diff_fn, suffix_criterion):
        """Map each observed file name in `target` to ``diff_fn(file_content)``.

        Directories are skipped, as are files whose name does not end with
        `suffix_criterion` (when that is truthy).
        """
        digest = {}
        for entry in DirObserver._list_files(target, suffix_criterion):
            with open(entry.path, "rb") as f:
                # Reading the whole file at once might lead to memory
                # exhaustion for huge files. `hashlib.file_digest` works in
                # chunks, but `diff_fn` is defined to receive the full bytes.
                digest[entry.name] = diff_fn(f.read())
        return digest

    def _extract_added(self, new_digest):
        """Return the sorted names present in `new_digest` but not in the baseline."""
        return sorted(new_digest.keys() - self._base_digest.keys())

    def _extract_missing(self, new_digest):
        """Return the sorted names present in the baseline but not in `new_digest`."""
        return sorted(self._base_digest.keys() - new_digest.keys())

    def _extract_changed(self, new_digest):
        """Return sorted descriptions of files whose digest value changed."""
        # f-string formatting (instead of `+`) keeps this working for custom
        # diff functions that return non-string values such as ints.
        return [
            f"{name} from: {self._base_digest[name]} to: {new_digest[name]}"
            for name in sorted(self._base_digest.keys() & new_digest.keys())
            if self._base_digest[name] != new_digest[name]
        ]

    def _archive_current_files(self):
        """Hand every observed file to the archiving function.

        The archive directory is created if it does not exist yet. Each file
        is passed as ``archiving_fn(source_path, archive_destination_path)``.
        """
        os.makedirs(self._archive_path, exist_ok=True)
        for entry in self._list_files(self._target, self._suffix_criterion):
            self._archiving_fn(
                entry.path,
                os.path.join(self._archive_path, entry.name)
            )

    @staticmethod
    def _empty_else_emit(diff, message):
        """Print `message` followed by one entry of `diff` per line; no-op if empty."""
        if not diff:
            return
        print("> " + message + ":" + "".join("\n " + entry for entry in diff))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment