Skip to content

Instantly share code, notes, and snippets.

@patrobinson
Created February 20, 2018 02:44
Show Gist options
  • Save patrobinson/864e46e989a2df53eed70dccae560302 to your computer and use it in GitHub Desktop.
Save patrobinson/864e46e989a2df53eed70dccae560302 to your computer and use it in GitHub Desktop.
(SO0037) - Real Time Insights on AWS Account Activity
#!/usr/bin/python
# -*- coding: utf-8 -*-
##############################################################################
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. #
# #
# Licensed under the Amazon Software License (the 'License'). You may not #
# use this file except in compliance with the License. A copy of the #
# License is located at #
# #
# http://aws.amazon.com/asl/ #
# #
# or in the 'license' file accompanying this file. This file is distributed #
# on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, #
# express or implied. See the License for the specific language governing #
# permissions and limitations under the License. #
##############################################################################
from __future__ import print_function
from itertools import groupby
import boto3
import botocore
import base64
import os
import logging
import urllib2
from json import loads,dumps
from collections import OrderedDict
from operator import itemgetter
from random import randint
from sys import maxint
from time import sleep
# --- Logging -----------------------------------------------------------------
# LOG_LEVEL comes from the environment; anything that is not a standard level
# name (including an unset variable, which stringifies to 'NONE') becomes ERROR.
log_level = str(os.environ.get('LOG_LEVEL')).upper()
log_level = (
    log_level
    if log_level in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    else 'ERROR'
)
log = logging.getLogger()
log.setLevel(log_level)

# --- Feature flags -----------------------------------------------------------
# Compared against "YES" by the handler before usage metrics are posted.
send_anonymous_data = str(os.environ.get('SEND_ANONYMOUS_DATA')).upper()

# --- DynamoDB tables ---------------------------------------------------------
ip_table_name = "cloudtrail-log-ip-metrics"
table_name = "cloudtrail-log-analytics-metrics"

# --- Metric type names referenced by the handler -----------------------------
calls_per_ip = "CallsPerUniqueIp"
successful_calls = "NumberOfSuccessfulCalls"
anomaly_score = "AnomalyScore"

# Highest `attempt` value put_record_with_retry will still try to write with.
max_retry_attempts = 5

# Single DynamoDB client created at import time and shared by every function.
client = boto3.client('dynamodb')
def update_dynamodb(record_data, metric_type=None, event_time=None):
    """Merge ``record_data`` into the stored metric row and write it back.

    The original implementation read ``metric_type`` and ``event_time`` from
    names that are never defined at module scope, so every call raised
    ``NameError``. They are now explicit (optional, for signature
    compatibility) parameters.

    Args:
        record_data: dict mapping category key -> numeric value for this batch.
        metric_type: value of the ``MetricType`` key attribute of the row.
        event_time: value of the ``EventTime`` key attribute of the row.

    Raises:
        ValueError: if ``metric_type`` or ``event_time`` is not supplied.
        botocore.exceptions.ClientError: propagated from DynamoDB, including
            ``ConditionalCheckFailedException`` when the row changed between
            the read and the conditional write.
    """
    if metric_type is None or event_time is None:
        raise ValueError("update_dynamodb requires both metric_type and event_time")

    ddb_record = client.get_item(
        TableName=table_name,
        Key={'MetricType': {'S': metric_type},
             'EventTime': {'S': event_time}},
        ConsistentRead=True)

    # Nothing stored yet: there is nothing to merge with, write the batch as-is.
    if 'Item' not in ddb_record:
        put_record(metric_type, event_time, record_data)
        return

    concurrency_token = int(ddb_record['Item']['ConcurrencyToken']['N'])
    # Reuse the shared merge helper instead of duplicating its logic here.
    merged_data = merge_record_with_ddb(record_data, ddb_record)
    put_record(metric_type, event_time, merged_data, concurrency_token)
def put_record_with_retry(metric_type, event_time, record_data, merged_data, concurrency_token, attempt=0):
    """Conditionally write ``merged_data``, re-merging and retrying on conflict.

    On ``ConditionalCheckFailedException`` (another writer replaced the row)
    the row is re-read, ``record_data`` is merged into the fresh contents and
    the write is retried with the *freshly read* concurrency token. The
    original code retried with the stale token, so every retry was guaranteed
    to fail the condition again.

    Args:
        metric_type: ``MetricType`` key attribute of the row.
        event_time: ``EventTime`` key attribute of the row.
        record_data: this batch's category -> value dict (un-merged).
        merged_data: ``record_data`` already merged with the last-read row.
        concurrency_token: ``ConcurrencyToken`` of the last-read row.
        attempt: zero-based attempt counter; writes stop once it exceeds
            ``max_retry_attempts``.

    Raises:
        botocore.exceptions.ClientError: any DynamoDB error other than a
            failed conditional check.
    """
    log.info("Retry: {0} {1} {2}".format(metric_type, event_time, str(attempt)))
    if attempt > max_retry_attempts:
        # Still best-effort (no exception), but no longer silent.
        log.error("Giving up on {0} {1} after {2} attempts".format(metric_type, event_time, str(attempt)))
        return
    try:
        put_record(metric_type, event_time, merged_data, concurrency_token)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise
        # Random back-off so competing writers do not collide again in lockstep.
        sleep(randint(0, 5))
        ddb_record = client.get_item(
            TableName=table_name,
            Key={'MetricType': {'S': metric_type},
                 'EventTime': {'S': event_time}},
            ConsistentRead=True)
        if 'Item' not in ddb_record:
            # The row disappeared in the meantime; nothing left to merge with.
            put_record(metric_type, event_time, record_data)
            return
        merged_data = merge_record_with_ddb(record_data, ddb_record)
        # The condition must be checked against the token that was just read.
        concurrency_token = int(ddb_record['Item']['ConcurrencyToken']['N'])
        put_record_with_retry(metric_type, event_time, record_data, merged_data, concurrency_token, attempt + 1)
def put_record(metric_type, event_time, data, concurrency_token=None):
    """Write one metric row, optionally guarded by an optimistic-lock token.

    Every write stores a new random ``ConcurrencyToken``. When
    ``concurrency_token`` is given, the write only succeeds if the stored
    token still equals it.

    Args:
        metric_type: ``MetricType`` key attribute of the row.
        event_time: ``EventTime`` key attribute of the row.
        data: JSON-serialisable payload stored in the ``Data`` attribute.
        concurrency_token: token the existing row is expected to carry, or
            ``None`` for an unconditional write.

    Raises:
        botocore.exceptions.ClientError: propagated from ``put_item``.
    """
    item = {'MetricType': {'S': metric_type},
            'EventTime': {'S': event_time},
            'Data': {'S': dumps(data)},
            'ConcurrencyToken': {'N': str(randint(0, maxint))}}

    # Compare against None explicitly: 0 is a valid token (randint's lower
    # bound is 0) and must not silently downgrade to an unconditional write.
    if concurrency_token is None:
        client.put_item(TableName=table_name, Item=item)
        return

    client.put_item(
        TableName=table_name,
        Item=item,
        ConditionExpression='ConcurrencyToken = :concurrency_token',
        ExpressionAttributeValues={':concurrency_token': {'N': str(concurrency_token)}})
def merge_record_with_ddb(record_data, ddb_record):
    """Add ``record_data`` to the values already stored in a DynamoDB row.

    Args:
        record_data: dict mapping category key -> numeric value for this batch.
        ddb_record: ``get_item`` response; ``ddb_record['Item']['Data']['S']``
            must hold a JSON object of category key -> numeric value.

    Returns:
        OrderedDict of the per-key sums over the union of both key sets,
        ordered by value, largest first.

    Raises:
        KeyError: if ``ddb_record`` has no ``Item``/``Data`` attribute.
    """
    # Only the Data attribute is needed here; the previous version also parsed
    # ConcurrencyToken into an unused local, which added a pointless KeyError.
    ddb_data = loads(ddb_record['Item']['Data']['S'])
    all_keys = set(record_data) | set(ddb_data)
    merged_data = {k: record_data.get(k, 0) + ddb_data.get(k, 0) for k in all_keys}
    # items() rather than iteritems(): same result for sorted(), and portable.
    return OrderedDict(sorted(merged_data.items(), key=itemgetter(1), reverse=True))
def merge_record_values(metric_key, grouped_rows):
    """Total the value column (index 5) of every row in ``grouped_rows``.

    Values are parsed as floats when ``metric_key`` contains 'AnomalyScore'
    and as ints for every other metric.
    """
    parse = float if 'AnomalyScore' in metric_key else int
    total = 0
    for row in grouped_rows:
        total += parse(row[5])
    return total
# This function sends anonymous usage data, if enabled
def sendAnonymousData(event_time, dataDict, timeout=10):
    """POST one anonymous usage-metrics document to the solution endpoint.

    Args:
        event_time: value sent as the ``TimeStamp`` field.
        dataDict: JSON-serialisable payload sent as the ``Data`` field.
        timeout: seconds to wait on the HTTP call. Without a timeout a stalled
            endpoint would block until the Lambda itself timed out.

    Raises:
        urllib2.URLError (or socket.timeout): if the request fails or times
            out; the caller is expected to treat this as best-effort.
    """
    log.debug("Sending Anonymous Data")
    postDict = {
        'Data': dataDict,
        'TimeStamp': event_time,
        'Solution': 'SO0037',
        'UUID': os.environ.get('UUID'),
    }

    # API Gateway URL to make HTTP POST call
    url = 'https://metrics.awssolutionsbuilder.com/generic'
    data = dumps(postDict)
    log.debug(data)
    headers = {'content-type': 'application/json'}
    req = urllib2.Request(url, data, headers)
    rsp = urllib2.urlopen(req, timeout=timeout)
    try:
        rspcode = rsp.getcode()
        content = rsp.read()
    finally:
        # Always release the connection, even if reading the body fails.
        rsp.close()
    log.debug("Response from APIGateway: %s, %s", rspcode, content)
def _aggregate_rows(rows):
    """Sum the value column per metric key and category key.

    Each row is a list of CSV fields: fields 0 and 1 form the metric key
    ("event_time|metric_type"), fields 2 and 3 form the category key, and
    field 5 is the value summed by ``merge_record_values``.

    Rows are bucketed in dictionaries rather than with ``itertools.groupby``,
    because ``groupby`` only groups *adjacent* equal keys: with unsorted input
    a later run of the same key overwrote the sum of an earlier run.

    Returns:
        dict of metric key -> dict of category key -> summed value.
    """
    buckets = {}
    for row in rows:
        metric_key = "{0}|{1}".format(row[0], row[1])
        category_key = "{0}|{1}".format(row[2], row[3])
        buckets.setdefault(metric_key, {}).setdefault(category_key, []).append(row)

    output = {}
    for metric_key, categories in buckets.items():
        output[metric_key] = {}
        for category_key, category_rows in categories.items():
            output[metric_key][category_key] = merge_record_values(metric_key, category_rows)
    return output


def lambda_handler(event, context):
    """Aggregate a batch of Kinesis metric rows into DynamoDB.

    Each ``event['Records'][i]['kinesis']['data']`` is base64-encoded,
    comma-separated text. Rows whose third field is the string "null" are
    ignored. The remaining rows are summed per metric/category, merged into
    the metrics table, and for the per-IP metric the busiest IP of the batch
    is recorded in the IP table. If enabled, anonymous usage data is posted
    afterwards on a best-effort basis.

    Args:
        event: Lambda event containing a ``Records`` list.
        context: Lambda context object (unused).

    Raises:
        botocore.exceptions.ClientError: for DynamoDB failures outside the
            anonymous-data section.
    """
    payload = event['Records']

    rows = []
    for record in payload:
        row = base64.b64decode(record['kinesis']['data']).strip().split(',')
        if len(row) < 6:
            # Fields up to index 5 are read below; skipping a malformed row
            # avoids one bad record failing the whole batch with IndexError.
            log.warning("Skipping malformed record: %s", row)
            continue
        if row[2] != "null":
            rows.append(row)
    log.info(rows)

    output = _aggregate_rows(rows)

    for record_key in output:
        event_time, metric_type = record_key.split('|')
        # Largest value first, so the first key is this batch's top category.
        record_data = OrderedDict(sorted(output[record_key].items(), key=itemgetter(1), reverse=True))
        ddb_record = client.get_item(
            TableName=table_name,
            Key={'MetricType': {'S': metric_type},
                 'EventTime': {'S': event_time}},
            ConsistentRead=True)
        if 'Item' not in ddb_record:
            put_record(metric_type, event_time, record_data)
        else:
            merged_data = merge_record_with_ddb(record_data, ddb_record)
            put_record_with_retry(metric_type, event_time, record_data, merged_data,
                                  int(ddb_record['Item']['ConcurrencyToken']['N']))

        if metric_type == calls_per_ip:
            # Category keys look like "<ip>|<...>"; keep only the IP part.
            max_ip = next(iter(record_data))
            max_ip_count = record_data[max_ip]
            max_ip = max_ip.split('|')[0]
            hour, minute, _ = event_time.split(':')
            ddb_max_ip = client.get_item(
                TableName=ip_table_name,
                Key={'Hour': {'S': hour},
                     'Minute': {'S': minute}},
                ConsistentRead=True)
            if 'Item' not in ddb_max_ip or max_ip_count > int(ddb_max_ip['Item']['MaxCount']['N']):
                client.put_item(
                    TableName=ip_table_name,
                    Item={'Hour': {'S': hour},
                          'Minute': {'S': minute},
                          'IP': {'S': max_ip},
                          'MaxCount': {'N': str(max_ip_count)}})

    if send_anonymous_data == "YES":
        # Best-effort: any failure here is logged and never fails the batch.
        try:
            unique_keys = list(set(output))
            for record_key in unique_keys:
                event_time, metric_type = record_key.split('|')
                if metric_type == successful_calls or metric_type == anomaly_score:
                    ddb_record = client.get_item(
                        TableName=table_name,
                        Key={'MetricType': {'S': metric_type},
                             'EventTime': {'S': event_time}},
                        ConsistentRead=True)
                    del ddb_record["Item"]["ConcurrencyToken"]
                    del ddb_record["Item"]["EventTime"]
                    metric_data = {}
                    metric_data['MetricType'] = ddb_record['Item']['MetricType']['S']
                    # Only the first "key: value" pair of the stored JSON text
                    # is reported; it is picked apart by plain string splitting.
                    if metric_type == successful_calls:
                        services, num_calls = ddb_record['Item']['Data']['S'].split(',')[0].split(':')
                        metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('}', '').replace(' ', '')
                    if metric_type == anomaly_score:
                        num_calls, anomaly_data = ddb_record['Item']['Data']['S'].split(',')[0].split(':')
                        metric_data['NumberOfSuccessfulCalls'] = num_calls.replace('{', '').replace('"', '').split('|')[0]
                        # NOTE(review): 'AnamonlyScore' is misspelled but kept
                        # as-is; presumably the receiving endpoint expects this
                        # key -- confirm before renaming.
                        metric_data['AnamonlyScore'] = anomaly_data.replace('}', '')
                    sendAnonymousData(event_time, metric_data)
        except Exception as error:
            log.error(error)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment