Last active
April 2, 2024 14:39
-
-
Save luis261/48111222a4341e8f981669c2a20f7ce0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import os | |
import shutil | |
import warnings | |
# some useful defaults | |
diff_hash = lambda bytez : hashlib.md5(bytez, usedforsecurity=False).hexdigest() | |
diff_size = lambda bytez : str(len(bytez)) | |
class DirObserver:
    """
    Context-manageable class for keeping tabs on a directory.

    NOT intended for security purposes, only for simple validation!

    On attach, caches a digest of the current state of the files directly
    inside the target directory (subdirectories are skipped). On detach /
    context exit, re-digests the directory and emits information about any
    added, modified or missing files.

    Parameters
    ----------
    target_path : directory to watch.
    suffix_criterion : optional suffix (or tuple of suffixes) restricting
        observation to file names that end with it.
    diff_criterion : callable mapping a file's raw bytes to a comparable
        digest string. `None` (the default) uses a non-security MD5 hex
        digest; a fuzzier option is comparing by file size, and any custom
        function works too.
    managed_archive : if truthy, additionally persist a full snapshot of
        the initial file state to a managed "dirobsrv_auto_archive"
        subdirectory of `target_path`.
    archiving_fn : callable(src, dst) used to populate the archive. It must
        work whether or not the destination already exists (i.e. perform
        overrides if needed) — such scenarios are especially flimsy on
        Windows. `shutil.copy` is a safe base setup; `os.replace` moves the
        files into the archive instead of copying (avoid `shutil.move` for
        that).
    """

    def __init__(
        self, target_path,
        suffix_criterion=None,
        diff_criterion=None,
        managed_archive=False,
        archiving_fn=shutil.copy
    ):
        self._suffix_criterion = suffix_criterion
        self._target = target_path
        # FIX: the callables used to be wrapped in `staticmethod(...)`.
        # A staticmethod object stored on an *instance* attribute bypasses
        # the descriptor protocol and is not itself callable before
        # Python 3.10, so `self._diff_fn(...)` raised TypeError there.
        # Plain function references behave correctly on every version.
        self._diff_fn = self._default_diff if diff_criterion is None else diff_criterion
        self._archiving_fn = archiving_fn
        self._archive_path = os.path.join(target_path, "dirobsrv_auto_archive") if managed_archive else None
        # Sticky flag: once any evaluation detects a change it stays True.
        self.changes_detected = False

    @staticmethod
    def _default_diff(bytez):
        # MD5 is acceptable here: this is change *detection*, not security
        # (see class docstring); `usedforsecurity` requires Python >= 3.9.
        return hashlib.md5(bytez, usedforsecurity=False).hexdigest()

    def __enter__(self):
        return self.attach()

    def attach(self):
        """Capture the baseline digest (and archive the files if configured)."""
        print("Starting up dir observation for " + repr(self._target) + ".")
        self._base_digest = self.__class__.digest_dir(self._target, self._diff_fn, self._suffix_criterion)
        if self._archive_path:
            self._archive_current_files()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.detach()

    def detach(self):
        """Run a final evaluation and drop the cached baseline."""
        print("Shutting down dir observation for " + repr(self._target) + " ...")
        self.evaluate(purge=True)

    def evaluate(self, purge=False):
        """
        Compare the directory's current state against the cached baseline.

        Prints a report of added/modified/missing files (or a no-change
        notice). Unless `purge` is set, the baseline is refreshed to the
        current state; with `purge` it is discarded. If called before
        `attach`, warns and attaches implicitly. Returns the sticky
        `changes_detected` flag.
        """
        try:
            self._base_digest
        except AttributeError:
            warnings.warn("Detected out of place evaluation call, proceeding with adhoc, implicit attachment", RuntimeWarning)
            self.attach()
        new_digest = self.__class__.digest_dir(self._target, self._diff_fn, self._suffix_criterion)
        if self._base_digest != new_digest:
            self.changes_detected = True
            print("Detected changes compared to baseline:")
            self.__class__._empty_else_emit(
                self._extract_added(new_digest),
                "these files were added"
            )
            self.__class__._empty_else_emit(
                self._extract_changed(new_digest),
                "these files were modified"
            )
            self.__class__._empty_else_emit(
                self._extract_missing(new_digest),
                "these files went missing"
            )
        else:
            print("No changes detected.")
        if purge:
            del self._base_digest
        else:
            self._base_digest = new_digest
        return self.changes_detected

    @staticmethod
    def digest_dir(target, diff_fn, suffix_criterion):
        """Map each matching file name in `target` to `diff_fn(file bytes)`."""
        digest = {}
        for entry in os.scandir(target):
            if entry.is_dir() or (suffix_criterion and not entry.name.endswith(suffix_criterion)):
                continue
            with open(entry.path, "rb") as f:
                # Reading whole files might exhaust memory for huge files.
                # `hashlib.file_digest` (3.11+) would chunk, but it could not
                # serve arbitrary `diff_fn` callables that expect raw bytes.
                digest[entry.name] = diff_fn(f.read())
        return digest

    def _extract_added(self, new_digest):
        # Files present now but absent from the baseline.
        return list(set(new_digest.keys()) - set(self._base_digest.keys()))

    def _extract_missing(self, new_digest):
        # Files present in the baseline but gone now.
        return list(set(self._base_digest.keys()) - set(new_digest.keys()))

    def _extract_changed(self, new_digest):
        # Files present in both snapshots whose digests differ.
        res = []
        for f_name in set(self._base_digest.keys()) & set(new_digest.keys()):
            if self._base_digest[f_name] != new_digest[f_name]:
                res.append(f_name + " from: " + self._base_digest[f_name]
                    + " to: " + new_digest[f_name])
        return res

    def _archive_current_files(self):
        # `exist_ok=True` already tolerates a pre-existing directory, so the
        # former `os.path.exists` pre-check was redundant (and race-prone).
        os.makedirs(self._archive_path, exist_ok=True)
        for entry in os.scandir(self._target):
            if entry.is_dir() or (self._suffix_criterion and not entry.name.endswith(self._suffix_criterion)):
                continue
            self._archiving_fn(
                entry.path,
                os.path.join(self._archive_path, entry.name)
            )

    @staticmethod
    def _empty_else_emit(diff, message):
        # Print nothing for an empty diff; otherwise one indented line each.
        if len(diff) == 0:
            return
        print("> " + message + ":" + "\n    ".join([""] + diff))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment