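# verifies the integrity of cloudtrail logs by walking the digest chain in every region:
# each hourly digest file's rsa signature is checked, then each log file it covers is
# checked against its recorded sha256 hash. intended to run as a lambda function.
# note: the third-party 'rsa' package is not part of the default lambda runtime and must
# be bundled with the deployment package; boto3 is provided.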
import sys
import rsa
import gzip
import json
import boto3
import queue
import logging
import hashlib
import binascii
from logging import handlers
from concurrent import futures
from datetime import datetime as dt, timedelta as td
# number of previous log digests to verify. they are generated once per hour, so this
# functionally equates to how many hours back the log files should be verified.
NUM_VERIFY = 36
logger = logging.getLogger()
logger.setLevel(logging.INFO)
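# note: the lambda python runtime pre-configures a handler on the root logger, so no
# basicConfig/handler setup is needed for these messages to reach cloudwatch logs.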
def start_verification(account_id, digest_bucket, region):
    # boto3 is not thread-safe by default; create a session per thread to make it so.
    session = boto3.session.Session()
    digest_obj = get_latest_digest_obj(account_id, digest_bucket, region, session)
    # each call to digest_verification returns the location of the previous digest in the
    # chain, so this loop walks backwards through the chain one hour at a time.
    for i in range(1, NUM_VERIFY + 1):
        if digest_obj is None:
            logger.warning('Log digest chain broken - only processed %s digest files' % (i - 1))
            break
        digest_obj, digest_bucket = digest_verification(digest_obj, digest_bucket, region, session)
def get_latest_digest_obj(account_id, digest_bucket, region, session):
    s3 = session.client('s3')
    current_time = dt.utcnow()
    # quick-and-dirty lookup of the latest digest file via a prefix. there is a good chance
    # this returns nothing near the beginning of a month for regions that are rarely or
    # never used.
    prefix = 'AWSLogs/%s/CloudTrail-Digest/%s/%s/%02d/' % (
        account_id,
        region,
        current_time.year,
        current_time.month
    )
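    # for reference, full keys under this prefix follow a pattern roughly like (assumed
    # shape, abbreviated):
    #   <prefix><dd>/<account-id>_CloudTrail-Digest_<region>_..._<digest-end-time>.json.gz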
    resp = s3.list_objects_v2(Bucket=digest_bucket, Prefix=prefix)
    obj_list = sorted(resp.get('Contents', []), key=lambda k: k['LastModified'], reverse=True)
    if len(obj_list) == 0:
        logger.error('Unable to find the most recent digest file for the %s region within bucket: %s' % (region, digest_bucket))
        return None
    return obj_list[0]['Key']
def digest_verification(digest_obj, cloudtrail_bucket, region, session):
    # cloudtrail calls must be region-aware
    s3 = session.client('s3')
    cloudtrail = session.client('cloudtrail', region_name=region)
    logger.info('Fetching digest file: %s' % digest_obj)
    resp = s3.get_object(Bucket=cloudtrail_bucket, Key=digest_obj)
    signature = binascii.a2b_hex(resp['Metadata']['signature'])
    # currently always SHA256withRSA; recorded here but not otherwise used
    algo = resp['Metadata']['signature-algorithm']
    content = gzip.decompress(resp['Body'].read())
    dict_content = json.loads(content)
    fingerprint = dict_content['digestPublicKeyFingerprint']
    log_file_list = dict_content['logFiles']
    start_time = dict_content['digestStartTime']
    end_time = dict_content['digestEndTime']
    previous_digest_sig = dict_content['previousDigestSignature']
    previous_digest_obj = dict_content['previousDigestS3Object']
    previous_digest_bucket = dict_content['previousDigestS3Bucket']
    # this value must be the string 'null' if it is None, "to match the java impl":
    # https://github.com/aws/aws-cli/blob/e2295b022db35eea9fec7e6c5540d06dbd6e588b/awscli/customizations/cloudtrail/validation.py#L552
    previous_digest_sig = 'null' if previous_digest_sig is None else previous_digest_sig
    # digests are signed with a rotating key pair; look up the public key whose
    # fingerprint matches the one recorded in the digest file
    resp = cloudtrail.list_public_keys(StartTime=start_time, EndTime=end_time)
    key_list = resp['PublicKeyList']
    public_key_content = [key['Value'] for key in key_list if key['Fingerprint'] == fingerprint]
    if len(public_key_content) != 1:
        logger.critical('Unable to find matching key for log file digest: %s' % digest_obj)
        sys.exit(1)
    else:
        public_key_content = public_key_content[0]
    public_key = rsa.PublicKey.load_pkcs1(public_key_content, format='DER')
    h = hashlib.sha256()
    h.update(content)
    hashed_content = binascii.hexlify(h.digest()).decode()
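    # per the cloudtrail digest validation scheme, the signed string is four
    # newline-separated fields: the digest end time, the digest file's s3 path
    # (bucket/key), the hex-encoded sha256 of the decompressed digest file, and the
    # previous digest's hex-encoded signature ('null' for the first digest in a chain).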
    signing_string = ('%s\n%s\n%s\n%s' % (
        end_time,
        '%s/%s' % (cloudtrail_bucket, digest_obj),
        hashed_content,
        previous_digest_sig
    )).encode()
    try:
        rsa.verify(signing_string, signature, public_key)
        logger.info('Digest file in %s region successfully verified: %s' % (region, digest_obj))
    except rsa.pkcs1.VerificationError:
        logger.critical('Signature validation failed in %s region for digest file: %s' % (region, digest_obj))
        sys.exit(2)  # no point in verifying cloudtrail logs if the digest cannot be verified
    # with the digest itself verified, check each log file it covers against its recorded hash
    for log_file in log_file_list:
        resp = s3.get_object(Bucket=log_file['s3Bucket'], Key=log_file['s3Object'])
        content = gzip.decompress(resp['Body'].read())
        h = hashlib.sha256()
        h.update(content)
        generated_hash = binascii.hexlify(h.digest()).decode()
        if generated_hash != log_file['hashValue']:
            logger.critical('Mismatched content hash found for cloudtrail log file: %s' % log_file['s3Object'])
        else:
            logger.info('CloudTrail log file successfully verified: %s' % log_file['s3Object'])
    return previous_digest_obj, previous_digest_bucket
# this can be modified to push failure notifications out however you want,
# e.g. an sns topic, slack hook, or new relic alarm
def notify_error(log_record):
    sns = boto3.client('sns')
    if log_record.levelno > logging.INFO:
        sns.publish(TopicArn='arn:aws:sns:us-east-1:014890146463:test-topic', Message=log_record.getMessage())
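# a minimal sketch of the slack-hook alternative mentioned above, not wired into the
# handler; the webhook_url parameter is a hypothetical incoming-webhook endpoint.
def notify_error_slack(log_record, webhook_url):
    import urllib.request
    if log_record.levelno > logging.INFO:
        payload = json.dumps({'text': log_record.getMessage()}).encode()
        req = urllib.request.Request(webhook_url, data=payload, headers={'Content-Type': 'application/json'})
        urllib.request.urlopen(req)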
def lambda_handler(event, context):
    # buffer log records in a queue so failures can be pushed out as notifications
    # after all worker threads finish
    logging_queue = queue.Queue()
    handler = handlers.QueueHandler(logging_queue)
    logger.addHandler(handler)
    digest_bucket = 'local-cloudtrail-logs-014890146463'
    account_id = context.invoked_function_arn.split(":")[4]
    # currently, the only way to programmatically query the region list is via the ec2
    # (or another service-specific) api
    ec2 = boto3.client('ec2')
    resp = ec2.describe_regions()
    region_list = [r['RegionName'] for r in resp['Regions']]
    pool = futures.ThreadPoolExecutor(max_workers=len(region_list))
    for region in region_list:
        pool.submit(start_verification, account_id, digest_bucket, region)
    pool.shutdown(wait=True)
    while not logging_queue.empty():
        try:
            log_record = logging_queue.get(timeout=0.1)
            notify_error(log_record)
        except queue.Empty:
            break
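# a minimal sketch for running the verification outside of lambda, assuming local aws
# credentials are configured; the fake context only needs an arn whose fifth field is
# the account id, and the arn below is a placeholder.
if __name__ == '__main__':
    class FakeContext:
        invoked_function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:placeholder'
    lambda_handler({}, FakeContext())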