Script to detect stale AWS security groups
#!/usr/bin/env python | |
import boto3 | |
import argparse | |
class StaleSGDetector(object):
    """Detect AWS security groups that no inspected resource is attached to.

    A group counts as "in use" when it is named ``default`` or when it is
    attached to an EC2 instance, a network interface, a classic ELB, an
    elbv2 load balancer, an RDS instance or a Redshift cluster in the
    configured region. Every other group returned by the security group
    listing is reported as stale.

    Results are exposed as attributes after ``run()``:
    ``all_groups``, ``security_groups_in_use``, ``stale_groups`` (sets of
    group ids) and the ``*_count`` counters of inspected resources.
    """

    def __init__(self, **kwargs):
        """Store the lookup settings and reset all result containers.

        Keyword Args:
            region: AWS region name handed to ``boto3.Session``.
            profile: AWS profile name handed to ``boto3.Session``.

        Both are optional; a missing value is stored as ``None`` and passed
        through unchanged, which leaves the choice to boto3's own
        configuration lookup.
        """
        super(StaleSGDetector, self).__init__()
        self.region = kwargs.get("region")
        self.profile = kwargs.get("profile")
        self.all_groups = set()
        self.security_groups_in_use = set()
        self.stale_groups = set()
        self.ec2_instances_count = 0
        self.network_interfaces_count = 0
        self.elbs_count = 0
        self.albs_count = 0
        self.rds_count = 0
        self.redshift_count = 0

    def get_session(self):
        """Return a new boto3 session for the configured profile and region."""
        return boto3.Session(
            profile_name=self.profile, region_name=self.region
        )

    @staticmethod
    def _paginate(client, operation, result_key):
        """Yield every item stored under ``result_key`` on every result page.

        The describe calls used by this class return their results in
        pages; reading only the first page would hide resources and make
        their security groups look stale.
        """
        paginator = client.get_paginator(operation)
        for page in paginator.paginate():
            for item in page.get(result_key, []):
                yield item

    def run(self):
        """Open a session, inspect every resource type and compute the result."""
        self.session = self.get_session()
        self.get_all_security_groups()
        self.get_ec2_security_groups()
        self.get_network_interfaces_security_groups()
        self.get_elb_security_groups()
        self.get_alb_security_groups()
        self.get_rds_security_groups()
        self.get_redshift_security_groups()
        self.calculate_stale_security_groups()

    def get_all_security_groups(self):
        """Record every security group that exists in the region."""
        ec2_client = self.session.client('ec2')
        groups = self._paginate(
            ec2_client, 'describe_security_groups', 'SecurityGroups'
        )
        for group in groups:
            # Default SGs don't have to be deleted, so treat them as in use.
            if group['GroupName'] == 'default':
                self.security_groups_in_use.add(group['GroupId'])
            self.all_groups.add(group['GroupId'])

    def get_ec2_security_groups(self):
        """Mark the security groups attached to EC2 instances as in use."""
        ec2_client = self.session.client('ec2')
        reservations = self._paginate(
            ec2_client, 'describe_instances', 'Reservations'
        )
        for reservation in reservations:
            for instance in reservation.get('Instances', []):
                self.ec2_instances_count += 1
                for group in instance.get('SecurityGroups', []):
                    self.security_groups_in_use.add(group['GroupId'])

    def get_network_interfaces_security_groups(self):
        """Mark the security groups attached to network interfaces (ENIs)."""
        ec2_client = self.session.client('ec2')
        enis = self._paginate(
            ec2_client, 'describe_network_interfaces', 'NetworkInterfaces'
        )
        for eni in enis:
            self.network_interfaces_count += 1
            for group in eni.get('Groups', []):
                self.security_groups_in_use.add(group['GroupId'])

    def get_elb_security_groups(self):
        """Mark the security groups attached to classic load balancers (ELBs)."""
        elb_client = self.session.client('elb')
        elbs = self._paginate(
            elb_client, 'describe_load_balancers', 'LoadBalancerDescriptions'
        )
        for elb in elbs:
            self.elbs_count += 1
            # Classic ELBs list their groups as plain group id strings.
            self.security_groups_in_use.update(elb.get('SecurityGroups', []))

    def get_alb_security_groups(self):
        """Mark the security groups attached to elbv2 load balancers (ALBs)."""
        alb_client = self.session.client('elbv2')
        albs = self._paginate(
            alb_client, 'describe_load_balancers', 'LoadBalancers'
        )
        for alb in albs:
            self.albs_count += 1
            # The elbv2 listing is not limited to ALBs; entries without
            # attached groups may lack the 'SecurityGroups' key entirely,
            # so a plain subscript would raise KeyError here.
            self.security_groups_in_use.update(alb.get('SecurityGroups', []))

    def get_rds_security_groups(self):
        """Mark the VPC security groups attached to RDS instances as in use."""
        rds_client = self.session.client('rds')
        instances = self._paginate(
            rds_client, 'describe_db_instances', 'DBInstances'
        )
        for rds in instances:
            self.rds_count += 1
            for group in rds.get('VpcSecurityGroups', []):
                self.security_groups_in_use.add(group['VpcSecurityGroupId'])

    def get_redshift_security_groups(self):
        """Mark the VPC security groups attached to Redshift clusters."""
        redshift_client = self.session.client('redshift')
        clusters = self._paginate(
            redshift_client, 'describe_clusters', 'Clusters'
        )
        for cluster in clusters:
            self.redshift_count += 1
            for group in cluster.get('VpcSecurityGroups', []):
                self.security_groups_in_use.add(group['VpcSecurityGroupId'])

    def calculate_stale_security_groups(self):
        """Set ``stale_groups`` to every known group that is not in use.

        Raises:
            AssertionError: if a group was found attached to a resource
                but is missing from the full security group listing.
        """
        self.stale_groups = self.all_groups - self.security_groups_in_use
        # Sanity check: every in-use group should also appear in the full
        # listing. Name the offenders so a failure can be investigated.
        unknown = self.security_groups_in_use - self.all_groups
        assert not unknown, (
            "Security groups in use but missing from the full listing: "
            "{}".format(", ".join(sorted(unknown)))
        )
def report(detector_object):
    """Print a human-readable summary of a finished detection run.

    Args:
        detector_object: object exposing the ``*_count`` counters and the
            ``all_groups``, ``security_groups_in_use`` and ``stale_groups``
            collections read below (a ``StaleSGDetector`` after ``run()``).
    """
    separator = "---------------"
    print(separator)

    search_log = (
        "Searched through {} EC2 instances, "
        "{} network interfaces, {} ELBs, {} ALBs, "
        "{} RDS instances, {} Redshift clusters"
    ).format(
        detector_object.ec2_instances_count,
        detector_object.network_interfaces_count,
        detector_object.elbs_count, detector_object.albs_count,
        detector_object.rds_count, detector_object.redshift_count
    )
    print(search_log)

    sg_log = "Found {} Security Groups in total, which {} are in-use.".format(
        len(detector_object.all_groups),
        len(detector_object.security_groups_in_use)
    )
    print(sg_log)

    stale_log = "Following {} security groups don't seem to be used:".format(
        len(detector_object.stale_groups)
    )
    print(stale_log)

    # Sorted so the listing is stable between runs. The call form of print
    # with a single string works on both Python 2 and 3; the two spaces
    # after the dash match what the former ``print " - ", sg`` emitted.
    for sg in sorted(detector_object.stale_groups):
        print(" -  {}".format(sg))

    print(separator)
def main():
    """Parse the command line, run the detector and print its report."""
    # Local import: only needed for the warning written to stderr below.
    import sys

    parser = argparse.ArgumentParser(description="Show unused security groups")
    parser.add_argument(
        "-p", "--profile", type=str, default="default",
        help="The AWS profile that will be used."
    )
    parser.add_argument(
        "-r", "--region", type=str, default="eu-west-1",
        help="The default region is eu-west-1."
    )
    parser.add_argument(
        "-d", "--delete", action="store_true",
        help="Delete security groups from AWS (not implemented; "
             "the script only warns and deletes nothing)"
    )
    parser.add_argument(
        "-n", "--no-report", help="Skip showing report for Security Groups",
        action="store_true", default=False
    )
    args = parser.parse_args()

    if args.delete:
        # The flag is accepted but nothing in this script deletes groups.
        # Say so explicitly rather than letting the user assume it happened.
        sys.stderr.write(
            "WARNING: --delete is not implemented; "
            "no security groups will be deleted.\n"
        )

    detector = StaleSGDetector(region=args.region, profile=args.profile)
    detector.run()
    if not args.no_report:
        report(detector)


if __name__ == "__main__":
    main()
This comment has been minimized.
This comment has been minimized.
line 146:
should read:
|
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
Thanks for your amazing script, I was exactly looking for that, and thanks to Last week in AWS, here I am!
I made one modification to allow boto to read the AWS_PROFILE environment variable (instead of reading the profile from args). This is my workflow; I find it better, and I think most tools follow this paradigm, so it is a bit counterintuitive to have to specify it.
If you want, take my fork, and update yours: https://gist.github.com/pierreozoux/98dae7f3f2a5a7bbb40a78f3b7976f33 I think more people would benefit! (No PR on gists :/ )