Skip to content

Instantly share code, notes, and snippets.

@brandond
Last active November 23, 2016 05:43
Show Gist options
  • Save brandond/d5701fb763cce3b9d871394704b43db5 to your computer and use it in GitHub Desktop.
Save brandond/d5701fb763cce3b9d871394704b43db5 to your computer and use it in GitHub Desktop.
Evident ESP Python SDK sample
# Load API keys from a dotenv file into the process environment.
# NOTE(review): presumably this has to run before the esp SDK looks up its
# credentials -- confirm against the SDK's configuration docs.
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
# Raise items per page to max so paginated calls need fewer round trips
# (100 is taken to be the API's upper limit -- TODO confirm).
from esp import settings
settings.settings.per_page = 100
# Add Unix CSV dialect
import csv
class unix_dialect(csv.Dialect):
    """CSV dialect for Unix-style output.

    Every field is quoted, embedded quote characters are doubled, and each
    row ends with a bare newline rather than CRLF.
    """

    # Row and field separators
    lineterminator = '\n'
    delimiter = ','
    skipinitialspace = False

    # Quoting rules
    quoting = csv.QUOTE_ALL
    quotechar = '"'
    doublequote = True


# Make the dialect available by name: csv.writer(f, dialect="unix")
csv.register_dialect("unix", unix_dialect)
# Optional HTTP request tracing, switched on by setting DEBUG in the environment
import os

if os.environ.get('DEBUG'):
    import logging
    import requests

    try:
        import http.client as http_client
    except ImportError:
        # Python 2 ships the same module under its old name
        import httplib as http_client

    # Without an initialised root logger none of the debug output is shown.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)

    # Trace at the httplib level (requests -> urllib3 -> http.client): each
    # REQUEST is printed with its HEADERS and DATA, each RESPONSE with its
    # HEADERS only -- the response body is never logged.
    http_client.HTTPConnection.debuglevel = 1

    # Let the urllib3 logger bundled with requests emit DEBUG records and
    # hand them up to the root logger configured above.
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.propagate = True
    requests_log.setLevel(logging.DEBUG)
from __future__ import print_function
from collections import OrderedDict
import sys
sys.path.insert(0,sys.path[0]+'/..')
import evident
import base64
import json
import esp
import csv
import sys
# Signature identifiers for which the 'attachedEC2Instances' field is exported.
_ATTACHED_INSTANCE_SIGNATURES = (
    'AWS:EC2-002', 'AWS:EC2-003', 'AWS:EC2-004', 'AWS:EC2-005', 'AWS:EC2-006',
    'AWS:EC2-009', 'AWS:EC2-010', 'AWS:EC2-011', 'AWS:EC2-012', 'AWS:EC2-013',
    'AWS:EC2-014', 'AWS:EC2-015', 'AWS:EC2-016', 'AWS:EC2-017', 'AWS:EC2-018',
    'AWS:EC2-019', 'AWS:EC2-020', 'AWS:EC2-021', 'AWS:EC2-022', 'AWS:EC2-023',
    'AWS:EC2-024',
)


def get_metadata(alert):
    """Return a JSON object string with the signature-specific field and tags.

    Which metadata field is extracted depends on the alert's signature; the
    value is read from the top level of the metadata and, failing that, from
    its 'details' sub-dict.  The alert's tags are appended afterwards as
    key/value pairs, so a tag overrides a metadata field of the same name.
    Alerts with neither metadata nor tags yield '{}'.
    """
    summary = OrderedDict()

    if alert.metadata:
        identifier = alert.signature.identifier
        if identifier in _ATTACHED_INSTANCE_SIGNATURES:
            field, fallback = 'attachedEC2Instances', []
        elif identifier == 'AWS:ELB-005':
            field, fallback = 'certificateName', ''
        elif identifier == 'AWS:IAM-015':
            field, fallback = 'user', ''
        else:
            # No field of interest for this signature
            field = fallback = None

        if field is not None:
            data = alert.metadata.data
            # Prefer the top-level value; otherwise look under 'details'
            summary[field] = (data.get(field) or
                              data.get('details', {}).get(field, fallback))

    for tag in alert.tags or ():
        summary[tag.key] = tag.value

    return json.dumps(summary)
def get_message(alert):
    """Return the alert's message as text, or None when there is none.

    Some signatures appear to be buggy and deliver the message as a
    ``{'bytes': <base64 string>}`` dict instead of a plain value; that form
    is base64-decoded here.  Any other payload is returned unchanged.

    Raises KeyError if a dict payload has no 'bytes' entry.
    """
    message = get_message_payload(alert)
    if not isinstance(message, dict):
        return message

    decoded = base64.b64decode(message['bytes'])
    # On Python 3 b64decode() returns bytes, which the csv writer would
    # render as "b'...'".  On Python 2 the result is already a str and is
    # left untouched.
    if not isinstance(decoded, str):
        decoded = decoded.decode('utf-8', 'replace')
    return decoded
def get_message_payload(alert):
    """Return the raw message stored in the alert's metadata, or None.

    Depending on the signature, and even on how old the alert is, the
    message lives at a different place in the metadata structure:

    * when a 'details' entry exists, its 'message' is used, else its
      'condition'; the top level is not consulted in that case;
    * otherwise the top-level 'message' is used.
    """
    metadata = alert.metadata
    if not metadata:
        return None

    data = metadata.data
    if 'details' not in data:
        return data['message'] if 'message' in data else None

    details = data['details']
    for key in ('message', 'condition'):
        if key in details:
            return details[key]
    return None
def export_alerts():
    """Export unsuppressed alerts from the latest report of each account.

    For every external account the most recent report is looked up, then
    all of its failing, unsuppressed, high-risk alerts that have not ended
    are collected.  The alerts are written to stdout as CSV (dialect
    'unix'), sorted by numeric alert ID; progress lines go to stderr.

    Side effect: esp.settings.settings.per_page is left at 100.
    """
    # Get latest report for each account.
    # There doesn't appear to be a way to ask for the latest directly, so the
    # page size is shrunk to 1 and that single-entry page is taken.
    # NOTE(review): relies on the API listing the newest report first -- confirm.
    reports = []
    for account in esp.ExternalAccount.where():
        esp.settings.settings.per_page = 1
        try:
            reports.extend(esp.Report.where(include='external_account',
                                            external_account_id=account.id_))
        finally:
            # Restore the page size even if the lookup raises, so a failure
            # here cannot leave later queries paging one item at a time.
            esp.settings.settings.per_page = 100

    # Run through pages of alerts for each report, adding to alert list
    alerts = []
    for report in reports:
        print('Retrieving alerts for Account {0.external_account.account} from Report ID {0.id_} updated at {0.updated_at}'.format(report),
              file=sys.stderr)
        page = esp.Alert.where(report_id=report.id_, status='fail', not_suppressed='true',
                               signature_risk_level=['high'],
                               include='external_account,region,signature,metadata,tags')
        while True:
            # Keep only alerts that are still open (no end timestamp)
            alerts.extend(a for a in page if not a.ended_at)
            if page.current_page_number == page.last_page_number:
                break
            page = page.next_page()

    # Set up CSV export
    fieldnames = ['ID', 'Account', 'Region', 'Resource', 'Signature', 'Name', 'Message', 'Metadata']
    writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames, extrasaction='ignore', dialect='unix')
    writer.writeheader()

    # IDs are compared as integers so that e.g. 9 sorts before 10
    alerts.sort(key=lambda a: int(a.id_))
    for a in alerts:
        writer.writerow({'ID': a.id_,
                         'Account': a.external_account.name,
                         'Region': a.region.code,
                         'Resource': a.resource,
                         'Signature': a.signature.identifier,
                         'Name': a.signature.name,
                         'Message': get_message(a),
                         'Metadata': get_metadata(a)})
# Run the export only when this file is executed as a script, not on import.
if __name__ == '__main__':
    export_alerts()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment