Skip to content

Instantly share code, notes, and snippets.

@autrilla
Created March 26, 2015 16:44
Show Gist options
  • Save autrilla/bdcf4b32624404141fe5 to your computer and use it in GitHub Desktop.
Save autrilla/bdcf4b32624404141fe5 to your computer and use it in GitHub Desktop.
# SETTINGS
# Tuple of (local directory, S3 bucket URL) pairs. Each local directory is
# watched recursively, and the bucket URL replaces the local directory prefix
# in the sync messages sent to the queue. The two values are concatenated
# directly with relative paths, so both should end with "/".
watch_dirs = (
('/home/autrilla/Documents/Scripts/test/', 's3://tiendaganadera/'),
)
# URL of the AWS SQS queue used to exchange sync messages.
aws_sqs_url = 'https://sqs.eu-west-1.amazonaws.com/198042352723/FAVEGASync'
# Delay, in seconds, between two successive polls of the SQS queue.
sqs_update_interval = 1
# ----------------------------------------------------------------------------
# Standard-library modules are always available, so they stay outside the
# guard below.
import os
import re
import threading

# Only a genuinely missing third-party package should produce the install
# hint; a bare ``except`` would also swallow KeyboardInterrupt and unrelated
# errors raised while importing.
try:
    import boto3
    import pyinotify
except ImportError:
    print("FAVEGASync requires pyinotify and boto3. Install them with pip.")
    # ``exit`` is injected by the ``site`` module and may be absent; raising
    # SystemExit directly terminates with the same status code.
    raise SystemExit(1)
# Ids of SQS messages that this process either sent itself or has already
# applied locally; the polling loop skips any message whose id is in here.
handled_messages = []
class EventHandler(pyinotify.ProcessEvent):
    """Publish an SQS message for each deletion or completed write that
    happens under one of the directories listed in ``watch_dirs``."""

    def _publish_change(self, action, pathname):
        """Announce a change to ``pathname`` on the queue.

        For every ``watch_dirs`` entry whose local directory is a prefix of
        ``pathname``, sends ``"<action> <bucket url><relative path>"`` and
        records the returned message id in ``handled_messages`` so that this
        process does not act on its own message when polling.
        """
        for directory in watch_dirs:
            local_root, bucket_url = directory[0], directory[1]
            # Match on the prefix only: a substring test could select the
            # wrong watch directory, and replacing every occurrence would
            # corrupt paths in which the directory string appears again.
            if not pathname.startswith(local_root):
                continue
            relative_path = pathname[len(local_root):]
            response = queue.send_message(
                MessageBody=action + " " + bucket_url + relative_path)
            handled_messages.append(response['MessageId'])

    def process_IN_DELETE(self, event):
        """Handle a deletion event by sending a ``DELETE`` message."""
        self._publish_change("DELETE", event.pathname)

    def process_IN_CLOSE_WRITE(self, event):
        """Handle a file closed after writing by sending a ``WRITE`` message."""
        self._publish_change("WRITE", event.pathname)
# boto3 resource handles shared by the event handler and the polling loop.
sqs = boto3.resource('sqs')
s3 = boto3.resource('s3')
# Reuse the SQS resource created above rather than building a second one.
queue = sqs.Queue(aws_sqs_url)
def collect_messages():
    """Poll the SQS queue once and schedule the next poll.

    The next run is armed before any message is processed, so polling
    continues even if this run fails. Every received message whose id is not
    yet in ``handled_messages`` is passed to ``handle_message``; its id is
    recorded when that call returns a truthy value.
    """
    import traceback

    timer = threading.Timer(sqs_update_interval, collect_messages)
    # Daemon timers do not keep the interpreter alive once the main thread
    # (the inotify loop) has finished, e.g. after Ctrl-C.
    timer.daemon = True
    timer.start()

    for message in queue.receive_messages():
        if message.message_id in handled_messages:
            continue
        try:
            handled = handle_message(message)
        except Exception:
            # Report the failure but keep going: one bad message must not
            # prevent the rest of the batch from being applied.
            traceback.print_exc()
            continue
        if handled:
            handled_messages.append(message.message_id)
def handle_message(message):
    """Apply one sync message from the queue to the local filesystem.

    The body is expected to have the form ``"<COMMAND> s3://<bucket>/<key>"``.
    The S3 URL is mapped to a local path by swapping the bucket prefix of the
    first matching ``watch_dirs`` entry for its local directory. ``DELETE``
    removes that local file (a file that is already gone is not an error);
    ``WRITE`` downloads the object from S3 and stores it at that path,
    creating parent directories as needed. Other commands are ignored.

    Args:
        message: object exposing the message text as ``message.body``.

    Returns:
        True once the message has been processed or ignored.

    Raises:
        AttributeError: if the body contains no ``s3://<bucket>/`` URL.
        StopIteration: if the URL is not under any configured bucket prefix.
    """
    print("Handling: " + str(message))
    # Split the command from the URL once instead of stripping the command
    # text with str.replace, which would also eat "DELETE "/"WRITE " inside
    # file names.
    command, _, remote = message.body.partition(" ")
    location = re.search(r"s3://(?P<bucket>.*?)/(?P<object>.*)", remote)
    bucket = location.group('bucket')
    _object = location.group('object')
    p = next(dirs for dirs in watch_dirs if remote.startswith(dirs[1]))
    # Swap only the leading bucket prefix for the local directory.
    # NOTE(review): the key is used as a relative path without validation; a
    # key containing ".." components would resolve outside the watched
    # directory.
    filename = p[0] + remote[len(p[1]):]
    if command == "DELETE":
        try:
            os.remove(filename)
        except FileNotFoundError:
            # Already absent locally: the desired end state is reached.
            pass
    elif command == "WRITE":
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        # Download before opening the target so that a failed download does
        # not leave a truncated file behind.
        data = s3.Bucket(bucket).Object(_object).get()['Body'].read()
        with open(filename, 'wb') as f:
            f.write(data)
    return True
# Start polling the SQS queue; collect_messages re-arms itself with a timer.
collect_messages()

wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm, EventHandler())
# rec=True only covers directories that exist at start-up. auto_add extends
# the watch to directories created later, and relies on IN_CREATE being part
# of the mask. EventHandler defines no handler for IN_CREATE.
watch_mask = (pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE |
              pyinotify.IN_CREATE)
for directory in watch_dirs:
    wm.add_watch(directory[0], watch_mask, rec=True, auto_add=True)
# Blocks here, dispatching filesystem events to EventHandler.
notifier.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment