import time import subprocess import threading from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler valid_events = ['created', 'modified', 'deleted', 'moved'] DEPLOY_AFTER_CHANGE_THRESHOLD = 300 STAGE_NAME = "" CODE_PATH = "<codebase path>" def deploy_env(): process = subprocess.Popen(['sls', 'deploy', '--stage', STAGE_NAME, '-v'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() print(stdout, stderr) def deploy_service_on_change(): while True: if EventHandler.last_update_time and (int(time.time() - EventHandler.last_update_time) > DEPLOY_AFTER_CHANGE_THRESHOLD): EventHandler.last_update_time = None deploy_env() time.sleep(5) def start_interval_watcher_thread(): interval_watcher_thread = threading.Thread(target=deploy_service_on_change) interval_watcher_thread.start() class WatchMyCodebase: # Set the directory on watch def __init__(self): self.observer = Observer() def run(self): event_handler = EventHandler() self.observer.schedule(event_handler, CODE_PATH, recursive=True) self.observer.start() self.observer.join() class EventHandler(FileSystemEventHandler): """Handle events generated by Watchdog Observer""" last_update_time = None @classmethod def on_any_event(cls, event): if event.is_directory: """Ignore directory level events, like creating new empty directory etc..""" return None elif event.event_type in valid_events and '.serverless' not in event.src_path: # Ignore events related to changes in .serverless directory, serverless creates few temp file while deploy cls.last_update_time = time.time() if __name__ == '__main__': start_interval_watcher_thread() watch = WatchMyCodebase() watch.run()