Skip to content

Instantly share code, notes, and snippets.

@jsjolund
Forked from mivade/aiowatch.py
Last active November 3, 2022 11:29
Show Gist options
  • Save jsjolund/3aebad5fad6ffdcfc8d85bbcecc88bf3 to your computer and use it in GitHub Desktop.
Save jsjolund/3aebad5fad6ffdcfc8d85bbcecc88bf3 to your computer and use it in GitHub Desktop.
Using watchdog with asyncio
import asyncio
from pathlib import Path
from typing import Optional
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
class _EventHandler(FileSystemEventHandler):
    """Forward watchdog "created" events into an asyncio queue.

    The handler callback is expected to run outside the event loop's thread
    (watchdog dispatches events from its observer thread), so the event is
    handed over with ``loop.call_soon_threadsafe`` rather than by touching
    the queue directly.

    Args:
        queue: Queue that receives each ``FileSystemEvent``.
        loop: Event loop that owns ``queue``.
        *args: Extra positional arguments for the parent initializer.
        **kwargs: Extra keyword arguments for the parent initializer.
    """

    def __init__(self, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
                 *args, **kwargs):
        self._loop = loop
        self._queue = queue
        # Explicitly run the parent initializer; calling ``super(...)`` on its
        # own merely builds a proxy object and initializes nothing.
        super().__init__(*args, **kwargs)

    def on_created(self, event: FileSystemEvent) -> None:
        """Schedule ``event`` to be put on the queue from the loop's thread."""
        self._loop.call_soon_threadsafe(self._queue.put_nowait, event)
class EventIterator:
    """Asynchronous iterator over the items of an ``asyncio.Queue``.

    Iteration yields each item taken from the queue and ends as soon as a
    ``None`` item is received.

    Args:
        queue: Queue to read items from.
        loop: Accepted for interface compatibility; not used.
    """

    def __init__(self, queue: asyncio.Queue,
                 loop: Optional[asyncio.BaseEventLoop] = None):
        self.queue = queue

    def __aiter__(self):
        """Return the iterator itself, as the async-iterator protocol requires."""
        return self

    async def __anext__(self):
        """Wait for the next queued item; a ``None`` item ends the iteration."""
        received = await self.queue.get()
        if received is not None:
            return received
        raise StopAsyncIteration
def watch(path: Path, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
          recursive: bool = False) -> None:
    """Watch a directory for changes.

    This is a blocking function intended to run in a worker thread (for
    example through ``loop.run_in_executor``). It therefore waits with
    ``time.sleep`` instead of awaiting, and talks to the event loop only via
    ``call_soon_threadsafe``.

    Args:
        path: Directory to watch.
        queue: Queue that receives the file-system events and, once watching
            ends, a final ``None`` sentinel.
        loop: Event loop that owns ``queue``.
        recursive: Whether sub-directories are watched as well.
    """
    import time  # local import: only needed for the blocking poll below

    handler = _EventHandler(queue, loop)
    observer = Observer()
    observer.schedule(handler, str(path), recursive=recursive)
    observer.start()
    print("Observer started")
    try:
        # NOTE(review): the counter is presumably 0 until the first event is
        # queued, so this loop may finish right away -- confirm that this is
        # the intended stop condition.
        while observer.event_queue.unfinished_tasks != 0:
            time.sleep(1)
    finally:
        # Stop before joining: joining a still-running observer never returns.
        observer.stop()
        observer.join()
        # Tell the consumer that no further events will arrive.
        loop.call_soon_threadsafe(queue.put_nowait, None)
async def consume(queue: asyncio.Queue) -> None:
    """Print every event read from ``queue`` until the event stream ends."""
    events = EventIterator(queue)
    async for event in events:
        print("Got an event!", event)
if __name__ == "__main__":
    # Create the loop explicitly: relying on ``asyncio.get_event_loop()`` to
    # create one implicitly is deprecated. It is installed as the current loop
    # before the queue is built so older Pythons bind the queue to it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    queue = asyncio.Queue()
    futures = [
        # The watcher blocks, so it runs in the default thread-pool executor.
        loop.run_in_executor(None, watch, Path("."), queue, loop, False),
        loop.create_task(consume(queue)),
    ]
    pending = asyncio.gather(*futures)
    try:
        loop.run_until_complete(pending)
    except KeyboardInterrupt:
        print("Caught keyboard interrupt. Canceling tasks...")
        # Actually cancel the outstanding work, then let the loop run once
        # more so the cancellation is delivered to the tasks.
        if pending.cancel():
            try:
                loop.run_until_complete(pending)
            except asyncio.CancelledError:
                pass
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment