Created
April 8, 2021 03:37
-
-
Save jonrau1/68697fc3858074b7c3786c1a9071159f to your computer and use it in GitHub Desktop.
Multi-region script to create a CSV inventory of all Security Groups including any possible permutations of ingress/egress rules (IPv4,IPv6,Prefix List, SG) - Uses Pandas!!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## pip3 install --upgrade pip | |
## pip3 install --upgrade requests | |
## pip3 install --upgrade awscli | |
## pip3 install --upgrade boto3 | |
## pip3 install --upgrade pandas | |
import boto3 | |
import json | |
import pandas as pd | |
regionList = [] | |
sgList = [] | |
ingressList = [] | |
egressList = [] | |
sts = boto3.client('sts') | |
ec2 = boto3.client('ec2') | |
account = sts.get_caller_identity()['Account'] | |
try: | |
# Get all Regions we are opted in for | |
for r in ec2.describe_regions()['Regions']: | |
regionName = str(r['RegionName']) | |
optInStatus = str(r['OptInStatus']) | |
if optInStatus == 'not-opted-in': | |
pass | |
else: | |
regionList.append(regionName) | |
print('All Regions retrieved from EC2 service') | |
except Exception as e: | |
raise e | |
for region in regionList: | |
session = boto3.Session(region_name=region) | |
tempec2 = session.client('ec2') | |
paginator = tempec2.get_paginator('describe_security_groups') | |
for page in paginator.paginate(): | |
for sg in page['SecurityGroups']: | |
# Top Level Security Group | |
sgId = str(sg['GroupId']) | |
sgName = str(sg['GroupName']) | |
if sgName == 'default': | |
isDefault = 'True' | |
else: | |
isDefault = 'False' | |
try: | |
sgDescr = str(sg['Description']) | |
if sgDescr == '': | |
sgDescr = 'None Provided' | |
else: | |
sgDescr = sgDescr | |
except: | |
sgDescr = 'None Provided' | |
try: | |
tags = str(sg['Tags']) | |
except: | |
tags = str('[]') | |
sgMain = { | |
'AccountId': account, | |
'AwsRegion': region, | |
'Description': sgDescr, | |
'GroupName': sgName, | |
'OwnerId': str(sg['OwnerId']), | |
'IsDefault': isDefault, | |
'SecurityGroupId': sgId, | |
'VpcId': str(sg['VpcId']), | |
'Tags': tags | |
} | |
sgList.append(sgMain) | |
# INGRESS RULES | |
for ingress in sg['IpPermissions']: | |
protocol = str(ingress['IpProtocol']) | |
if protocol == str('-1'): | |
protocol = 'ANY' | |
fromPort = str('0') | |
toPort = str('65535') | |
portRange = '0-65535' | |
else: | |
protocol = protocol.upper() | |
fromPort = str(ingress['FromPort']) | |
toPort = str(ingress['ToPort']) | |
if fromPort == toPort: | |
portRange = fromPort | |
else: | |
portRange = fromPort + '-' + toPort | |
# Attempt to account for multiple conditions for IPV4,IPV6,PLs,SGs | |
try: | |
for ip4 in ingress['IpRanges']: | |
destination = str(ip4['CidrIp']) | |
destType = 'IpV4' | |
try: | |
ingressDescr = str(ip4['Description']) | |
except: | |
ingressDescr = 'None Provided' | |
ingressDict = { | |
'SecurityGroupId': sgId, | |
'IngressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': ingressDescr | |
} | |
ingressList.append(ingressDict) | |
except: | |
continue | |
try: | |
for ip6 in ingress['Ipv6Ranges']: | |
destination = str(ip6['CidrIpv6']) | |
destType = 'IpV6' | |
try: | |
ingressDescr = str(ip6['Description']) | |
except: | |
ingressDescr = 'None Provided' | |
ingressDict = { | |
'SecurityGroupId': sgId, | |
'IngressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': ingressDescr | |
} | |
ingressList.append(ingressDict) | |
except: | |
continue | |
try: | |
for pl in ingress['PrefixListIds']: | |
destination = str(pl['PrefixListId']) | |
destType = 'PrefixList' | |
try: | |
ingressDescr = str(pl['Description']) | |
except: | |
ingressDescr = 'None Provided' | |
ingressDict = { | |
'SecurityGroupId': sgId, | |
'IngressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': ingressDescr | |
} | |
ingressList.append(ingressDict) | |
except: | |
continue | |
try: | |
for uid in ingress['UserIdGroupPairs']: | |
destination = str(uid['GroupId']) | |
destType = 'SecurityGroup' | |
try: | |
ingressDescr = str(uid['Description']) | |
except: | |
ingressDescr = 'None Provided' | |
ingressDict = { | |
'SecurityGroupId': sgId, | |
'IngressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': ingressDescr | |
} | |
ingressList.append(ingressDict) | |
except: | |
continue | |
# EGRESS RULES | |
for egress in sg['IpPermissionsEgress']: | |
protocol = str(egress['IpProtocol']) | |
if protocol == str('-1'): | |
protocol = 'ANY' | |
fromPort = str('0') | |
toPort = str('65535') | |
portRange = '0-65535' | |
else: | |
protocol = protocol.upper() | |
fromPort = str(egress['FromPort']) | |
toPort = str(egress['ToPort']) | |
if fromPort == toPort: | |
portRange = fromPort | |
else: | |
portRange = fromPort + '-' + toPort | |
# Attempt to account for multiple conditions for IPV4,IPV6,PLs,SGs | |
try: | |
for ip4 in egress['IpRanges']: | |
destination = str(ip4['CidrIp']) | |
destType = 'IpV4' | |
try: | |
egressDescr = str(ip4['Description']) | |
except: | |
egressDescr = 'None Provided' | |
egressDict = { | |
'SecurityGroupId': sgId, | |
'EgressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': egressDescr | |
} | |
egressList.append(egressDict) | |
except: | |
continue | |
try: | |
for ip6 in egress['Ipv6Ranges']: | |
destination = str(ip6['CidrIpv6']) | |
destType = 'IpV6' | |
try: | |
egressDescr = str(ip6['Description']) | |
except: | |
egressDescr = 'None Provided' | |
egressDict = { | |
'SecurityGroupId': sgId, | |
'EgressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': egressDescr | |
} | |
egressList.append(egressDict) | |
except: | |
continue | |
try: | |
for pl in egress['PrefixListIds']: | |
destination = str(pl['PrefixListId']) | |
destType = 'PrefixList' | |
try: | |
egressDescr = str(pl['Description']) | |
except: | |
egressDescr = 'None Provided' | |
egressDict = { | |
'SecurityGroupId': sgId, | |
'EgressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': egressDescr | |
} | |
egressList.append(egressDict) | |
except: | |
continue | |
try: | |
for uid in egress['UserIdGroupPairs']: | |
destination = str(uid['GroupId']) | |
destType = 'SecurityGroup' | |
try: | |
egressDescr = str(uid['Description']) | |
except: | |
egressDescr = 'None Provided' | |
egressDict = { | |
'SecurityGroupId': sgId, | |
'EgressDestination': destination, | |
'DestinationType': destType, | |
'FromPort': fromPort, | |
'ToPort': toPort, | |
'PortRange': portRange, | |
'Protocol': protocol, | |
'RuleDescription': egressDescr | |
} | |
egressList.append(egressDict) | |
except: | |
continue | |
print('Data gathering complete!') | |
sgDf = pd.DataFrame(sgList) | |
sgDf.to_csv('./sg-inventory.csv', index=False) | |
ingressDf = pd.DataFrame(ingressList) | |
ingressDf.to_csv('./sg-rules-ingress.csv', index=False) | |
egressDf = pd.DataFrame(egressList) | |
egressDf.to_csv('./sg-rules-egress.csv', index=False) | |
print('DataFrame creation complete!') | |
ingressMerge = sgDf.merge( | |
ingressDf, | |
how = 'left', | |
left_on = 'SecurityGroupId', | |
right_on = 'SecurityGroupId' | |
) | |
del sgDf | |
del ingressDf | |
fullMerge = ingressMerge.merge( | |
egressDf, | |
how = 'left', | |
left_on = 'SecurityGroupId', | |
right_on = 'SecurityGroupId', | |
suffixes = ('_ingress', '_egress') | |
) | |
fullMerge.to_csv('./aws-security-groups.csv', index=False) | |
print('Merges complete!') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment