Skip to content

Instantly share code, notes, and snippets.

@bigsnarfdude
Last active September 30, 2018 07:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bigsnarfdude/cb9bc2f3f99639f9bef7 to your computer and use it in GitHub Desktop.
Save bigsnarfdude/cb9bc2f3f99639f9bef7 to your computer and use it in GitHub Desktop.
cloudtrail event analysis local hdd python3 async await
#!/usr/bin/env python3
# coding: utf-8
import gzip
import json
from pprint import pprint
import pandas as pd
from pandas.io.json import json_normalize
import sys
import socket
import boto
import os
import ipaddress
import asyncio
import glob
# Gzipped CloudTrail log files to scan. The directory defaults to the original
# hard-coded location but can be overridden with the CLOUDTRAIL_LOG_DIR
# environment variable, so the script runs on other machines without edits.
hdd_files = glob.glob(
    os.path.join(
        os.environ.get("CLOUDTRAIL_LOG_DIR", "/Users/bigsnarfdude/cloudtrail_logs"),
        "*.json.gz",
    )
)
# CloudTrail eventName values that are reported when present in a log file.
security_events = ['CreateKeyPair', 'CheckMfa']
class CloudtrailAnalysis():
    """Helpers for querying a DataFrame of flattened CloudTrail records."""

    @staticmethod
    def check_value(df_data, value):
        """Return details of the first record whose ``eventName`` is *value*.

        Args:
            df_data: DataFrame of flattened records; nested keys appear as
                dotted column names such as ``userIdentity.userName``.
            value: The event name to look for.

        Returns:
            A tuple ``(value, eventTime, userIdentity.userName, awsRegion)``
            taken from the first matching row, or ``None`` when no row
            matches. A field whose column is absent from the frame is
            reported as ``None`` instead of raising ``KeyError``.
        """
        # A frame built from an empty record list has no columns at all, so
        # indexing 'eventName' would raise; treat that as "no match".
        if 'eventName' not in df_data.columns:
            return None

        # Filter once and reuse the result for both the emptiness check and
        # the field extraction.
        matches = df_data[df_data['eventName'] == value]
        if matches.empty:
            return None

        def first(column):
            # The column only exists if at least one record in the frame
            # carried that key.
            if column not in matches.columns:
                return None
            return matches[column].values[0]

        return (
            value,
            first('eventTime'),
            first('userIdentity.userName'),
            first('awsRegion'),
        )
async def get_file_analyse_local_events(f, event):
    """Scan one gzipped JSON log file for *event* and print the first hit.

    Args:
        f: Path of a gzip-compressed JSON file whose top-level object holds
            the log entries under the ``"Records"`` key.
        event: The ``eventName`` value to look for.

    Returns:
        None. When a matching record exists, the tuple produced by
        ``CloudtrailAnalysis.check_value`` is printed to stdout.

    Note:
        The body contains no ``await``, so although this is a coroutine it
        runs to completion without yielding to the event loop.
    """
    # Decode as UTF-8 rather than ASCII: ASCII is a strict subset, so existing
    # files read identically, while a non-ASCII character no longer aborts
    # the run with UnicodeDecodeError.
    with gzip.open(f, "rt", encoding="utf-8") as log_file:
        payload = json.load(log_file)

    records = payload.get("Records") if isinstance(payload, dict) else None
    if not records:
        # Nothing to analyse (missing or empty record list).
        return

    df_data = json_normalize(records)
    # Evaluate the lookup once and reuse the result for the test and the print.
    result = CloudtrailAnalysis.check_value(df_data, event)
    if result is not None:
        print(result)
async def main(f, event):
    """Analyse log file *f* for *event* and wait for the analysis to finish.

    Args:
        f: Path of the log file, passed through unchanged.
        event: Event name to look for, passed through unchanged.
    """
    analysis = get_file_analyse_local_events(f, event)
    await analysis
# process async
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is current and raises RuntimeError on the newest Python releases.
loop = asyncio.new_event_loop()
try:
    # Every (file, event) pair is run to completion before the next starts,
    # so output order follows the order of hdd_files and security_events.
    for f in hdd_files:
        for event in security_events:
            loop.run_until_complete(main(f, event))
finally:
    # Close the loop even when one of the analyses raises.
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment