Last active
April 13, 2019 15:26
-
-
Save Mazyod/b1565d3a47b32d5f99931ddacd9dbf64 to your computer and use it in GitHub Desktop.
Directory/files watchdog in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import logging
from queue import Queue, Empty as EmptyQueue
from time import sleep
from threading import Thread
from typing import List

# Module-level logger; the worker thread reports detected changes through it.
logger = logging.getLogger("watchdog")


class WatchdogEvent:
    """A single file change to be dispatched to the listener.

    Attributes are plain values: ``kind`` is one of the class constants
    below and ``filepath`` is the path of the affected file.
    """

    # The three kinds of change an event can describe.
    ADD = "ADD"
    REM = "REM"
    MOD = "MOD"

    def __init__(self, kind: str, filepath: str):
        """Store the change ``kind`` and the ``filepath`` it applies to."""
        self.kind = kind
        self.filepath = filepath

    def __str__(self):
        """Return the event as ``"[KIND] filepath"``."""
        return f"[{self.kind}] {self.filepath}"

    def __repr__(self):
        """Return the same text as ``str(self)``."""
        return str(self)


class Watchdog:
    """Monitor a directory tree for changes by polling file mtimes.

    The constructor starts a daemon thread that rescans ``path`` whenever a
    value arrives on the internal queue or, failing that, once every
    ``POLL_INTERVAL`` seconds. Each detected change is handed to
    ``on_change`` as a ``WatchdogEvent``; the callback runs on that worker
    thread.
    """

    # Seconds the worker waits for a trigger before rescanning on its own.
    POLL_INTERVAL = 0.5

    def __init__(self, path, matcher, on_change):
        """Start watching ``path``.

        Args:
            path: directory to watch; ``~`` is expanded and the result is
                made absolute.
            matcher: callable receiving an absolute file path and returning
                a truthy value for files that should be tracked.
            on_change: callable receiving one ``WatchdogEvent`` per change.

        Raises:
            NotADirectoryError: if ``path`` is not an existing directory.
        """
        path = os.path.abspath(os.path.expanduser(path))
        # An explicit raise (rather than ``assert``) keeps the check active
        # when Python runs with ``-O``.
        if not os.path.isdir(path):
            raise NotADirectoryError(path)

        self.path = path
        self.matcher = matcher
        self.on_change = on_change
        # Maps tracked file path -> last seen mtime. Created here so the
        # attribute exists before the worker thread first touches it.
        self.checksums = {}
        self.queue = Queue()
        # Seed the queue so the first scan runs immediately as a reset scan.
        self.queue.put_nowait(True)

        self.thread = Thread(target=self._work)
        self.thread.daemon = True
        self.thread.start()

    def trigger(self, reset: bool):
        """Wake the worker for an immediate rescan.

        Args:
            reset: when true, the worker discards its recorded mtimes before
                scanning, so every matching file is seen as newly added.
        """
        self.queue.put_nowait(reset)

    def _calculate_checksums(self) -> "List[WatchdogEvent]":
        """Rescan the tree, update ``self.checksums`` and return the changes.

        Returns:
            ``REM`` events for tracked files that were not seen in this scan,
            followed by ``ADD``/``MOD`` events in walk order.
        """
        events = []
        # Tracked paths not yet encountered in this scan; whatever is left
        # at the end has been removed (or no longer matches).
        unseen = dict.fromkeys(self.checksums)

        for dirname, dirs, files in os.walk(self.path):
            # Prune hidden directories in place so os.walk never descends
            # into them. Testing the directory *name* (not the absolute
            # path) keeps this working when the watched root itself lives
            # under a hidden directory, and on any path separator.
            dirs[:] = [d for d in dirs if not d.startswith(".")]

            for name in files:
                filepath = os.path.join(dirname, name)
                if not self.matcher(filepath):
                    continue
                try:
                    mtime = os.stat(filepath).st_mtime
                except OSError:
                    # Vanished between listing and stat, or a dangling
                    # symlink. Skip it; if it was tracked it stays in
                    # ``unseen`` and is reported as removed below.
                    continue

                unseen.pop(filepath, None)
                if filepath not in self.checksums:
                    self.checksums[filepath] = mtime
                    events.append(WatchdogEvent(WatchdogEvent.ADD, filepath))
                elif self.checksums[filepath] != mtime:
                    self.checksums[filepath] = mtime
                    events.append(WatchdogEvent(WatchdogEvent.MOD, filepath))

        for filepath in unseen:
            del self.checksums[filepath]
        # Removals go first so a listener never sees the same path added
        # before its stale entry has been removed.
        removed = [WatchdogEvent(WatchdogEvent.REM, fp) for fp in unseen]
        return removed + events

    def _work(self):
        """Worker loop: wait for a trigger or timeout, scan, dispatch."""
        while True:
            try:
                reset = self.queue.get(timeout=self.POLL_INTERVAL)
            except EmptyQueue:
                # Nothing was triggered; do a regular incremental scan.
                reset = False

            if reset:
                self.checksums = {}

            for event in self._calculate_checksums():
                # don't spam us with initial loads
                # NOTE(review): the text this was restored from had lost its
                # indentation; it is assumed only the log line is skipped on
                # reset scans and on_change still receives every event.
                # Confirm against the intended behaviour.
                if not reset:
                    logger.debug("Change detected: %s", event)
                try:
                    self.on_change(event)
                except Exception:
                    # A failing listener must not kill the watcher thread.
                    logger.exception("on_change failed for %s", event)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment