#!/usr/bin/env python
# This script fetches AWS logs from S3 and stores them locally (by default in
# /var/log/cloudtrail/cloudtrail.log). It fetches and parses the following AWS
# logs and writes them to files:
#   a) CloudTrail logs (gzipped JSON)
#   b) S3 access logs
#   c) ELB access logs
# It can incrementally fetch only the new/modified log objects (of any of the
# above) and append them to the existing files.
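#
# For reference, a sketch of the S3 key layouts this script is typically
# pointed at (the commented examples at the bottom follow this pattern; the
# account id and prefixes below are illustrative, not taken from this gist):
#   CloudTrail: AWSLogs/<account-id>/CloudTrail/<region>/YYYY/MM/DD/*.json.gz
#   ELB:        AWSLogs/<account-id>/elasticloadbalancing/<region>/YYYY/MM/DD/*
#   S3 access:  <target-prefix>YYYY-MM-DD-HH-MM-SS-<unique-id>  (no date
#               directories, hence datewise=False for these)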
import boto3
import os
import sys
import json
import datetime
import pytz
import time
import itertools
import zlib
class CloudTrail:
    def __init__(self, *args, **kwargs):
        self.sincedb_path = '/var/tmp/cloudtrail_sincedb'
        self.logfile = "/var/log/cloudtrail/cloudtrail.log"
        self.profile = None
        self.access_key_id = None
        self.secret_access_key = None
        self.datewise = True
        dt = datetime.date.today()
        # Checkpoint of the last fetched object; persisted to sincedb_path.
        self.store = {'Key': None, 'LastModified': 0, 'year': dt.year, 'month': dt.month, 'day': dt.day}
        for a in ['prefix', 'bucket', 'access_key_id', 'secret_access_key', 'sincedb_path', 'region', 'profile', 'logfile', 'datewise']:
            if a in kwargs:
                self.__dict__[a] = kwargs[a]
    def save_store(self):
        open(self.sincedb_path, "w").write(json.dumps(self.store))
    def read_store(self):
        if os.path.isfile(self.sincedb_path):
            try:
                self.store = json.loads(open(self.sincedb_path).read())
            except Exception:
                # Unreadable/corrupt sincedb: keep the defaults from __init__.
                pass
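    # The sincedb file read/written above is a small JSON checkpoint. Its
    # contents look roughly like this (values are made up for illustration):
    #   {"Key": "AWSLogs/999999999999/CloudTrail/us-west-1/2017/06/16/xyz.json.gz",
    #    "LastModified": 1497600000, "year": 2017, "month": 6, "day": 16}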
    def incr_day(self):
        dt = datetime.datetime(self.store['year'], self.store['month'], self.store['day'], 0, 0, 0).replace(tzinfo=pytz.utc)
        dt += datetime.timedelta(days=1)
        return {"year": dt.year, "month": dt.month, "day": dt.day}
    def get_prefix(self, year=None, month=None, day=None):
        year = year if year else self.store['year']
        month = month if month else self.store['month']
        day = day if day else self.store['day']
        if self.datewise:
            prefix = '%s/%s/%4d/%02d/%02d' % (self.prefix, self.region, year, month, day)
        else:
            prefix = '%s/' % (self.prefix)
        return prefix
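    # For example (illustrative values): with prefix='AWSLogs/999999999999/CloudTrail',
    # region='us-west-1' and datewise=True, get_prefix() yields something like
    # 'AWSLogs/999999999999/CloudTrail/us-west-1/2017/06/17'.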
    def connect_s3(self):
        if self.profile:
            session = boto3.Session(profile_name=self.profile)
        elif self.access_key_id and self.secret_access_key:
            session = boto3.Session(aws_access_key_id=self.access_key_id, aws_secret_access_key=self.secret_access_key)
        else:
            # Fall back to the default boto3 credential chain (env vars, IAM role, etc.)
            session = boto3.Session()
        self.client = session.client('s3')
        return
    def get_list_of_objects(self, prefix=None):
        prefix = prefix if prefix else self.get_prefix()
        paginator = self.client.get_paginator('list_objects')
        page_iterator = paginator.paginate(Bucket=self.bucket, Prefix=prefix)
        list_objs = [contents for page in page_iterator if 'Contents' in page for contents in page['Contents']]
        list_objs.sort(key=lambda x: x['LastModified'])
        print "Read list of objects for %s, count=%d" % (prefix, len(list_objs))
        return list_objs
    def get_objects(self, list_objs, limit=1):
        dt = datetime.datetime.utcfromtimestamp(self.store['LastModified']).replace(tzinfo=pytz.utc)
        utczero = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=pytz.utc)
        # Only fetch objects modified at or after the checkpoint, skipping the
        # last object already processed.
        objs_toget = [o for o in list_objs if o['LastModified'] >= dt and o['Key'] != self.store['Key']]
        print "Objects to get => %d" % (len(objs_toget))
        objs = [self.client.get_object(Bucket=self.bucket, Key=o['Key']) for o in list(objs_toget)[:limit]]
        print "Got objs => %d" % (len(objs))
        def _extract_data(obj):
            if obj['ContentType'] == '':
                return obj['Body'].read().split("\n")
            if 'ContentEncoding' in obj and obj['ContentEncoding'] == 'gzip':
                return [zlib.decompress(obj['Body'].read(), 16 + zlib.MAX_WBITS)]
            if obj['ContentType'] == 'text/plain':
                return [obj['Body'].read().strip()]
            return [obj['Body'].read()]
        body = [_extract_data(o) for o in objs]
        lines = [line for o in body for line in o if len(line)]
        outfile = open(self.logfile, "a")
        for line in lines:
            outfile.write(line + "\n")
        print "Wrote lines => ", len(lines)
        outfile.close()
        if len(objs_toget):
            # Advance the checkpoint to the last object fetched in this batch.
            lim = len(objs_toget) if len(objs_toget) < limit else limit
            self.store['LastModified'] = int((objs_toget[lim - 1]['LastModified'] - utczero).total_seconds())
            self.store['Key'] = objs_toget[lim - 1]['Key']
            self.store['year'] = objs_toget[lim - 1]['LastModified'].year
            self.store['month'] = objs_toget[lim - 1]['LastModified'].month
            self.store['day'] = objs_toget[lim - 1]['LastModified'].day
            self.save_store()
        return len(objs)
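    # Note on output: a gzipped CloudTrail object decompresses to a single JSON
    # document (with a top-level "Records" array) and is appended to the logfile
    # as one line; S3/ELB access log objects are plain text and are appended
    # line by line.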
    def run(self):
        self.read_store()
        self.connect_s3()
        list_objs = self.get_list_of_objects()
        while list_objs:
            objs = self.get_objects(list_objs, 10)
            while objs:
                objs = self.get_objects(list_objs, 10)
            if not self.datewise:
                break
            # Try the next day
            newdt = self.incr_day()
            list_objs = self.get_list_of_objects(prefix=self.get_prefix(**newdt))
if __name__ == "__main__":
    from sys import argv
    # Uncomment and adapt one of the examples below before running; bucket,
    # account id, prefixes and filenames are placeholders.
    # ct = CloudTrail(bucket='bucket', region='us-west-1', prefix='AWSLogs/999999999999/CloudTrail', logfile='ccc', sincedb_path='ccc.1')
    # ct = CloudTrail(bucket='bucket', region='us-west-1', prefix='AWSLogs/999999999999/elasticloadbalancing', logfile='ddd', sincedb_path='ddd.1')
    # ct = CloudTrail(bucket='bucket', region='us-west-1', prefix='logs', logfile='eee', sincedb_path='eee.1', datewise=False)
    ct.run()