Skip to content

Instantly share code, notes, and snippets.

@mike-bailey
Last active April 8, 2021 16:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mike-bailey/4bd7ca8e66a80326ebc4fe2591922595 to your computer and use it in GitHub Desktop.
Save mike-bailey/4bd7ca8e66a80326ebc4fe2591922595 to your computer and use it in GitHub Desktop.
CloudTrail IP Lookup
import sys
import os
# To force a newer boto3 import
root = os.environ["LAMBDA_TASK_ROOT"]
sys.path.insert(0, root)
import boto3
import botocore
import requests
import json
import gzip
def ipstack_score(api_key, ip):
try:
return requests.get("http://api.ipstack.com/{1}?access_key={0}".format(api_key, ip))
except:
# Failed GeoIP
pass
def af_score(ip):
from autofocus import AutoFocusAPI
from autofocus.factories.tic import ThreatIntelFactory
AutoFocusAPI.api_key = os.environ.get('AF_API_KEY')
try:
af = ThreatIntelFactory().get_tic_summary(ipv4=ip).__dict__
af['whois'] = str(af['whois'])
return af
except:
# Failed Autofocus
pass
def ipqs_score(api_key, ip):
try:
return requests.get("https://www.ipqualityscore.com/api/json/ip/{0}/{1}".format(api_key, ip))
except:
# Failed IPQS
pass
def score_risk(i):
safe_words = ["List", "Get", "Describe"]
score = 50
# This works better for some orgs than others - makes assumptions about business logic
try:
# Obviously contigent on IPStack being on
if i['ipstack']['country_code'] == 'US':
score -= 10
else:
score += 10
except:
print("GeoIP failed")
try:
# People care less about these events, but they can still be used destructively
if any(x in i['eventName'] for x in safe_words):
score -= 10
else:
score += 20
except:
print("event ranking failed")
return score
def lambda_handler(event, context):
object_get_context = event["getObjectContext"]
request_route = object_get_context["outputRoute"]
request_token = object_get_context["outputToken"]
s3_url = object_get_context["inputS3Url"]
intel_bucket = os.environ.get('INTEL_BUCKET')
s3 = boto3.client('s3')
# Get object from S3 - We do NOT need to decompress due to Content-Encoding
response = requests.get(s3_url)
# Write object back to S3 Object Lambda
data = json.loads(response.text)
transformed = {'Records': []}
# For each event
for i in data['Records']:
# If the API is enabled
if json.loads(os.environ.get('IPSTACK_API_ENABLED').lower()):
try:
# Attempt to get a cached lookup
obj = s3.get_object(Bucket=intel_bucket, Key="{0}_ipstack.json".format(i['sourceIPAddress']))
i['ipstack'] = json.loads(obj['Body'].read())
# If the lookup does not exist
except s3.exceptions.NoSuchKey:
# Lookup
i['ipstack'] = ipstack_score(os.environ.get('IPSTACK_API_KEY'), i['sourceIPAddress']).json()
# If the lookup succeeded
if i['ipstack'] != None:
# Push to cache
s3.put_object(
Body=json.dumps(i['ipstack']),
Key="{0}_ipstack.json".format(i['sourceIPAddress']),
Bucket=intel_bucket)
if json.loads(os.environ.get('NATIVE_RISK_ENABLED').lower()):
i['fraud_score'] = score_risk(i)
if json.loads(os.environ.get('AF_API_ENABLED').lower()):
try:
obj = s3.get_object(Bucket=intel_bucket, Key="{0}_autofocus.json".format(i['sourceIPAddress']))
i['autofocus'] = json.loads(obj['Body'].read())
except s3.exceptions.NoSuchKey:
i['autofocus'] = af_score(i['sourceIPAddress'])
if i['autofocus'] != None:
s3.put_object(
Body=json.dumps(i['autofocus']),
Key="{0}_autofocus.json".format(i['sourceIPAddress']),
Bucket=intel_bucket)
if json.loads(os.environ.get('IPQS_API_ENABLED').lower()):
try:
obj = s3.get_object(Bucket=intel_bucket, Key="{0}_ipqualityscore.json".format(i['sourceIPAddress']))
i['ipqualityscore'] = json.loads(obj['Body'].read())
except s3.exceptions.NoSuchKey:
i['ipqualityscore'] = ipqs_score(os.environ.get('IPQS_API_KEY'),i['sourceIPAddress']).json()
if i['ipqualityscore'] != None:
s3.put_object(
Body=json.dumps(i['ipqualityscore']),
Key="{0}_ipqualityscore.json".format(i['sourceIPAddress']),
Bucket=intel_bucket)
transformed['Records'] += [i]
# Compress the return
transformed_object=gzip.compress(bytes(json.dumps(transformed), 'utf-8'))
s3.write_get_object_response(
Body=transformed_object,
RequestRoute=request_route,
RequestToken=request_token)
return {'status_code': 200}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment