Skip to content

Instantly share code, notes, and snippets.

@tomschr
Created March 20, 2024 07:42
Show Gist options
  • Save tomschr/9ffe6d0b19220618013500d523042657 to your computer and use it in GitHub Desktop.
Save tomschr/9ffe6d0b19220618013500d523042657 to your computer and use it in GitHub Desktop.
Python Watchdog: trigger a function only when there are no more changes for a certain period
#!/usr/bin/python3
#
# Purpose:
# Use watchdog to monitor a directory. After a certain amount of time without any change
# the processing is triggered.
#
import os.path
import time
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
DIR=os.path.dirname(__file__)
WATCHED_DIR=os.path.join(DIR, 'my-dir')
# These are seconds
WAIT_TIME=5
class DebounceHandler(FileSystemEventHandler):
def __init__(self, func, wait_time):
self.func = func
self.wait_time = wait_time
self.timer = None
def on_any_event(self, event):
if self.timer:
self.timer.cancel()
self.timer = threading.Timer(self.wait_time, self.func)
self.timer.start()
def process_changes():
print("No more changes detected. Processing changes now.")
if __name__ == "__main__":
event_handler = DebounceHandler(process_changes, WAIT_TIME)
observer = Observer()
observer.schedule(event_handler, path=WATCHED_DIR, recursive=True)
observer.start()
print(f"Starting observer for {WATCHED_DIR!r}...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
print("Done.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment