Skip to content

Instantly share code, notes, and snippets.

@velotiotech
Created December 17, 2020 11:29
Show Gist options
  • Select an option

  • Save velotiotech/ba1ee538083d4eaf85c39d805fc56205 to your computer and use it in GitHub Desktop.

Select an option

Save velotiotech/ba1ee538083d4eaf85c39d805fc56205 to your computer and use it in GitHub Desktop.
Auto deploy serverless
import time
import subprocess
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
valid_events = ['created', 'modified', 'deleted', 'moved']
DEPLOY_AFTER_CHANGE_THRESHOLD = 300
STAGE_NAME = ""
CODE_PATH = "<codebase path>"
def deploy_env():
process = subprocess.Popen(['sls', 'deploy', '--stage', STAGE_NAME, '-v'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout, stderr)
def deploy_service_on_change():
while True:
if EventHandler.last_update_time and (int(time.time() - EventHandler.last_update_time) > DEPLOY_AFTER_CHANGE_THRESHOLD):
EventHandler.last_update_time = None
deploy_env()
time.sleep(5)
def start_interval_watcher_thread():
interval_watcher_thread = threading.Thread(target=deploy_service_on_change)
interval_watcher_thread.start()
class WatchMyCodebase:
# Set the directory on watch
def __init__(self):
self.observer = Observer()
def run(self):
event_handler = EventHandler()
self.observer.schedule(event_handler, CODE_PATH, recursive=True)
self.observer.start()
self.observer.join()
class EventHandler(FileSystemEventHandler):
"""Handle events generated by Watchdog Observer"""
last_update_time = None
@classmethod
def on_any_event(cls, event):
if event.is_directory:
"""Ignore directory level events, like creating new empty directory etc.."""
return None
elif event.event_type in valid_events and '.serverless' not in event.src_path:
# Ignore events related to changes in .serverless directory, serverless creates few temp file while deploy
cls.last_update_time = time.time()
if __name__ == '__main__':
start_interval_watcher_thread()
watch = WatchMyCodebase()
watch.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment