Skip to content

Instantly share code, notes, and snippets.

@jonrau1
Last active February 26, 2024 20:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jonrau1/2710c54a20684157b28cdeb95364fb64 to your computer and use it in GitHub Desktop.
Save jonrau1/2710c54a20684157b28cdeb95364fb64 to your computer and use it in GitHub Desktop.
Lambda function (Py 3.8) to convert VPC flow logs from CloudWatch Logs into JSON. Performs enrichment by adding ENI and EC2 information, reverse DNS, and geolocation courtesy of ip-api.com.
import requests
import time
import os
import json
import boto3
import ipaddress
import socket
import base64
import zlib
import datetime
# Placeholder emitted whenever an enrichment value cannot be determined.
_UNIDENTIFIED = 'Unidentified'

# Seconds to wait on ip-api.com before giving up on a geolocation lookup.
_IP_API_TIMEOUT_SECONDS = 5

# ip-api.com response fields used for enrichment.
_GEO_KEYS = ('countryCode', 'lat', 'lon', 'isp', 'org', 'as', 'asname')


def _parse_flow_record(message):
    """Split a default-format VPC flow log line into the fields used here.

    Returns a dict of parsed values, or None when the line cannot be
    enriched: too few fields, or address/time fields that are not valid
    (for example the '-' placeholders of NODATA / SKIPDATA records).
    """
    fields = message.split()
    if len(fields) < 13:
        return None
    try:
        # Validate both addresses up front so later lookups cannot raise.
        ipaddress.ip_address(fields[3])
        ipaddress.ip_address(fields[4])
        start_unix = int(fields[10])
        end_unix = int(fields[11])
    except ValueError:
        return None
    return {
        'acctId': fields[1],
        'interfaceId': fields[2],
        'sourceIp': fields[3],
        'destIp': fields[4],
        'sourcePort': fields[5],
        'destPort': fields[6],
        'protocol': fields[7],
        # Flow log timestamps are Unix seconds; make them UTC-aware.
        'startTime': datetime.datetime.fromtimestamp(start_unix, datetime.timezone.utc),
        'endTime': datetime.datetime.fromtimestamp(end_unix, datetime.timezone.utc),
        'action': fields[12],
    }


def _header_int(response, name, default):
    """Read an integer response header, falling back to default."""
    try:
        return int(response.headers.get(name, default))
    except ValueError:
        return default


def _lookup_geo(ip):
    """Return geolocation data for ip as a dict keyed by _GEO_KEYS.

    Private addresses are never sent to ip-api.com. Any value that cannot
    be determined (private address, failed request, 'fail' status from the
    API) is reported as 'Unidentified'. All values are strings.
    """
    unidentified = dict.fromkeys(_GEO_KEYS, _UNIDENTIFIED)
    if ipaddress.ip_address(ip).is_private:
        return unidentified

    url = 'http://ip-api.com/json/' + ip + '?fields=status,message,countryCode,lat,lon,isp,org,as,asname'
    try:
        response = requests.get(url, timeout=_IP_API_TIMEOUT_SECONDS)
        if response.status_code == 429:
            # Already over the rate limit: wait for the window to reset,
            # then retry once.
            time.sleep(_header_int(response, 'X-Ttl', 0) + 1)
            response = requests.get(url, timeout=_IP_API_TIMEOUT_SECONDS)
        payload = response.json()
    except (requests.RequestException, ValueError) as err:
        # Geolocation is best-effort enrichment; do not lose the flow log.
        print('Geolocation lookup failed for ' + ip + ': ' + str(err))
        return unidentified

    # X-Rl == 0 means this response is valid but the *next* request would
    # be throttled, so use the data and pause before returning.
    if _header_int(response, 'X-Rl', 1) == 0:
        time.sleep(_header_int(response, 'X-Ttl', 0) + 1)

    if not isinstance(payload, dict):
        return unidentified
    # A 'fail' status (e.g. reserved range) omits the geo fields.
    return {key: str(payload.get(key, _UNIDENTIFIED)) for key in _GEO_KEYS}


def _lookup_instance(ec2, ip, aws_region, account_id):
    """Return reverse-DNS / EC2 instance details for ip.

    Private addresses are resolved through EC2 DescribeInstances; public
    addresses through reverse DNS. The result has the keys
    'reverseDomain', 'instanceId' and 'instanceArn'; the last two are the
    string 'None' when no instance was matched.

    Raises whatever the EC2 call raises (after printing it).
    """
    info = {
        'reverseDomain': _UNIDENTIFIED,
        'instanceId': 'None',
        'instanceArn': 'None',
    }
    if ipaddress.ip_address(ip).is_private:
        try:
            response = ec2.describe_instances(
                Filters=[{'Name': 'private-ip-address', 'Values': [ip]}]
            )
            reservations = response['Reservations']
            if reservations:
                instance = reservations[0]['Instances'][0]
                instance_id = str(instance['InstanceId'])
                info['reverseDomain'] = str(instance['PrivateDnsName'])
                info['instanceId'] = instance_id
                info['instanceArn'] = (
                    'arn:aws:ec2:' + aws_region + ':' + account_id + ':instance/' + instance_id
                )
        except Exception as e:
            print(e)
            raise
    else:
        try:
            info['reverseDomain'] = str(socket.gethostbyaddr(ip)[0])
        except OSError:
            # No PTR record (socket.herror / socket.gaierror): keep placeholder.
            pass
    return info


def _endpoint_fields(prefix, ip, instance, geo):
    """Build the output fields for one side ('source' or 'dest') of a flow."""
    return {
        prefix + 'InstanceId': instance['instanceId'],
        prefix + 'InstanceArn': instance['instanceArn'],
        prefix + 'Ip': ip,
        prefix + 'ReverseDomain': instance['reverseDomain'],
        prefix + 'IpLatid': geo['lat'],
        prefix + 'IpLongt': geo['lon'],
        prefix + 'IpCountry': geo['countryCode'],
        prefix + 'Isp': geo['isp'],
        prefix + 'Org': geo['org'],
        prefix + 'As': geo['as'],
        prefix + 'Asname': geo['asname'],
    }


def lambda_handler(event, context):
    """Convert CloudWatch Logs VPC flow log events to enriched JSON.

    Decodes the base64 + gzip 'awslogs' payload of the event, and for each
    flow log line prints one JSON object containing the parsed fields plus
    ENI ARN, EC2 instance / reverse-DNS details and ip-api.com geolocation
    for both endpoints.

    Args:
        event: CloudWatch Logs subscription event ({'awslogs': {'data': ...}}).
        context: Lambda context object (unused).

    Returns:
        None. Output is written to stdout, one JSON document per record.
    """
    # grab the 'awslogs' part of the CWL event stream
    data = event.get('awslogs', {}).get('data')
    if not data:
        print('No awslogs data found in event; nothing to process')
        return

    ec2 = boto3.client('ec2')
    # create AWS region from Lambda env var
    aws_region = os.environ['AWS_REGION']
    # Base64 decode, decompress (gzip framing), load into a dict
    records = json.loads(zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS))

    # The same addresses recur across records in a batch; cache lookups for
    # the duration of this invocation to stay under the ip-api rate limit.
    geo_cache = {}
    instance_cache = {}

    def geo_for(ip):
        if ip not in geo_cache:
            geo_cache[ip] = _lookup_geo(ip)
        return geo_cache[ip]

    def instance_for(ip, account_id):
        key = (ip, account_id)
        if key not in instance_cache:
            instance_cache[key] = _lookup_instance(ec2, ip, aws_region, account_id)
        return instance_cache[key]

    for log_event in records['logEvents']:
        flow = _parse_flow_record(log_event['message'])
        if flow is None:
            print('Skipping flow log record that cannot be enriched: ' + log_event['message'])
            continue

        acct_id = flow['acctId']
        source_ip = flow['sourceIp']
        dest_ip = flow['destIp']

        enriched = {
            'accountId': acct_id,
            'interfaceId': flow['interfaceId'],
            # ENI ARN for cross-reference
            'interfaceArn': (
                'arn:aws:ec2:' + aws_region + ':' + acct_id
                + ':network-interface/' + flow['interfaceId']
            ),
        }
        enriched.update(_endpoint_fields(
            'source', source_ip, instance_for(source_ip, acct_id), geo_for(source_ip)))
        enriched.update(_endpoint_fields(
            'dest', dest_ip, instance_for(dest_ip, acct_id), geo_for(dest_ip)))
        enriched.update({
            'sourcePort': flow['sourcePort'],
            'destPort': flow['destPort'],
            'protocol': flow['protocol'],
            # datetimes are not JSON serializable; emit their str() form
            'startTime': str(flow['startTime']),
            'endTime': str(flow['endTime']),
            'action': flow['action'],
        })
        print(json.dumps(enriched))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment