Skip to content

Instantly share code, notes, and snippets.

@jonrau1
Created April 8, 2021 03:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save jonrau1/68697fc3858074b7c3786c1a9071159f to your computer and use it in GitHub Desktop.
Save jonrau1/68697fc3858074b7c3786c1a9071159f to your computer and use it in GitHub Desktop.
Multi-region script that creates a CSV inventory of all Security Groups, including every combination of ingress/egress rule targets (IPv4, IPv6, Prefix List, Security Group). Uses pandas.
## pip3 install --upgrade pip
## pip3 install --upgrade requests
## pip3 install --upgrade awscli
## pip3 install --upgrade boto3
## pip3 install --upgrade pandas
import boto3
import json
import pandas as pd
# Accumulators filled by the per-region collection loop further down and
# turned into DataFrames / CSV files at the end of the script.
regionList = []
sgList = []
ingressList = []
egressList = []

# Clients are created from boto3's default session (default credentials and
# Region configuration).
sts = boto3.client('sts')
ec2 = boto3.client('ec2')

# Account ID of the calling identity; written to every Security Group row.
account = sts.get_caller_identity()['Account']

# Keep every Region EC2 returns except those reported as 'not-opted-in'.
# API errors are not caught here: they propagate and stop the script.
for r in ec2.describe_regions()['Regions']:
    if str(r['OptInStatus']) != 'not-opted-in':
        regionList.append(str(r['RegionName']))
print('All Regions retrieved from EC2 service')
# Each entry: (key of the target list inside an IpPermission, key holding the
# target's identifier, label written to the 'DestinationType' column).
_RULE_TARGETS = (
    ('IpRanges', 'CidrIp', 'IpV4'),
    ('Ipv6Ranges', 'CidrIpv6', 'IpV6'),
    ('PrefixListIds', 'PrefixListId', 'PrefixList'),
    ('UserIdGroupPairs', 'GroupId', 'SecurityGroup'),
)


def _port_fields(permission):
    """Return (protocol, fromPort, toPort, portRange) strings for one rule.

    Protocol '-1' is reported as 'ANY' over ports 0-65535. A rule that has no
    'FromPort'/'ToPort' keys is reported with the same full range instead of
    raising KeyError, so one such rule no longer aborts the whole inventory.
    """
    protocol = str(permission['IpProtocol'])
    if protocol == '-1':
        return 'ANY', '0', '65535', '0-65535'
    protocol = protocol.upper()
    if 'FromPort' not in permission or 'ToPort' not in permission:
        # No port bounds in the response: use the same representation as the
        # all-protocols case above.
        return protocol, '0', '65535', '0-65535'
    fromPort = str(permission['FromPort'])
    toPort = str(permission['ToPort'])
    portRange = fromPort if fromPort == toPort else fromPort + '-' + toPort
    return protocol, fromPort, toPort, portRange


def _rule_rows(sgId, permission, destinationKey):
    """Flatten one IpPermission into one row dict per target it references.

    sgId is the owning Security Group ID, permission is a single entry of
    'IpPermissions' or 'IpPermissionsEgress', and destinationKey is the column
    name for the target ('IngressDestination' or 'EgressDestination').
    A missing target list, or a target without its identifier key, is skipped
    on its own; the remaining target kinds of the same rule are still emitted.
    """
    protocol, fromPort, toPort, portRange = _port_fields(permission)
    rows = []
    for listKey, idKey, destType in _RULE_TARGETS:
        for target in permission.get(listKey, []):
            if idKey not in target:
                # No identifier to report as the destination.
                continue
            rows.append({
                'SecurityGroupId': sgId,
                destinationKey: str(target[idKey]),
                'DestinationType': destType,
                'FromPort': fromPort,
                'ToPort': toPort,
                'PortRange': portRange,
                'Protocol': protocol,
                'RuleDescription': str(target.get('Description', 'None Provided')),
            })
    return rows


def _group_row(sg, accountId, regionName):
    """Build the top-level inventory row for one Security Group."""
    sgName = str(sg['GroupName'])
    return {
        'AccountId': accountId,
        'AwsRegion': regionName,
        # A missing or empty description is reported as 'None Provided'.
        'Description': str(sg.get('Description', '')) or 'None Provided',
        'GroupName': sgName,
        'OwnerId': str(sg['OwnerId']),
        # Kept as the strings 'True' / 'False', as in the CSV output.
        'IsDefault': str(sgName == 'default'),
        'SecurityGroupId': str(sg['GroupId']),
        # A group without a 'VpcId' key gets the script's usual placeholder
        # instead of raising KeyError.
        'VpcId': str(sg.get('VpcId', 'None Provided')),
        'Tags': str(sg.get('Tags', [])),
    }


# Walk every Region collected earlier and append rows to the module-level
# accumulators (sgList, ingressList, egressList).
for region in regionList:
    session = boto3.Session(region_name=region)
    tempec2 = session.client('ec2')
    paginator = tempec2.get_paginator('describe_security_groups')
    for page in paginator.paginate():
        for sg in page['SecurityGroups']:
            sgId = str(sg['GroupId'])
            sgList.append(_group_row(sg, account, region))
            # INGRESS RULES
            for ingress in sg.get('IpPermissions', []):
                ingressList.extend(_rule_rows(sgId, ingress, 'IngressDestination'))
            # EGRESS RULES
            for egress in sg.get('IpPermissionsEgress', []):
                egressList.extend(_rule_rows(sgId, egress, 'EgressDestination'))
print('Data gathering complete!')

# Column layouts are spelled out so that an empty result list still produces
# a DataFrame with a 'SecurityGroupId' column; without it the merges below
# raise KeyError. The order matches the key order of the collected row dicts.
sgColumns = [
    'AccountId',
    'AwsRegion',
    'Description',
    'GroupName',
    'OwnerId',
    'IsDefault',
    'SecurityGroupId',
    'VpcId',
    'Tags',
]
ruleColumns = [
    'DestinationType',
    'FromPort',
    'ToPort',
    'PortRange',
    'Protocol',
    'RuleDescription',
]
ingressColumns = ['SecurityGroupId', 'IngressDestination'] + ruleColumns
egressColumns = ['SecurityGroupId', 'EgressDestination'] + ruleColumns

# One CSV per dataset: groups, ingress rules, egress rules.
sgDf = pd.DataFrame(sgList, columns=sgColumns)
sgDf.to_csv('./sg-inventory.csv', index=False)
ingressDf = pd.DataFrame(ingressList, columns=ingressColumns)
ingressDf.to_csv('./sg-rules-ingress.csv', index=False)
egressDf = pd.DataFrame(egressList, columns=egressColumns)
egressDf.to_csv('./sg-rules-egress.csv', index=False)
print('DataFrame creation complete!')

# Left joins keep every Security Group even when it has no rules. Joining
# ingress and then egress on the group ID yields one row per
# (ingress rule, egress rule) combination for each group.
ingressMerge = sgDf.merge(ingressDf, how='left', on='SecurityGroupId')
# The inputs are no longer needed once merged; drop the references.
del sgDf
del ingressDf
# Rule columns shared by both sides are disambiguated with the suffixes.
fullMerge = ingressMerge.merge(
    egressDf,
    how='left',
    on='SecurityGroupId',
    suffixes=('_ingress', '_egress'),
)
fullMerge.to_csv('./aws-security-groups.csv', index=False)
print('Merges complete!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment