Skip to content

Instantly share code, notes, and snippets.

@zzstoatzz
Last active April 14, 2024 16:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zzstoatzz/ff2bf4eaec4fc19d33000b24b620176d to your computer and use it in GitHub Desktop.
Save zzstoatzz/ff2bf4eaec4fc19d33000b24b620176d to your computer and use it in GitHub Desktop.
import functools
import os
import shutil
import threading
import time
import uuid

from prefect import flow
from prefect.events import DeploymentEventTrigger, emit_event
from prefect.logging import get_logger
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Module-level logger obtained from Prefect's logging helper under the name "watchdog".
logger = get_logger("watchdog")
# Folder that is watched for newly created files (relative path, as written).
DROP_FOLDER = "flows/proofs/watchdog/drop"
# Default destination folder that process_new_file moves handled files into.
WORKING_FOLDER = "flows/proofs/watchdog/working"
class DesignFileHandler(FileSystemEventHandler):
    """Watchdog handler that emits a Prefect ``file.created`` event per new file.

    Args:
        drop_folder: The folder being watched. It is attached to every emitted
            event as the ``directory.name`` resource label.
    """

    def __init__(self, drop_folder: str):
        super().__init__()
        self.drop_folder = drop_folder

    def on_created(self, event):
        """Emit a ``file.created`` event for a newly created file.

        Directory-creation events are ignored.

        Args:
            event: The watchdog filesystem event; only ``is_directory`` and
                ``src_path`` are read.
        """
        if event.is_directory:
            return

        # The payload key promises an absolute path, but the watch may have
        # been scheduled with a relative folder, in which case the reported
        # path can be relative too. Normalise it here. ``os.fsdecode`` leaves
        # ``str`` untouched and decodes ``bytes`` paths.
        file_path = os.path.abspath(os.fsdecode(event.src_path))
        logger.info(f"New file detected: {file_path}")
        emit_event(
            event="file.created",
            resource={
                # A fresh random id is generated for every emitted event.
                "prefect.resource.id": str(uuid.uuid4()),
                "directory.name": self.drop_folder,
                "file.absolute_path": file_path,
            },
        )
def watch_drop_folder(drop_folder: str):
    """Watch ``drop_folder`` for new files until interrupted.

    Schedules a ``DesignFileHandler`` on a watchdog ``Observer``
    (non-recursive), starts it, and blocks the calling thread until the
    observer stops running or a ``KeyboardInterrupt`` is raised in this
    thread. The observer is always stopped and joined before returning.

    Args:
        drop_folder: Path of the folder to watch.
    """
    event_handler = DesignFileHandler(drop_folder)
    observer = Observer()
    observer.schedule(event_handler, drop_folder, recursive=False)
    observer.start()
    logger.info(f"Watching drop folder: {drop_folder}")
    try:
        # Poll the observer thread rather than sleeping unconditionally, so
        # this function returns if the observer dies instead of spinning
        # forever on a dead watcher.
        while observer.is_alive():
            observer.join(timeout=1)
    except KeyboardInterrupt:
        # Swallowed on purpose: an interrupt is the normal way to end the
        # watch. Cleanup happens once, in ``finally``.
        logger.info("Stopping drop folder watcher")
    finally:
        observer.stop()
        observer.join()
@flow
def process_new_file(new_file_abs_path: str, working_folder: str = WORKING_FOLDER):
    """Process a newly dropped file and move it into the working folder.

    Args:
        new_file_abs_path: Path of the file to process.
        working_folder: Destination directory; created if it does not exist.

    Raises:
        OSError: If the file cannot be moved (for example, the source no
            longer exists).
    """
    logger.info(f"Processing new file: {new_file_abs_path}")
    time.sleep(1)  # Simulate processing time
    logger.info("File processed")

    # Make sure the destination exists; a bare rename fails when the working
    # folder has not been created yet.
    os.makedirs(working_folder, exist_ok=True)
    destination_path = os.path.join(working_folder, os.path.basename(new_file_abs_path))
    # shutil.move falls back to copy-and-delete when the source and the
    # destination live on different filesystems, where os.rename would raise.
    shutil.move(new_file_abs_path, destination_path)
    logger.info(f"File moved to working folder: {working_folder}")
if __name__ == "__main__":
    # Start the file watcher in the background. It is a daemon thread because
    # KeyboardInterrupt is only raised in the main thread: the watcher's loop
    # would never see Ctrl-C, and a non-daemon thread would keep the process
    # alive after the main thread finished.
    watcher_thread = threading.Thread(
        target=watch_drop_folder,
        args=(DROP_FOLDER,),
        name="drop-folder-watcher",
        daemon=True,
    )
    watcher_thread.start()

    # Serve the flow from the main thread so an interrupt reaches it
    # directly. The trigger runs the deployment for every "file.created"
    # event whose directory label matches the watched drop folder, and passes
    # the file path from the event resource as the flow parameter.
    process_new_file.serve(
        name="process_new_file",
        triggers=[
            DeploymentEventTrigger(
                expect={"file.created"},
                match={"directory.name": DROP_FOLDER},
                parameters={
                    "new_file_abs_path": "{{ event.resource['file.absolute_path'] }}"
                },
            )
        ],
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment