Last active
February 26, 2024 20:57
-
-
Save jonrau1/2710c54a20684157b28cdeb95364fb64 to your computer and use it in GitHub Desktop.
Lambda function (Python 3.8) that converts VPC flow logs delivered through CloudWatch Logs into JSON. Each record is enriched with ENI and EC2 instance information, reverse DNS, and geolocation data from ip-api.com.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import datetime
import ipaddress
import json
import os
import socket
import time
import zlib

import boto3
import requests
# ip-api.com lookup configuration (URL and field list unchanged from the original).
_IP_API_URL = 'http://ip-api.com/json/'
_IP_API_FIELDS = 'status,message,countryCode,lat,lon,isp,org,as,asname'
# Response keys copied into the enriched record, in output order.
_GEO_KEYS = ('lat', 'lon', 'countryCode', 'isp', 'org', 'as', 'asname')
# Reviewer-chosen bound so one slow lookup cannot consume the whole Lambda timeout.
_HTTP_TIMEOUT_SECONDS = 5
# One normal request plus one retry after an HTTP 429 (throttled) response.
_IP_API_ATTEMPTS = 2
# Wait used when the X-Ttl header is missing or malformed.
_DEFAULT_RATE_WINDOW_SECONDS = 60
_UNIDENTIFIED = 'Unidentified'


def _parse_flow_message(message):
    """Extract the fields used for enrichment from one flow log line.

    Assumes the default flow log field order: version, account-id,
    interface-id, srcaddr, dstaddr, srcport, dstport, protocol, packets,
    bytes, start, end, action, log-status.

    Args:
        message: the raw space-separated flow log line.

    Returns:
        A dict with the raw string fields, the parsed source/destination
        addresses and the Unix start/end seconds, or None when the line
        cannot be enriched. That happens when the line has fewer than 13
        fields, or when an address or timestamp does not parse (for example
        the '-' placeholders in NODATA/SKIPDATA records).
    """
    fields = message.split()
    if len(fields) < 13:
        return None
    try:
        # ip_address accepts both IPv4 and IPv6; IPv4Address rejected IPv6 flows.
        source_addr = ipaddress.ip_address(fields[3])
        dest_addr = ipaddress.ip_address(fields[4])
        start_time = int(fields[10])
        end_time = int(fields[11])
    except ValueError:
        return None
    return {
        'accountId': fields[1],
        'interfaceId': fields[2],
        'sourceIp': fields[3],
        'destIp': fields[4],
        'sourceAddr': source_addr,
        'destAddr': dest_addr,
        'sourcePort': fields[5],
        'destPort': fields[6],
        'protocol': fields[7],
        'startTime': start_time,
        'endTime': end_time,
        'action': fields[12],
    }


def _header_int(response, name, default):
    """Return integer header *name* from *response*, or *default* if absent or malformed."""
    try:
        return int(response.headers.get(name, default))
    except (TypeError, ValueError):
        return default


def _query_ip_api(ip_text):
    """Look up one public address on ip-api.com.

    Enrichment is best-effort. Network errors, undecodable bodies and
    'fail' responses (which omit the data fields) yield 'Unidentified'
    instead of aborting the whole batch.

    Returns:
        A dict mapping each key in _GEO_KEYS to a string value.
    """
    url = _IP_API_URL + ip_text + '?fields=' + _IP_API_FIELDS
    payload = {}
    for attempt in range(_IP_API_ATTEMPTS):
        try:
            response = requests.get(url, timeout=_HTTP_TIMEOUT_SECONDS)
        except requests.RequestException:
            break
        wait_seconds = _header_int(response, 'X-Ttl', _DEFAULT_RATE_WINDOW_SECONDS) + 1
        if response.status_code == 429:
            # Throttled: wait for the rate-limit window to reset, then retry.
            if attempt + 1 < _IP_API_ATTEMPTS:
                time.sleep(wait_seconds)
            continue
        try:
            payload = response.json()
        except ValueError:
            payload = {}
        if _header_int(response, 'X-Rl', 1) == 0:
            # X-Rl is the number of requests left in the window. This response
            # is still valid (the original discarded it), but the next request
            # would be throttled, so pause before returning.
            time.sleep(wait_seconds)
        break
    if not isinstance(payload, dict):
        payload = {}
    return {key: str(payload.get(key, _UNIDENTIFIED)) for key in _GEO_KEYS}


def _geolocate(ip_text, address, cache):
    """Return geolocation strings for one flow endpoint, keyed by _GEO_KEYS.

    Private addresses are not sent to ip-api.com; every value is
    'Unidentified'. Public lookups are memoised in *cache*, a dict owned by
    the caller, so repeated addresses in a batch cost a single request.
    """
    if address.is_private:
        return dict.fromkeys(_GEO_KEYS, _UNIDENTIFIED)
    if ip_text not in cache:
        cache[ip_text] = _query_ip_api(ip_text)
    return cache[ip_text]


def _instance_or_dns(ip_text, address, ec2, aws_region, account_id, cache):
    """Return (reverseDomain, instanceId, instanceArn) strings for one flow endpoint.

    A private address is matched to an EC2 instance by private IP through
    describe_instances. Errors from that call propagate to the caller. Any
    other address gets a reverse DNS lookup. When nothing is found, the
    domain is 'Unidentified' and the ids are the string 'None'. Results are
    memoised in *cache* per (address, account).
    """
    key = (ip_text, account_id)
    if key in cache:
        return cache[key]
    if address.is_private:
        response = ec2.describe_instances(
            Filters=[{'Name': 'private-ip-address', 'Values': [ip_text]}])
        reservations = response['Reservations']
        if reservations:
            instance = reservations[0]['Instances'][0]
            instance_id = str(instance['InstanceId'])
            result = (
                str(instance['PrivateDnsName']),
                instance_id,
                'arn:aws:ec2:' + aws_region + ':' + account_id + ':instance/' + instance_id,
            )
        else:
            result = (_UNIDENTIFIED, 'None', 'None')
    else:
        try:
            domain = str(socket.gethostbyaddr(ip_text)[0])
        except (OSError, UnicodeError):
            # socket.herror / socket.gaierror (no PTR record, resolver failure)
            # are OSError subclasses; narrower than the original bare except.
            domain = _UNIDENTIFIED
        result = (domain, 'None', 'None')
    cache[key] = result
    return result


def _enrich_flow(flow, ec2, aws_region, geo_cache, host_cache):
    """Build the enriched output record for one parsed flow; every value is a str."""
    account_id = flow['accountId']
    src_geo = _geolocate(flow['sourceIp'], flow['sourceAddr'], geo_cache)
    dst_geo = _geolocate(flow['destIp'], flow['destAddr'], geo_cache)
    src_domain, src_instance_id, src_instance_arn = _instance_or_dns(
        flow['sourceIp'], flow['sourceAddr'], ec2, aws_region, account_id, host_cache)
    dst_domain, dst_instance_id, dst_instance_arn = _instance_or_dns(
        flow['destIp'], flow['destAddr'], ec2, aws_region, account_id, host_cache)
    utc = datetime.timezone.utc
    return {
        'accountId': account_id,
        'interfaceId': flow['interfaceId'],
        'interfaceArn': 'arn:aws:ec2:' + aws_region + ':' + account_id
                        + ':network-interface/' + flow['interfaceId'],
        'sourceInstanceId': src_instance_id,
        'sourceInstanceArn': src_instance_arn,
        'sourceIp': flow['sourceIp'],
        'sourceReverseDomain': src_domain,
        'sourceIpLatid': src_geo['lat'],
        'sourceIpLongt': src_geo['lon'],
        'sourceIpCountry': src_geo['countryCode'],
        'sourceIsp': src_geo['isp'],
        'sourceOrg': src_geo['org'],
        'sourceAs': src_geo['as'],
        'sourceAsname': src_geo['asname'],
        'destInstanceId': dst_instance_id,
        'destInstanceArn': dst_instance_arn,
        'destIp': flow['destIp'],
        'destReverseDomain': dst_domain,
        'destIpLatid': dst_geo['lat'],
        'destIpLongt': dst_geo['lon'],
        'destIpCountry': dst_geo['countryCode'],
        'destIsp': dst_geo['isp'],
        'destOrg': dst_geo['org'],
        'destAs': dst_geo['as'],
        'destAsname': dst_geo['asname'],
        'sourcePort': flow['sourcePort'],
        'destPort': flow['destPort'],
        'protocol': flow['protocol'],
        # str(aware datetime) gives the same text the original got via json.dumps(default=str).
        'startTime': str(datetime.datetime.fromtimestamp(flow['startTime'], utc)),
        'endTime': str(datetime.datetime.fromtimestamp(flow['endTime'], utc)),
        'action': flow['action'],
    }


def lambda_handler(event, context):
    """Convert VPC flow log records from a CloudWatch Logs event into enriched JSON.

    The ``awslogs.data`` payload is base64-decoded, gzip-decompressed and
    parsed. Each flow record is printed as one JSON object. The object holds
    the flow fields plus the following for both endpoints: EC2 instance
    id/ARN, reverse DNS, and ip-api.com geolocation. Lines that are not
    enrichable flow records are skipped. Examples are NODATA/SKIPDATA lines
    or lines with too few fields.

    Args:
        event: the Lambda event; expected to carry ``awslogs.data``.
        context: the Lambda context object (unused).

    Returns:
        None. Output goes to stdout, one JSON line per flow record.
    """
    data = ((event or {}).get('awslogs') or {}).get('data')
    if not data:
        # Not a CloudWatch Logs subscription payload; nothing to convert.
        return None
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    records = json.loads(zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS))

    flows = []
    for log_event in records.get('logEvents', []):
        flow = _parse_flow_message(log_event.get('message', ''))
        if flow is not None:
            flows.append(flow)
    if not flows:
        return None

    aws_region = os.environ['AWS_REGION']
    ec2 = boto3.client('ec2')
    # Per-invocation caches: addresses repeat across records, and ip-api.com is rate limited.
    geo_cache = {}
    host_cache = {}
    for flow in flows:
        print(json.dumps(_enrich_flow(flow, ec2, aws_region, geo_cache, host_cache)))
    return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment