Skip to content

Instantly share code, notes, and snippets.

@BraedonWooding
Last active March 19, 2020 13:09
Show Gist options
  • Save BraedonWooding/564f5a8dabf2e25815761e71395de879 to your computer and use it in GitHub Desktop.
Save BraedonWooding/564f5a8dabf2e25815761e71395de879 to your computer and use it in GitHub Desktop.
A directory watcher in Python, kept to a single module and written for Python 2 (so that anyone can use it regardless of their system).
#!/bin/python
# Fill these in
# Azure Storage connection string; handed to
# BlobServiceClient.from_connection_string when a file is uploaded.
CONNECTION_STRING = ""
# Name of the blob container the watched files are uploaded into.
CONTAINER_NAME = ""
# Install required modules
# yah this is super ugly but I don't want people to have to run multiple steps
# `os` has to be imported here: os.devnull is needed before the main import
# block further down is reached.
import os
import subprocess
import urllib2

# Sink for pip's stdout so the install stays quiet.
devnull = open(os.devnull, 'w')
try:
    # 'hush'
    subprocess.call(["pip", "install", "azure-storage-blob", "watchdog"], stdout=devnull)
except OSError:
    # OSError here means the `pip` executable itself could not be launched.
    answer = raw_input("Hey! Just letting you know that I couldn't find pip. Do you want me to install it? Y/N: ")
    if answer.lower() == "y":
        response = urllib2.urlopen('https://bootstrap.pypa.io/get-pip.py')
        pipProgram = response.read()
        # SECURITY(review): this executes code downloaded from the network
        # inside the current process. It is only reached after the user
        # explicitly agrees, but consider asking users to install pip
        # themselves instead.
        exec(pipProgram)
        # Retry the dependency install now that pip should be available.
        subprocess.call(["pip", "install", "azure-storage-blob", "watchdog"], stdout=devnull)
import sys
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler, FileSystemEventHandler
import Tkinter
import tkFileDialog
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import os
class Handler(FileSystemEventHandler):
    """Event handler that uploads changed ``template_*`` files to blob storage.

    Files are uploaded to the container named by ``CONTAINER_NAME`` using the
    account described by ``CONNECTION_STRING``.
    """

    @staticmethod
    def on_any_event(event):
        """Upload the file behind a 'created' or 'modified' event.

        Directory events and every other event type are ignored. A file whose
        base name does not start with ``template_`` is reported and skipped.
        Otherwise the file is uploaded under its base name, overwriting any
        existing blob with that name.

        Args:
            event: The file system event being handled; ``is_directory``,
                ``event_type`` and ``src_path`` are read from it.

        Returns:
            None in every case.
        """
        if event.is_directory:
            return None
        if event.event_type not in ('modified', 'created'):
            return None

        # Single-argument print(...) behaves the same as the print statement
        # on Python 2.
        print("I received modified event will push changes - " + str(event.src_path))
        file_name = os.path.basename(event.src_path)
        if not file_name.startswith("template_"):
            print("Your filename has to begin with 'template_' to be considered so the following file won't be copied " + file_name)
            # Actually skip the upload: previously the message was printed
            # but nothing stopped the file from being handled as usual.
            return None

        blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
        blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=file_name)
        # Binary mode so the file contents are uploaded unmodified.
        with open(event.src_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
        return None
def start():
    """Ask the user for a directory and watch it until interrupted.

    A directory chooser dialog is shown. If the user cancels it, the function
    returns without watching anything. Otherwise two handlers are scheduled
    on the chosen directory (non-recursively): a ``LoggingEventHandler`` that
    logs every event and a ``Handler`` that uploads matching files. The
    function then blocks until a ``KeyboardInterrupt`` (Ctrl+C) is received,
    at which point the observer is stopped and joined.
    """
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')

    # Hidden root window so that only the directory dialog is displayed.
    root = Tkinter.Tk()
    root.withdraw()
    file_path = tkFileDialog.askdirectory()
    # The root window is no longer needed once the dialog has returned.
    root.destroy()

    # A cancelled dialog yields an empty value; there is nothing to watch.
    if not file_path:
        print("No directory selected, nothing to watch.")
        return

    print("Watching changes in " + file_path)
    observer = Observer()
    # You could try to make this recursive but I wouldn't recommend it!
    observer.schedule(LoggingEventHandler(), file_path, recursive=False)
    observer.schedule(Handler(), file_path, recursive=False)
    observer.start()
    try:
        # Keep the main thread alive; the observer runs in its own thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    # Wait for the observer thread to finish before returning.
    observer.join()
# Script entry point: start the watcher only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment