-
-
Save mike-bailey/4bd7ca8e66a80326ebc4fe2591922595 to your computer and use it in GitHub Desktop.
CloudTrail IP Lookup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
# To force a newer boto3 import | |
root = os.environ["LAMBDA_TASK_ROOT"] | |
sys.path.insert(0, root) | |
import boto3 | |
import botocore | |
import requests | |
import json | |
import gzip | |
def ipstack_score(api_key, ip, timeout=5):
    """Query the ipstack GeoIP API for ``ip``.

    Args:
        api_key: ipstack access key, sent as the ``access_key`` query
            parameter.
        ip: Address to look up; interpolated into the URL path.
        timeout: Seconds to wait on the HTTP request before giving up, so a
            stalled API cannot hang the caller indefinitely.

    Returns:
        The ``requests.Response`` returned by ipstack, or ``None`` when the
        request could not be completed (connection error, timeout, ...).
        The HTTP status is not checked here.
    """
    # NOTE(review): the key travels over plain HTTP here; switch to https if
    # the ipstack plan in use supports it - confirm before changing.
    url = "http://api.ipstack.com/{1}?access_key={0}".format(api_key, ip)
    try:
        return requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        # Best-effort enrichment: report the failure and let the caller carry
        # on. Only the exception class is logged because the exception text
        # can contain the request URL, which embeds the API key.
        print("ipstack lookup failed for {0}: {1}".format(ip, type(err).__name__))
        return None
def af_score(ip):
    """Fetch the AutoFocus threat-intel summary for an IPv4 address.

    The API key is read from the ``AF_API_KEY`` environment variable.

    Args:
        ip: IPv4 address passed to ``get_tic_summary(ipv4=...)``.

    Returns:
        A dict copy of the summary object's attributes with the ``whois``
        entry converted to ``str``, or ``None`` if the lookup fails.
    """
    # Imported at call time rather than at module import time.
    from autofocus import AutoFocusAPI
    from autofocus.factories.tic import ThreatIntelFactory

    AutoFocusAPI.api_key = os.environ.get('AF_API_KEY')
    try:
        summary = ThreatIntelFactory().get_tic_summary(ipv4=ip)
        # Work on a copy so stringifying ``whois`` does not rewrite the
        # attribute on the library's own summary object.
        af = dict(summary.__dict__)
        af['whois'] = str(af['whois'])
        return af
    except Exception as err:
        # Best-effort enrichment: any client-library failure means
        # "no Autofocus data" for this address rather than a crash.
        print("Autofocus lookup failed for {0}: {1}".format(ip, type(err).__name__))
        return None
def ipqs_score(api_key, ip, timeout=5):
    """Query the IPQualityScore JSON API for ``ip``.

    Args:
        api_key: IPQualityScore key; it is part of the URL path.
        ip: Address to look up; interpolated into the URL path.
        timeout: Seconds to wait on the HTTP request before giving up, so a
            stalled API cannot hang the caller indefinitely.

    Returns:
        The ``requests.Response`` returned by IPQualityScore, or ``None``
        when the request could not be completed (connection error,
        timeout, ...). The HTTP status is not checked here.
    """
    url = "https://www.ipqualityscore.com/api/json/ip/{0}/{1}".format(api_key, ip)
    try:
        return requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        # Best-effort enrichment: report the failure and let the caller carry
        # on. Only the exception class is logged because the exception text
        # can contain the request URL, which embeds the API key.
        print("IPQS lookup failed for {0}: {1}".format(ip, type(err).__name__))
        return None
def score_risk(i):
    """Compute a simple heuristic risk score for one event record.

    Starts at 50 and applies two independent adjustments:

    * GeoIP: -10 when ``i['ipstack']['country_code']`` is ``'US'``, +10 for
      any other country. Skipped when the ipstack data is missing.
    * Event type: -10 when ``i['eventName']`` contains one of the read-only
      verbs ``List``/``Get``/``Describe`` as a whole PascalCase word, +20
      otherwise. Skipped when the event name is missing.

    Args:
        i: Event record (dict). ``ipstack`` and ``eventName`` are optional.

    Returns:
        The integer score.
    """
    import re

    # A safe verb only counts when it is not followed by a lowercase letter,
    # i.e. it is a whole word of the PascalCase event name. A plain substring
    # test would treat mutating calls such as "DeleteListener" as read-only
    # because "Listener" starts with "List".
    safe_verb = r'(?:List|Get|Describe)(?![a-z])'
    score = 50

    # This works better for some orgs than others - makes assumptions about
    # business logic.
    try:
        # Obviously contingent on IPStack being on
        if i['ipstack']['country_code'] == 'US':
            score -= 10
        else:
            score += 10
    except (KeyError, TypeError):
        # Key absent, ipstack lookup returned None, or an error payload
        # without a country_code.
        print("GeoIP failed")

    try:
        # People care less about these events, but they can still be used
        # destructively
        if re.search(safe_verb, i['eventName']):
            score -= 10
        else:
            score += 20
    except (KeyError, TypeError):
        # eventName absent or not a string.
        print("event ranking failed")

    return score
def _env_flag(name):
    """Return True when environment variable ``name`` holds a truthy JSON value.

    An unset or empty variable means "disabled". Any other value is
    lower-cased and parsed as JSON, so ``true``/``True``/``1`` enable and
    ``false``/``0``/``null`` disable. A value that is not valid JSON raises
    ``ValueError`` from ``json.loads``.
    """
    raw = os.environ.get(name)
    if not raw:
        return False
    return bool(json.loads(raw.lower()))


def _response_json(response):
    """Decode a scorer's HTTP response, or return None when there is nothing usable."""
    if response is None:
        # The scorer could not complete the request.
        return None
    try:
        return response.json()
    except ValueError:
        # Body was not JSON (e.g. an HTML error page); treat as a failed lookup.
        print("intel response was not valid JSON")
        return None


def _cached_intel(s3, bucket, ip, provider, fetch):
    """Return intel for ``ip`` from the S3 cache, calling ``fetch(ip)`` on a miss.

    Cache objects live at ``<ip>_<provider>.json`` in ``bucket``. A fresh
    result is written back to the cache unless the lookup failed.
    """
    key = "{0}_{1}.json".format(ip, provider)
    try:
        cached = s3.get_object(Bucket=bucket, Key=key)
        return json.loads(cached['Body'].read())
    except s3.exceptions.NoSuchKey:
        # Cache miss - fall through to a live lookup.
        pass

    result = fetch(ip)
    if result is None:
        return None

    # NOTE(review): ipstack and IPQualityScore are understood to report
    # API-level errors (bad key, rate limit) as JSON with "success": false -
    # confirm against the provider docs. Such payloads are still returned to
    # the caller but are not cached, so a transient error is not stored
    # permanently for this address.
    is_error_payload = isinstance(result, dict) and result.get('success') is False
    if not is_error_payload:
        s3.put_object(Body=json.dumps(result), Key=key, Bucket=bucket)
    return result


def lambda_handler(event, context):
    """S3 Object Lambda entry point: enrich each record of the requested object.

    Downloads the original object from ``event["getObjectContext"]["inputS3Url"]``,
    parses it as JSON, and for every entry in ``Records`` adds, depending on
    which ``*_ENABLED`` environment flags are set:

    * ``ipstack``        - GeoIP data (``IPSTACK_API_ENABLED``)
    * ``fraud_score``    - heuristic from ``score_risk`` (``NATIVE_RISK_ENABLED``)
    * ``autofocus``      - AutoFocus summary (``AF_API_ENABLED``)
    * ``ipqualityscore`` - IPQualityScore data (``IPQS_API_ENABLED``)

    Lookups are cached as JSON objects in the ``INTEL_BUCKET`` bucket. A
    lookup that fails leaves ``None`` under its key instead of aborting the
    request. The enriched document is gzip-compressed and returned through
    ``write_get_object_response``.

    Args:
        event: Invocation event; must contain ``getObjectContext`` with
            ``outputRoute``, ``outputToken`` and ``inputS3Url``.
        context: Lambda context object (unused).

    Returns:
        ``{'status_code': 200}`` once the response has been written.
    """
    object_get_context = event["getObjectContext"]
    request_route = object_get_context["outputRoute"]
    request_token = object_get_context["outputToken"]
    s3_url = object_get_context["inputS3Url"]
    intel_bucket = os.environ.get('INTEL_BUCKET')
    s3 = boto3.client('s3')

    # Feature flags do not change during an invocation; read them once
    # instead of re-parsing the environment for every record.
    ipstack_enabled = _env_flag('IPSTACK_API_ENABLED')
    native_risk_enabled = _env_flag('NATIVE_RISK_ENABLED')
    autofocus_enabled = _env_flag('AF_API_ENABLED')
    ipqs_enabled = _env_flag('IPQS_API_ENABLED')

    def fetch_ipstack(ip):
        return _response_json(ipstack_score(os.environ.get('IPSTACK_API_KEY'), ip))

    def fetch_ipqs(ip):
        return _response_json(ipqs_score(os.environ.get('IPQS_API_KEY'), ip))

    # Get object from S3 - We do NOT need to decompress due to Content-Encoding
    response = requests.get(s3_url)
    # Fail with a clear HTTP error instead of a JSON parse error on an error body.
    response.raise_for_status()
    data = json.loads(response.text)

    transformed = {'Records': []}
    for record in data['Records']:
        if ipstack_enabled:
            record['ipstack'] = _cached_intel(
                s3, intel_bucket, record['sourceIPAddress'], 'ipstack', fetch_ipstack)
        # Scored after the ipstack step because score_risk reads record['ipstack'].
        if native_risk_enabled:
            record['fraud_score'] = score_risk(record)
        if autofocus_enabled:
            record['autofocus'] = _cached_intel(
                s3, intel_bucket, record['sourceIPAddress'], 'autofocus', af_score)
        if ipqs_enabled:
            record['ipqualityscore'] = _cached_intel(
                s3, intel_bucket, record['sourceIPAddress'], 'ipqualityscore', fetch_ipqs)
        transformed['Records'].append(record)

    # Compress the return
    transformed_object = gzip.compress(bytes(json.dumps(transformed), 'utf-8'))
    # Write object back to S3 Object Lambda
    s3.write_get_object_response(
        Body=transformed_object,
        RequestRoute=request_route,
        RequestToken=request_token)
    return {'status_code': 200}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment