Skip to content

Instantly share code, notes, and snippets.

@magnetikonline
Last active May 1, 2023 04:43
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save magnetikonline/2a82f7f8b9e5e19c53e17cfccd1298f7 to your computer and use it in GitHub Desktop.
Save magnetikonline/2a82f7f8b9e5e19c53e17cfccd1298f7 to your computer and use it in GitHub Desktop.
Python AWS CloudTrail parser class.

Python AWS CloudTrail parser

Python parser class for CloudTrail event archives, previously dumped to an S3 bucket. Class provides an iterator which will:

  • Scan a given directory for archive files matching the required pattern.
  • Decompress each archive in memory.
  • Parse JSON payload and return each event in turn.

The parser is contained in cloudtrailparser.py; timezone.py supplies a simple concrete datetime.tzinfo implementation that provides the UTC timezone.

Example

$ ls -l1 /path/to/cloudtrail/archives
ACCOUNT_IDXX_CloudTrail_ap-southeast-2_20160101T2155Z_uiGgE0mgD8GUpvNi.json.gz
ACCOUNT_IDXX_CloudTrail_ap-southeast-2_20160101T2305Z_BNBEUH14QUAV0dNd.json.gz

$ ./example.py

Event name: ListContainerInstances
Event time: 2016-01-01 23:02:08+00:00

Event name: DescribeContainerInstances
Event time: 2016-01-01 23:02:08+00:00

Event name: ListContainerInstances
Event time: 2016-01-01 23:02:11+00:00

Event name: DiscoverPollEndpoint
Event time: 2016-01-01 22:59:36+00:00

Event name: DescribeInstanceHealth
Event time: 2016-01-01 23:00:41+00:00
from datetime import datetime
import gzip
import json
import os
import re
import timezone
class Parser:
    """Iterate over the events held in a directory tree of CloudTrail archives.

    Archives are gzip-compressed JSON files whose names match
    ``ARCHIVE_FILENAME_REGEXP``. Each archive is decompressed and parsed in
    memory, and every item of its top-level ``Records`` list is returned as a
    simplified dictionary built by ``build_trail_data()``.
    """

    ARCHIVE_FILENAME_REGEXP = re.compile(
        r"^[0-9]{12}_CloudTrail_[a-z]{2}-[a-z]+-[0-9]_[0-9]{8}T[0-9]{4}Z_[a-zA-Z0-9]{16}\.json\.gz$"
    )
    CLOUDTRAIL_EVENT_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    TIMEZONE_UTC = timezone.UTC()

    def __init__(self, archive_base_dir):
        """Store the base directory holding the CloudTrail archives.

        Args:
            archive_base_dir: Directory that is walked recursively for archive
                files. Trailing slashes are removed.
        """
        # strip trailing slashes, but never reduce a root path such as "/" to an
        # empty string - os.walk("") would silently yield no files at all
        self.archive_base_dir = archive_base_dir.rstrip("/") or archive_base_dir

    def events(self):
        """Yield one simplified event dictionary per CloudTrail record.

        Works over every archive returned by ``archive_file_list()``. Archives
        whose JSON payload has no ``Records`` key are skipped.

        Yields:
            The dictionary produced by ``build_trail_data()`` for each record.
        """
        # work over CloudTrail archive files
        for archive_path in self.archive_file_list():
            # open archive - parse JSON contents to dictionary. The context
            # manager closes the handle even when decompression or parsing raises
            with gzip.open(archive_path, "rb") as fp:
                cloudtrail_data = json.loads(fp.read())

            if "Records" not in cloudtrail_data:
                # nothing to report from this archive
                continue

            for trail_item in cloudtrail_data["Records"]:
                yield self.build_trail_data(trail_item)

    def archive_file_list(self):
        """Yield the path of every archive file found below the base directory.

        Only file names matching ``ARCHIVE_FILENAME_REGEXP`` are returned; all
        other files are ignored.
        """
        for base_path, _dir_list, file_list in os.walk(self.archive_base_dir):
            # work over files in directory
            for file_item in file_list:
                # does file item match archive pattern?
                if Parser.ARCHIVE_FILENAME_REGEXP.search(file_item):
                    # full path to archive file
                    yield os.path.join(base_path, file_item)

    def build_trail_data(self, source):
        """Reduce a raw CloudTrail record to the fields this parser reports.

        Args:
            source: A single record dictionary taken from an archive's
                ``Records`` list.

        Returns:
            A dictionary with the keys ``account_id``, ``region``,
            ``event_name``, ``event_time`` (a datetime using ``TIMEZONE_UTC``),
            ``request`` and ``response``.

        Raises:
            KeyError: If ``source`` lacks one of the fields read here.
            ValueError: If ``eventTime`` does not match
                ``CLOUDTRAIL_EVENT_DATETIME_FORMAT``.
        """
        # convert time string to datetime at UTC
        event_time_utc = datetime.strptime(
            source["eventTime"], Parser.CLOUDTRAIL_EVENT_DATETIME_FORMAT
        ).replace(tzinfo=Parser.TIMEZONE_UTC)

        # extract the data we care about from the CloudTrail item into dict
        return {
            "account_id": str(source["recipientAccountId"]),
            "region": str(source["awsRegion"]),
            "event_name": str(source["eventName"]),
            "event_time": event_time_utc,
            "request": self.strip_data_unicode(source["requestParameters"]),
            "response": self.strip_data_unicode(source["responseElements"]),
        }

    def strip_data_unicode(self, data):
        """Return a recursive copy of ``data`` built from plain lists and dicts.

        This method used to cast Python 2 ``unicode`` values to ``str``. The
        ``unicode`` name does not exist on Python 3, where the old check raised
        NameError for every simple value, so simple values are now returned
        unchanged.

        Args:
            data: A value decoded from JSON (list, dict or simple value).

        Returns:
            New lists/dictionaries mirroring ``data``, with simple values
            (including ``None``) passed through as-is.
        """
        # recursively process via strip_data_unicode() both list and dictionary structures
        if isinstance(data, list):
            return [self.strip_data_unicode(list_item) for list_item in data]

        if isinstance(data, dict):
            return {
                self.strip_data_unicode(dict_key): self.strip_data_unicode(dict_value)
                for (dict_key, dict_value) in data.items()
            }

        # simple value - nothing to convert
        return data
#!/usr/bin/env python3
import cloudtrailparser
def main():
    """Print the name and time of every event found under the example archive path."""
    print("Example")

    trail_parser = cloudtrailparser.Parser("/path/to/cloudtrail/archives")
    for trail_event in trail_parser.events():
        # one line for the event name, then the event time followed by a blank line
        name_line = "Event name: {0}".format(trail_event["event_name"])
        print(name_line)
        time_line = "Event time: {0}\n".format(trail_event["event_time"])
        print(time_line)


if __name__ == "__main__":
    main()
import datetime
class BaseTimezone(datetime.tzinfo):
    """Concrete ``datetime.tzinfo`` with a fixed UTC offset and a zero DST delta."""

    TIMEDELTA_ZERO = datetime.timedelta(0)

    def __init__(self, timezone_name, offset_seconds):
        """Record the zone's name and its offset from UTC.

        Args:
            timezone_name: Value returned by ``tzname()``.
            offset_seconds: Offset from UTC in seconds, returned by
                ``utcoffset()`` as a ``timedelta``.
        """
        super().__init__()
        self.offset = datetime.timedelta(seconds=offset_seconds)
        self.timezone_name = timezone_name

    def tzname(self, dt):
        """Return the name given at construction, regardless of ``dt``."""
        return self.timezone_name

    def utcoffset(self, dt):
        """Return the fixed offset from UTC, regardless of ``dt``."""
        return self.offset

    def dst(self, dt):
        """Return a zero timedelta - no daylight saving adjustment is applied."""
        return BaseTimezone.TIMEDELTA_ZERO
# define timezones
class UTC(BaseTimezone):
    """Timezone named "UTC" with a zero offset."""

    def __init__(self):
        """Initialise the base timezone with the UTC name and no offset."""
        super().__init__(timezone_name="UTC", offset_seconds=0)
class Melbourne(BaseTimezone):
    """Timezone named "Melbourne" with a fixed offset of ten hours ahead of UTC.

    NOTE: the offset is constant and BaseTimezone.dst() always returns zero, so
    no daylight saving adjustment is made by this class.
    """

    def __init__(self):
        """Initialise the base timezone with the Melbourne name and +10 hours."""
        super().__init__(timezone_name="Melbourne", offset_seconds=10 * 3600)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment