Skip to content

Instantly share code, notes, and snippets.

@turtlemonvh
Last active February 23, 2018 16:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save turtlemonvh/8a6ee4cf80a574dd6f2de5190fb84e00 to your computer and use it in GitHub Desktop.

CloudTrail log search

Download logs from s3 and search through them. Caches downloaded files at _search_downloads/ for better performance. Outputs json. Use jq for further processing and filtering. (example: https://gist.github.com/pcn/f98c7852b0558b847784)

#!/usr/bin/env python
"""Search CloudTrail logs stored in S3.

Downloads log objects in the key range [--start, --end] under --prefix,
caching them beneath _search_downloads/, then prints (as JSON) every
CloudTrail record whose serialized form contains ALL of the query terms.
"""
import argparse
import gzip
import json
import logging
import os

import boto3

logging.basicConfig()
logger = logging.getLogger()
# boto3's vendored urllib3 logs every connection at INFO; keep it quiet.
logging.getLogger('botocore.vendored.requests.packages.urllib3.connectionpool').setLevel(logging.WARNING)

# Downloaded objects are cached here so repeated searches skip re-downloading.
CACHE_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "_search_downloads")
if not os.path.exists(CACHE_DIR):
    os.makedirs(CACHE_DIR)

if __name__ == "__main__":
    p = argparse.ArgumentParser(description="Search through cloudtrail logs.")
    p.add_argument('queries', type=str, nargs="+", action='store', help='Terms to search for in each line.')
    p.add_argument('--debug', dest="debug", action='store_true', help='Turn on more verbose logging.')
    p.add_argument('--bucket', type=str, dest="bucket", required=True, action='store', help='S3 bucket to search.')
    # e.g.: AWSLogs/123456789012/CloudTrail/us-east-1/2018/02/23/123456789012_CloudTrail_us-east-1_20180223T1420Z_U9c6Hz1IT9H9eQIu.json.gz
    p.add_argument('--start', type=str, dest="start_mark", required=True, action='store', help='Offset to start at.')
    p.add_argument('--end', type=str, dest="end_mark", required=True, action='store', help='Offset to end at.')
    p.add_argument('--prefix', type=str, dest="prefix", default="", action='store', help='Prefix to use for start and end conditions.')
    options = p.parse_args()

    if options.debug:
        logger.setLevel(logging.INFO)

    # http://boto3.readthedocs.io/en/latest/reference/services/s3.html#bucket
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(options.bucket)

    # Phase 1: collect the objects in range, downloading any cache misses.
    files_to_search = []
    start_full = os.path.join(options.prefix, options.start_mark)
    end_full = os.path.join(options.prefix, options.end_mark)
    # Marker=start_full makes S3 start listing just after/at the start key.
    for obj in bucket.objects.filter(Prefix=options.prefix, Marker=start_full):
        # http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.ObjectSummary
        # S3 lists keys in lexicographic order, so stop once past the end mark.
        if obj.key > end_full:
            break
        download_path = os.path.join(CACHE_DIR, obj.key)
        download_dir = os.path.dirname(download_path)
        if not os.path.exists(download_dir):
            os.makedirs(download_dir)
        files_to_search.append(download_path)
        if os.path.exists(download_path):
            logger.info("Not downloading %s (already exists at %s)", obj.key, download_path)
        else:
            # BUGFIX: download only on a cache miss — the original fetched and
            # rewrote the object unconditionally, defeating the cache.
            logger.info("Downloading %s to %s", obj.key, download_path)
            resp = obj.get()
            # BUGFIX: the payload is gzipped binary; open in "wb" (the original
            # text-mode "w+" rejects/corrupts bytes under Python 3).
            with open(download_path, "wb") as f:
                f.write(resp['Body'].read())

    # Phase 2: search the cached files. A record matches when every query
    # term appears somewhere in its JSON serialization.
    for path in files_to_search:
        with gzip.open(path) as logfile:
            for line in logfile:
                doc = json.loads(line)
                # CloudTrail files are one JSON object with a "Records" array.
                for record in doc['Records']:
                    subline = json.dumps(record)
                    if all(query in subline for query in options.queries):
                        print(subline)
@turtlemonvh
Copy link
Author

If you have a lot of logs, something like this is probably better:
https://docs.aws.amazon.com/athena/latest/ug/cloudtrail-logs.html

@turtlemonvh
Copy link
Author

turtlemonvh commented Feb 23, 2018

Example usage

python search_cloudtrail.py ssm kms --prefix AWSLogs/123456789012/CloudTrail/us-east-1/2018/02/23 --start 123456789012_CloudTrail_us-east-1_20180223T1420Z_U9c6Hz1IT9H9eQIu.json.gz --end 123456789012_CloudTrail_us-east-1_20180223T1445Z_gzgTm1eTE7Dbk7JU.json.gz --bucket "mybucket"

Searches a subset of logs to find any records where both "ssm" and "kms" are in the json representation of the record and writes those to stdout.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment