Skip to content

Instantly share code, notes, and snippets.

@mivade
Created October 26, 2018 17:00
Show Gist options
  • Star 21 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save mivade/f4cb26c282d421a62e8b9a341c7c65f6 to your computer and use it in GitHub Desktop.
Save mivade/f4cb26c282d421a62e8b9a341c7c65f6 to your computer and use it in GitHub Desktop.
Using watchdog with asyncio
import asyncio
from pathlib import Path
from typing import Optional
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
class _EventHandler(FileSystemEventHandler):
    """Forward watchdog "created" events into an asyncio queue.

    watchdog invokes handler callbacks from its own observer thread, so
    events are handed to the event loop with ``call_soon_threadsafe``
    instead of touching the queue directly.
    """

    def __init__(self, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
                 *args, **kwargs):
        """Store the target queue and the loop that owns it.

        Any extra positional/keyword arguments are passed through to the
        base handler's initializer.
        """
        # The original ``super(*args, **kwargs)`` merely constructed (and
        # discarded) a ``super`` object; the base initializer never ran.
        super().__init__(*args, **kwargs)
        self._loop = loop
        self._queue = queue

    def on_created(self, event: FileSystemEvent) -> None:
        """Schedule ``event`` to be put on the queue from the loop thread."""
        self._loop.call_soon_threadsafe(self._queue.put_nowait, event)
class EventIterator:
    """Asynchronous iterator over the items of an ``asyncio.Queue``.

    Iteration ends when ``None`` is taken from the queue; ``None`` acts as
    the end-of-stream sentinel and is never yielded.
    """

    def __init__(self, queue: asyncio.Queue,
                 loop: Optional[asyncio.BaseEventLoop] = None):
        """Wrap ``queue``.

        ``loop`` is accepted for signature compatibility but is not used.
        """
        self.queue = queue

    def __aiter__(self):
        """Return the iterator itself, as the async-iterator protocol requires."""
        return self

    async def __anext__(self):
        """Wait for the next queue item; stop iteration on the ``None`` sentinel."""
        event = await self.queue.get()
        if event is not None:
            return event
        raise StopAsyncIteration
def watch(path: Path, queue: asyncio.Queue, loop: asyncio.BaseEventLoop,
          recursive: bool = False, timeout: float = 10) -> None:
    """Watch a directory for changes.

    This function blocks, so it is meant to run in a worker thread (for
    example via ``loop.run_in_executor``). Events produced by the handler
    are pushed onto ``queue`` through ``loop``; when watching ends, a
    ``None`` sentinel is pushed so that consumers can stop.

    Args:
        path: Directory to watch.
        queue: Queue that receives the events, followed by a final ``None``.
        loop: Event loop that owns ``queue``.
        recursive: Whether to watch subdirectories as well.
        timeout: Seconds to keep watching before shutting down.
    """
    handler = _EventHandler(queue, loop)
    observer = Observer()
    try:
        observer.schedule(handler, str(path), recursive=recursive)
        observer.start()
        try:
            print("Observer started")
            # The observer thread runs until stopped, so this join simply
            # waits for the watch period to elapse.
            observer.join(timeout)
        finally:
            # Shut the observer thread down instead of leaving it running.
            observer.stop()
            observer.join()
    finally:
        # Always deliver the sentinel, even on failure; otherwise a consumer
        # awaiting the queue would block forever.
        loop.call_soon_threadsafe(queue.put_nowait, None)
async def consume(queue: asyncio.Queue) -> None:
    """Print each event taken from ``queue`` until the stream is exhausted."""
    stream = EventIterator(queue)
    async for fs_event in stream:
        print("Got an event!", fs_event)
if __name__ == "__main__":
    # Demo: watch the current directory from an executor thread while the
    # event loop consumes and prints the resulting events.
    #
    # Create and install the loop explicitly; calling get_event_loop() with
    # no running loop is deprecated on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Queue's ``loop`` argument was removed in Python 3.10; without it
        # the queue uses the current loop, which is the one installed above.
        queue = asyncio.Queue()
        futures = [
            loop.run_in_executor(None, watch, Path("."), queue, loop, False),
            consume(queue),
        ]
        loop.run_until_complete(asyncio.gather(*futures))
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment