# Source: GitHub Gist copernicus/11388921 (@copernicus),
# forked from zircote/ct2ls.py; created April 29, 2014 01:47.
#!/usr/bin/env python
import boto
from boto.sqs.message import RawMessage
import tempfile
import json
import logging
import argparse
import gzip
import redis
import os
import time
'''
Packages required: redis, boto
'''
# Logging setup: the module logger is created first, then a stream handler
# carrying a timestamp / thread-name / level prefix is attached to the root
# logger so records propagated from any logger are printed in that layout.
logger = logging.getLogger(__name__)
logFormatter = logging.Formatter(
    "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s"
)
stream = logging.StreamHandler()
stream.setFormatter(logFormatter)
logging.getLogger().addHandler(stream)
def sqs_fetch(queue):
    """
    Read one batch of messages from the named SQS queue.

    :param str queue: name of the SQS queue to read from
    :rtype list:
    :returns: the result of ``get_messages(10)`` on that queue, with
        ``RawMessage`` set as the queue's message class
    :raises LookupError: if no queue with that name is found
    """
    connection = boto.connect_sqs()
    sqs_queue = connection.get_queue(queue_name=queue)
    if sqs_queue is None:
        # get_queue() signals an unknown queue by returning None; fail with a
        # message that names the queue instead of an AttributeError on None.
        raise LookupError("SQS queue not found: %s" % queue)
    sqs_queue.set_message_class(RawMessage)
    return sqs_queue.get_messages(10)
class CloudTrailS3EventLoader(object):
    """Downloads the S3 objects referenced by SQS events into local temp files."""

    # Class-level default; __init__ opens an S3 connection for the instance
    # only when no connection is already visible through the class.
    connection = None

    def __init__(self):
        if self.connection is None:
            self.connection = boto.connect_s3()

    def fetch_object(self, sqs_event):
        """
        Download the object referenced by an SQS event to a temporary file.

        :param dict sqs_event: decoded SQS message whose ``Message`` entry is
            a JSON string holding ``s3Bucket`` and ``s3ObjectKey``; only the
            first element of ``s3ObjectKey`` is fetched
        :rtype tuple:
        :returns: ``(local_filename, 's3://<bucket>/<key>')``; removing
            ``local_filename`` afterwards is left to the caller
        :raises LookupError: if the bucket does not contain the key
        """
        sqs_event_body = json.loads(sqs_event['Message'])
        bucket_name = sqs_event_body['s3Bucket']
        bucket = self.connection.get_bucket(bucket_name)
        object_key = sqs_event_body['s3ObjectKey'][0]
        key = bucket.get_key(object_key)
        if key is None:
            # get_key() returns None for a missing object; report which one.
            raise LookupError('S3 object not found: s3://%s/%s' % (bucket_name, object_key))
        fd, local_path = tempfile.mkstemp(suffix='.gz')
        # mkstemp returns an already-open descriptor; the download below
        # writes by filename, so close it right away instead of leaking one
        # descriptor per processed message.
        os.close(fd)
        try:
            key.get_contents_to_filename(local_path)
        except Exception:
            # The caller never learns the path on failure, so clean up here.
            os.unlink(local_path)
            raise
        return local_path, 's3://%s/%s' % (bucket_name, object_key)
def camel_to_lower(camels):
    """
    Convert a CamelCase name into lower-case, underscore-separated form.

    For example ``CreateBucket`` becomes ``create_bucket``.

    :param str camels: the CamelCase text to convert
    :rtype str:
    """
    import re
    # Pass 1: put an underscore in front of every capitalised word (a capital
    # followed by lower-case letters) that has a character before it.
    capitalised_word = re.compile('(.)([A-Z][a-z]+)')
    # Pass 2: split any remaining lower-case/digit -> capital transition.
    case_change = re.compile('([a-z0-9])([A-Z])')
    separated = capitalised_word.sub(r'\1_\2', camels)
    separated = case_change.sub(r'\1_\2', separated)
    return separated.lower()
def write_data(filename, remote_path=None):
    """
    Push every record of a gzipped JSON log file onto the redis list.

    The file is read as gzip-compressed JSON. Each entry of its ``Records``
    list gets ``@timestamp`` (copied from ``eventTime``), ``type``
    (``opts.doctype_prefix``, followed by the snake_cased ``eventName`` when
    the record has one) and, if given, ``path``; the JSON-encoded record is
    then queued as an LPUSH onto ``opts.redis_list``. All queued pushes are
    sent together when the pipeline is executed.

    Uses the module-level ``redis_client``, ``opts`` and ``logger``.

    :param str filename: local path of the gzip-compressed JSON file
    :param str remote_path: optional origin of the file, stored on each
        record as ``path``
    """
    # Context manager so the gzip handle is closed even when parsing fails.
    with gzip.open(filename) as f:
        j = json.load(f)
    pipe = redis_client.pipeline()
    for record in j['Records']:
        try:
            record['@timestamp'] = record['eventTime']
            if 'eventName' in record:
                record['type'] = '%s.%s' % (opts.doctype_prefix, camel_to_lower(record['eventName']))
            else:
                record['type'] = opts.doctype_prefix
            if remote_path is not None:
                record['path'] = remote_path
            pipe.lpush(opts.redis_list, json.dumps(record))
        except Exception:
            # A single bad record is skipped so the rest of the file is still
            # delivered; log with traceback and the file it came from.
            logger.exception("skipping unprocessable record from %s", remote_path or filename)
    pipe.execute()
def main():
    """
    Run one polling cycle.

    Fetches a batch of messages from ``opts.sqs_queue``; for each message,
    downloads the referenced object through ``loader``, pushes its records
    with ``write_data``, removes the local temp file and deletes the message.
    A message is deleted only after its records were written. Failures are
    logged and never propagate to the caller.
    """
    try:
        messages = sqs_fetch(opts.sqs_queue)
    except Exception:
        logger.exception("unable to fetch messages from SQS queue %s", opts.sqs_queue)
        return
    logger.info("messages received [%s]" % len(messages))
    for message in messages:
        # Each message is handled on its own so that one failure does not
        # prevent the remaining messages of the batch from being processed.
        try:
            mesg = json.loads(message.get_body())
            local, remote = loader.fetch_object(mesg)
            try:
                write_data(local, remote)
            finally:
                # Remove the temp file whether or not the write succeeded.
                os.unlink(local)
            message.delete()
        except Exception:
            logger.exception("failed to process SQS message; message was not deleted")
if __name__ == '__main__':
    # Script entry point: parse the command line, configure logging, create
    # the S3 loader and redis client used by main()/write_data() as module
    # globals (opts, loader, redis_client), then poll forever.
    epilog = '''
Example use cases:
The following example relies on IAM roles to authenticate:
`cloudtrail_monitor -r arn:aws:iam::99999999999:role/ct-mon -Q ct-sqs-queue`
The following example expects credentials to be in environment vars:
`cloudtrail_monitor -Q ct-sqs-queue`
'''
    argparser = argparse.ArgumentParser(description='A simple AWS CloudTrail/Logstash SQS driven event loader',
                                        epilog=epilog)
    argparser.add_argument('--interval', '-i', type=int, help="message polling interval", default=120)
    argparser.add_argument('--sqs_queue', '-Q', type=str, help="AWS SQS Queue the events are published to")
    # NOTE(review): --aws_role is parsed but not referenced anywhere else in
    # this file - confirm whether role assumption is still to be implemented.
    argparser.add_argument('--aws_role', '-r', type=str, help="AWS IAM role to assume", default=None)
    argparser.add_argument('--redis_list', '-L', type=str, help="redis list to publish events to", default='logstash')
    argparser.add_argument('--redis_host', '-H', type=str, help="redis Server hostname (default: 127.0.0.1).",
                           default='127.0.0.1')
    argparser.add_argument('--redis_port', '-p', type=int, help="redis Server port (default: 6379).", default=6379)
    argparser.add_argument('--redis_db', '-n', type=int, help="redis Database number.", default=0)
    argparser.add_argument('--redis_password', '-a', type=str, help="redis Server password.", default=None)
    argparser.add_argument('--doctype_prefix', '-t', type=str, help="logstash document type root", default='cloudtrail')
    argparser.add_argument('--debug', action='store_true', default=False)
    # NOTE(review): the positional config file is accepted but not read
    # anywhere in this file.
    argparser.add_argument('config', nargs='?', help="JSON-format config file", default=None)
    opts = argparser.parse_args()
    if not opts.sqs_queue:
        # Without a queue name every polling cycle would fail; stop with a
        # usage message instead of looping on errors.
        argparser.error('an SQS queue name is required (--sqs_queue/-Q)')
    if opts.debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    logger.info('starting polling operations with SQS queue: %s ...', opts.sqs_queue)
    # NOTE(review): this debug dump contains every option, including the
    # redis password when one was supplied.
    logger.debug("startup options: %s", vars(opts))
    # I do not, will not ever willing or knowingly write an app that does
    # not use environment vars or STS credentials for AWS
    loader = CloudTrailS3EventLoader()
    try:
        redis_client = redis.Redis(host=opts.redis_host, port=opts.redis_port, db=opts.redis_db,
                                   password=opts.redis_password)
    except redis.RedisError as e:
        # redis_client would be undefined past this point, so every cycle
        # would fail; exit rather than spin in the polling loop.
        logger.error(e)
        raise SystemExit(1)
    while True:
        main()
        time.sleep(opts.interval)
# (GitHub page footer) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.