Skip to content

Instantly share code, notes, and snippets.

@Mazyod

Mazyod/watchdog.py

Last active Apr 13, 2019
Embed
What would you like to do?
Directory/files watchdog in Python
import os
import logging
from queue import Queue, Empty as EmptyQueue
from time import sleep
from threading import Thread
from typing import List
# Module-level logger named "watchdog"; detected changes are reported on it at DEBUG level.
logger = logging.getLogger("watchdog")
class WatchdogEvent:
    """A single file change that is dispatched to the watchdog listener.

    Attributes:
        kind: One of the class constants ``ADD``, ``REM`` or ``MOD``.
        filepath: Path of the file the change applies to.
    """

    ADD = "ADD"  # file was not present in the previous scan
    REM = "REM"  # file tracked by the previous scan was not found again
    MOD = "MOD"  # file is still present but its recorded checksum changed

    def __init__(self, kind: str, filepath: str):
        self.kind = kind
        self.filepath = filepath

    def __str__(self):
        return f"[{self.kind}] {self.filepath}"

    def __repr__(self):
        # Deliberately the same as str() so events read identically when
        # printed on their own and when shown inside a list.
        return str(self)
class Watchdog:
    """Poll a directory tree and report file changes to a callback.

    A daemon thread rescans ``path`` whenever ``trigger`` is called, or after
    ``POLL_INTERVAL`` seconds without a trigger. Each scan compares the
    modification time of every matching file with the value recorded by the
    previous scan and produces a ``WatchdogEvent`` per added, modified or
    removed file, which is passed to ``on_change``.
    """

    # Seconds the worker waits for a ``trigger`` before rescanning anyway.
    POLL_INTERVAL = 0.5

    def __init__(self, path, matcher, on_change):
        """Validate ``path`` and start the background worker.

        Args:
            path: Directory to watch. ``~`` is expanded and the path is made
                absolute.
            matcher: Callable taking a file path; only files for which it
                returns a truthy value are tracked.
            on_change: Callable invoked from the worker thread with each
                ``WatchdogEvent``.

        Raises:
            AssertionError: If ``path`` is not an existing directory.
        """
        path = os.path.abspath(os.path.expanduser(path))
        if not os.path.isdir(path):
            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped under ``python -O``; the exception type is kept
            # for callers that already catch AssertionError.
            raise AssertionError(f"not a directory: {path}")

        self.path = path
        self.matcher = matcher
        self.on_change = on_change
        # filepath -> st_mtime recorded by the most recent scan. Created here
        # so the attribute exists before the worker's first reset.
        self.checksums = {}
        self.queue = Queue()
        # The initial ``True`` makes the first scan a silent baseline.
        self.queue.put_nowait(True)

        self.thread = Thread(target=self._work)
        self.thread.daemon = True
        self.thread.start()

    def trigger(self, reset: bool):
        """Wake the worker for an immediate rescan.

        Args:
            reset: When true, the recorded state is discarded and the scan
                only rebuilds the baseline without dispatching events. When
                false, changes found by the scan are dispatched as usual.
        """
        self.queue.put_nowait(reset)

    def _calculate_checksums(self) -> "List[WatchdogEvent]":
        """Rescan the tree, update ``self.checksums`` and return the changes.

        Returns:
            The events of this scan: all ``REM`` events first, followed by
            ``ADD``/``MOD`` events in the order the files were walked.
        """
        events = []
        # Paths known from the previous scan that have not been seen yet;
        # whatever is left after the walk is reported as removed.
        unseen = self.checksums.copy()

        for dirname, dirs, files in os.walk(self.path):
            # Hidden directories are ignored. Pruning ``dirs`` in place stops
            # os.walk from descending into them, and testing the directory
            # *name* (not the absolute path) keeps a watch root that itself
            # lives under a dot-directory working, on any path separator.
            dirs[:] = [d for d in dirs if not d.startswith(".")]

            for name in files:
                filepath = os.path.join(dirname, name)
                if not self.matcher(filepath):
                    continue
                try:
                    checksum = os.stat(filepath).st_mtime
                except OSError:
                    # Dangling symlink, or the file vanished between the walk
                    # and the stat: treat it as absent instead of letting the
                    # error kill the worker thread.
                    continue

                unseen.pop(filepath, None)
                if filepath not in self.checksums:
                    kind = WatchdogEvent.ADD
                elif self.checksums[filepath] != checksum:
                    kind = WatchdogEvent.MOD
                else:
                    continue
                self.checksums[filepath] = checksum
                events.append(WatchdogEvent(kind, filepath))

        removed = []
        for filepath in unseen:
            del self.checksums[filepath]
            removed.append(WatchdogEvent(WatchdogEvent.REM, filepath))
        # Reversed to keep the order produced by the former repeated
        # ``insert(0, ...)``; REM events go first to avoid possible duplicate
        # paths further down the list.
        removed.reverse()
        return removed + events

    def _work(self):
        """Worker loop: wait for a trigger or the poll timeout, scan, dispatch."""
        while True:
            try:
                reset = self.queue.get(timeout=self.POLL_INTERVAL)
            except EmptyQueue:
                reset = False

            if reset:
                self.checksums = {}

            events = self._calculate_checksums()
            if reset:
                # don't spam the listener with the initial load
                continue

            for event in events:
                logger.debug("Change detected: %s", event)
                try:
                    self.on_change(event)
                except Exception:
                    # Top-level boundary of the worker thread: a failing
                    # callback must not silently end all further watching.
                    logger.exception("on_change callback failed for %s", event)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.