Skip to content

Instantly share code, notes, and snippets.

@autrilla
Created March 26, 2015 18:25
Show Gist options
  • Save autrilla/820d076512fccbdefe9f to your computer and use it in GitHub Desktop.
Save autrilla/820d076512fccbdefe9f to your computer and use it in GitHub Desktop.
# SETTINGS
# Each entry pairs a local directory to watch with the S3 URL it is mirrored
# to: (local_directory, s3_url).
watch_dirs = (
    (
        '/home/autrilla/Documents/Scripts/test/',
        's3://tiendaganadera/',
    ),
)
# URL of the SQS queue used to announce and receive file changes.
aws_sqs_url = 'https://sqs.eu-west-1.amazonaws.com/198042352723/FAVEGASync'
# Delay, in seconds, between two polls of the SQS queue.
sqs_update_interval = 5
# ----------------------------------------------------------------------------
# Standard-library modules are always available, so they are imported outside
# the guard below.
import logging
import os
import re
import threading
import traceback

# Only the third-party dependencies can legitimately be missing.  Catch
# ImportError specifically so unrelated failures (e.g. KeyboardInterrupt or a
# bug inside one of the packages) are not misreported as a missing install.
try:
    import boto3
    import pyinotify
except ImportError:
    print("FAVEGASync requires pyinotify and boto3. Install them with pip.")
    # SystemExit works even when the `site` module's exit() helper is absent.
    raise SystemExit(1)
# IDs of SQS messages this process has already sent or applied; checked when
# polling the queue so the same message is not handled twice.
handled_messages = []
class EventHandler(pyinotify.ProcessEvent):
    """Mirror local deletions and writes to S3 and announce them on SQS.

    For every entry of ``watch_dirs`` whose local directory is a prefix of
    the event path, the matching S3 object is deleted or uploaded and a
    ``"DELETE <s3 url><relative path>"`` / ``"WRITE <s3 url><relative path>"``
    message is sent to ``queue``.  The ID of each sent message is appended to
    ``handled_messages``.
    """

    @staticmethod
    def _bucket_name(s3_url):
        """Return the bucket name of an ``s3://bucket/...`` URL."""
        return re.search(r"s3://(?P<bucket>.*?)/", s3_url).group('bucket')

    def process_IN_DELETE(self, event):
        """Delete the S3 object for a removed local file and announce it.

        Args:
            event: pyinotify event; only ``event.pathname`` is read.
        """
        for local_root, s3_url in watch_dirs:
            # Prefix test (not substring) so a path is only attributed to a
            # watch directory it actually lives under.
            if not event.pathname.startswith(local_root):
                continue
            # Strip only the leading root; str.replace would also remove
            # later occurrences of the root inside the relative path.
            key = event.pathname[len(local_root):]
            s3.Bucket(self._bucket_name(s3_url)).Object(key).delete()
            message_id = queue.send_message(
                MessageBody="DELETE " + s3_url + key)['MessageId']
            handled_messages.append(message_id)
            # Log the local ID rather than handled_messages[-1], which
            # another thread may have appended to in the meantime.
            logger.info(message_id + ": Deleted file " + event.pathname)

    def process_IN_CLOSE_WRITE(self, event):
        """Upload a freshly written local file to S3 and announce it.

        A file that no longer exists when it is opened is skipped silently.

        Args:
            event: pyinotify event; only ``event.pathname`` is read.
        """
        for local_root, s3_url in watch_dirs:
            if not event.pathname.startswith(local_root):
                continue
            key = event.pathname[len(local_root):]
            try:
                # The context manager closes the file even if the upload
                # raises.
                with open(event.pathname, 'rb') as source:
                    s3.Bucket(self._bucket_name(s3_url)).put_object(
                        Key=key, Body=source)
            except FileNotFoundError:
                # The file vanished between the event and the open call.
                continue
            message_id = queue.send_message(
                MessageBody="WRITE " + s3_url + key)['MessageId']
            handled_messages.append(message_id)
            logger.info(message_id + ": Wrote file " + event.pathname)
# basicConfig's keyword is the lowercase `format`; an unknown keyword such as
# `FORMAT` makes it raise ValueError instead of applying the format string.
logging.basicConfig(format="%(asctime)-15s - %(message)s",
                    filename='/var/log/FAVEGASync/log', level=logging.INFO)
logger = logging.getLogger('FAVEGASync')
# AWS handles shared by the inotify event handler and the SQS poller.
sqs = boto3.resource('sqs')
s3 = boto3.resource('s3')
# Reuse the SQS resource created above instead of building a second one.
queue = sqs.Queue(aws_sqs_url)
def collect_messages():
    """Poll the SQS queue once and schedule the next poll.

    The follow-up poll is scheduled ``sqs_update_interval`` seconds ahead
    before any message is processed.  Messages whose ID is already in
    ``handled_messages`` are skipped; the others are passed to
    ``handle_message`` and recorded as handled when it returns a true value.
    """
    next_poll = threading.Timer(sqs_update_interval, collect_messages)
    next_poll.start()
    for message in queue.receive_messages(MaxNumberOfMessages=1):
        if message.message_id in handled_messages:
            continue
        if handle_message(message):
            handled_messages.append(message.message_id)
def handle_message(message):
    """Apply one sync message from the SQS queue to the local filesystem.

    The body is expected to be ``"DELETE <s3 url>"`` or ``"WRITE <s3 url>"``,
    where the URL starts with the S3 side of one of the ``watch_dirs``
    entries.  DELETE removes the corresponding local file; WRITE downloads
    the object from S3 into the corresponding local path, creating parent
    directories as needed.

    Args:
        message: SQS message object; ``message.body`` and
            ``message.message_id`` are read.

    Returns:
        True when the message was applied, or when it names an action other
        than DELETE/WRITE and was ignored.  False when the body contains no
        usable S3 URL or matches no entry of ``watch_dirs``, so the caller
        does not record it as handled.
    """
    logger.info("Handling message " + str(message.message_id))
    # Split off the action word once; the remainder is the S3 URL.  Using
    # str.replace here would also rewrite matching text inside file names.
    action, _, remote_path = message.body.partition(" ")
    location = re.search(r"s3://(?P<bucket>.*?)/(?P<object>.*)", remote_path)
    mapping = next(
        (dirs for dirs in watch_dirs if remote_path.startswith(dirs[1])),
        None)
    if location is None or mapping is None:
        logger.warning("Ignoring message " + str(message.message_id) +
                       " with unsupported body: " + message.body)
        return False
    local_root, remote_root = mapping
    # Swap only the leading remote root for the local root.
    filename = local_root + remote_path[len(remote_root):]
    if action == "DELETE":
        try:
            os.remove(filename)
        except FileNotFoundError:
            # Nothing to remove; the local state already matches.
            logger.info("File already absent: " + filename)
        else:
            logger.info("Deleted file: " + filename)
    elif action == "WRITE":
        # Download before opening the target so a failed download does not
        # leave a truncated local file behind.
        payload = s3.Bucket(location.group('bucket')).Object(
            location.group('object')).get()['Body'].read()
        parent = os.path.dirname(filename)
        if parent:
            # exist_ok avoids the check-then-create race.
            os.makedirs(parent, exist_ok=True)
        with open(filename, 'wb') as target:
            target.write(payload)
        logger.info("Downloaded file: " + filename)
    return True
# Start polling the SQS queue; collect_messages re-schedules itself.
collect_messages()

# Watch every configured local directory for finished writes and deletions.
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm, EventHandler())
for directory in watch_dirs:
    # rec=True asks pyinotify to add watches on subdirectories recursively.
    wm.add_watch(directory[0],
                 pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE, rec=True)
try:
    notifier.loop()
except FileNotFoundError:
    # print_exc() writes the traceback itself and returns None, so wrapping
    # it in print() only added a stray "None" line.
    traceback.print_exc()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment