@clcollins
Last active July 13, 2020 17:40
AWS Account Cleanup
#!/usr/bin/env python3
#
# This script is written for python3.7
# If running on hive, you will need a virtualenv.
#
# $ virtualenv venv -p python3.7
# $ source venv/bin/activate
# $ pip install boto3
#
# Then you should be able to run this script.
# v0.2.2
# 2020-06-18
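#
# Example invocation (script name, profile name, and account ID below are
# illustrative):
#
# $ ./cleanup.py -p my-admin-profile -a 123456789012 --dry-run --verbose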
import argparse
import json
import logging
import sys

import boto3

COVERED_REGIONS = [
    "us-east-1",
    "us-east-2",
    "us-west-1",
    "us-west-2",
    "ca-central-1",
    "eu-west-1",
    "eu-west-2",
    "eu-west-3",
    "ap-northeast-1",
    "ap-northeast-2",
    "ap-south-1",
    "ap-southeast-1",
    "ap-southeast-2",
    "sa-east-1",
]


def assume_role(account_id, initial_session, logger):
    client = initial_session.client('sts')
    role_arn = f"arn:aws:iam::{account_id}:role/OrganizationAccountAccessRole"
    role_session_name = "SREAdminReuseCleanup"
    duration = 900
    default_region = 'us-east-1'

    logger.debug(client.get_caller_identity())

    response = client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=role_session_name,
        DurationSeconds=duration
    )
    logger.debug(response)

    session = boto3.Session(
        aws_access_key_id=response['Credentials']['AccessKeyId'],
        aws_secret_access_key=response['Credentials']['SecretAccessKey'],
        aws_session_token=response['Credentials']['SessionToken'],
        region_name=default_region
    )
    logger.debug(session)

    return session


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-a',
        '--account_id',
        type=str,
        required=True,
        help='AWS AccountID to cleanup'
    )
    parser.add_argument(
        '-d',
        '--dry-run',
        action='store_true',
        help='Report information on what would be deleted, but do not act.'
    )
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        help='Enable verbose output'
    )
    parser.add_argument(
        '-r',
        '--region',
        type=str,
        default='ALL',
        help='AWS Region to look through.'
    )
    parser.add_argument(
        '-p',
        '--profile',
        type=str,
        required=True,
        help='AWS Profile to use'
    )
    return parser.parse_args()


def read_s3_buckets(client, logger):
    buckets = []
    response = client.list_buckets()
    logger.debug(response)
    # S3 bucket responses do not paginate
    if response['Buckets']:
        for bucket in response['Buckets']:
            logger.debug(f"Appending bucket to list: {bucket}")
            buckets.append(bucket)
    logger.info(f"S3 buckets to delete: {len(buckets)}")
    return buckets


def read_ec2_instances(client, logger):
    response = client.describe_instances()
    logger.debug(response)
    instances = []
    # Instance responses do not paginate
    for reservation in response['Reservations']:
        if reservation['Instances']:
            for instance in reservation['Instances']:
                logger.debug(f"Appending instances to list: {instance}")
                instances.append(instance)
    logger.info(f"EC2 instances to delete: {len(instances)}")
    return instances


def read_volume_snapshots(client, logger):
    snapshot_filters = [{'Name': 'owner-alias', 'Values': ['self']}]
    paginator = client.get_paginator('describe_snapshots')
    page_iterator = paginator.paginate(Filters=snapshot_filters)
    snapshots = []
    for page in page_iterator:
        logger.debug(page)
        for snapshot in page['Snapshots']:
            logger.debug(f"Appending to list: {snapshot['SnapshotId']}")
            snapshots.append(snapshot['SnapshotId'])
    logger.info(f"Snapshots to delete: {len(snapshots)}")
    return snapshots


def read_ebs_volumes(client, logger):
    paginator = client.get_paginator('describe_volumes')
    page_iterator = paginator.paginate()
    volumes = []
    for page in page_iterator:
        logger.debug(page)
        for volume in page['Volumes']:
            logger.debug(f"Appending to list: {volume['VolumeId']}")
            volumes.append(volume['VolumeId'])
    logger.info(f"Volumes to delete: {len(volumes)}")
    return volumes


def read_hosted_zones(client, logger):
    paginator = client.get_paginator('list_hosted_zones')
    page_iterator = paginator.paginate()
    zones = []
    for page in page_iterator:
        logger.debug(page)
        for zone in page['HostedZones']:
            logger.debug(f"Appending to list: {zone['Id']}")
            zones.append(zone)
    # Remove zones that have only the default NS and SOA records
    filtered_zones = filter_zones(logger, zones)
    logger.info(f"Zones to delete: {len(filtered_zones)}")
    return filtered_zones


def read_record_sets(client, logger, zone):
    paginator = client.get_paginator('list_resource_record_sets')
    page_iterator = paginator.paginate(HostedZoneId=zone['Id'])
    record_set_pages = []
    for page in page_iterator:
        logger.debug(page)
        logger.debug(f"Appending to list: {page}")
        record_set_pages.append(page)
    return record_set_pages


def create_change_batch(client, logger, record_set_pages):
    change_batch = {}
    change_batch['Changes'] = []
    for record_set_page in record_set_pages:
        for record_set in record_set_page['ResourceRecordSets']:
            logger.debug(record_set)
            if record_set['Type'] not in ('NS', 'SOA'):
                change_batch['Changes'].append(
                    {
                        'Action': 'DELETE',
                        'ResourceRecordSet': record_set
                    }
                )
    return change_batch


def submit_change_batch(client, args, logger, zone_id, change_batch):
    if not change_batch['Changes']:
        logger.info("No record sets to delete")
        return
    if args.dry_run:
        logger.info("Dry run; skipping record set deletion")
        return
    # This is where we change resource record sets in batch
    response = client.change_resource_record_sets(
        HostedZoneId=zone_id,
        ChangeBatch=change_batch
    )
    logger.debug(response)
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        logger.critical(f"Failed deleting record set batch: {change_batch}")
    else:
        logger.info(f"Deleted record set for zone: {zone_id}")
    return


def delete_s3_buckets(client, args, logger, buckets):
    if args.dry_run:
        logger.info("Dry run; skipping S3 bucket deletion")
        return
    for bucket in buckets:
        # Note: delete_bucket only succeeds if the bucket is already empty
        response = client.delete_bucket(Bucket=bucket['Name'])
        logger.debug(response)
        code = response['ResponseMetadata']['HTTPStatusCode']
        if code not in (200, 204):
            logger.critical(f"Failed deleting bucket: {bucket['Name']}")
        else:
            logger.info(f"Deleted bucket: {bucket['Name']}")
    return


def delete_ec2_instances(client, args, logger, instances):
    if args.dry_run:
        logger.info("Dry run; skipping ec2 instance deletion")
        return
    instance_ids = [instance['InstanceId'] for instance in instances]
    response = client.terminate_instances(InstanceIds=instance_ids)
    logger.debug(response)
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        logger.critical(f"Failed deleting instances: {instance_ids}")
    else:
        logger.info(f"Deleted instances: {instance_ids}")
    return


def delete_ebs_volumes(client, args, logger, volumes):
    if args.dry_run:
        logger.info("Dry run; skipping EBS volume deletion")
        return
    for volume in volumes:
        response = client.delete_volume(VolumeId=volume)
        logger.debug(response)
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            logger.critical(f"Failed deleting EBS volume: {volume}")
        else:
            logger.info(f"Deleted EBS volume: {volume}")
    return


def delete_volume_snapshots(client, args, logger, snapshots):
    if args.dry_run:
        logger.info("Dry run; skipping volume snapshot deletion")
        return
    # snapshots is a list of snapshot ID strings
    for snapshot in snapshots:
        response = client.delete_snapshot(SnapshotId=snapshot)
        logger.debug(response)
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            logger.critical(f"Failed deleting volume snapshot: {snapshot}")
        else:
            logger.info(f"Deleted volume snapshot: {snapshot}")
    return


def delete_hosted_zones(client, args, logger, zones):
    if args.dry_run:
        logger.info("Dry run; skipping hosted zone deletion")
        return
    for zone in zones:
        response = client.delete_hosted_zone(Id=zone['Id'])
        logger.debug(response)
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            logger.critical(f"Failed deleting hosted zone: {zone['Id']}")
        else:
            logger.info(f"Deleted hosted zone: {zone['Id']}")
    return


def get_and_delete_hostedzone_recordsets(client, args, logger, zones):
    for zone in zones:
        record_set_pages = read_record_sets(client, logger, zone)
        change_batch = create_change_batch(client, logger, record_set_pages)
        submit_change_batch(client, args, logger, zone['Id'], change_batch)
    return


def filter_zones(logger, zones):
    # Keep only zones with more than the default NS and SOA records.
    # Build a new list rather than removing items from the list being
    # iterated, which would skip entries.
    filtered_zones = []
    for zone in zones:
        if zone['ResourceRecordSetCount'] <= 2:
            logger.info(f"Zone has only default record sets: {zone['Id']}")
        else:
            filtered_zones.append(zone)
    logger.debug(filtered_zones)
    return filtered_zones


def setup_logging(verbose):
    logging.basicConfig(
        format='%(asctime)s %(message)s',
        datefmt='%m/%d/%Y %I:%M:%S %p'
    )
    logger = logging.getLogger('main')
    logger.setLevel(logging.INFO)
    if verbose:
        logger.setLevel(logging.DEBUG)
        logger.debug("Set logging level to DEBUG")
    return logger


def main():
    args = parse_arguments()
    logger = setup_logging(args.verbose)
    logger.debug(f"args: {args}")
    args.dry_run and logger.debug('Dry Run; not deleting resources')

    # The initial AWS session from which to assume roles
    initial_session = boto3.Session(profile_name=args.profile)

    output = {}
    output['awsAccountId'] = args.account_id
    output['dry_run'] = args.dry_run
    output['regions'] = []
    output['messages'] = []

    # Assume the root account role for the accounts being cleaned up
    try:
        session = assume_role(args.account_id, initial_session, logger)
    except Exception as e:
        output['messages'].append(str(e))
        print(json.dumps(output))
        return

    logger.info(f"Searching for resources in account {args.account_id}")

    if args.region == "ALL":
        regions = COVERED_REGIONS
    else:
        regions = [args.region]

    for region in regions:
        region_dict = {region: {}}

        # EC2 Instances
        ec2_client = session.client('ec2', region_name=region)
        instances = read_ec2_instances(ec2_client, logger)

        # Snapshots
        snapshots = read_volume_snapshots(ec2_client, logger)

        # EBS Volumes
        volumes = read_ebs_volumes(ec2_client, logger)

        # HostedZones and Records
        route53_client = session.client('route53', region_name=region)
        zones = read_hosted_zones(route53_client, logger)

        # S3 buckets
        s3_client = session.client('s3', region_name=region)
        buckets = read_s3_buckets(s3_client, logger)

        # Construct output info
        region_dict[region]['ec2_instances'] = len(instances)
        region_dict[region]['volume_snapshots'] = len(snapshots)
        region_dict[region]['ebs_volumes'] = len(volumes)
        region_dict[region]['hosted_zones'] = len(zones)
        region_dict[region]['s3_buckets'] = len(buckets)

        if len(instances) > 3:
            output['messages'].append(
                'more than 3 EC2 instances found; '
                'please verify this account should be deleted'
            )
            print(json.dumps(output))
            # Should this be a break?
            return

        logger.info(
            f"Removing resources in account {args.account_id}, "
            f"region {region}"
        )

        instances and delete_ec2_instances(ec2_client, args, logger, instances)
        snapshots and delete_volume_snapshots(
            ec2_client, args, logger, snapshots)
        volumes and delete_ebs_volumes(ec2_client, args, logger, volumes)
        zones and get_and_delete_hostedzone_recordsets(
            route53_client, args, logger, zones)
        zones and delete_hosted_zones(route53_client, args, logger, zones)
        buckets and delete_s3_buckets(s3_client, args, logger, buckets)

        output['regions'].append(region_dict)

    print(json.dumps(output))


if __name__ == "__main__":
    main()
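The script prints a single JSON report at the end. For a dry run it looks roughly like the following (the account ID and counts are illustrative, and the report is pretty-printed here; the script itself emits it on one line):

    {
      "awsAccountId": "123456789012",
      "dry_run": true,
      "regions": [
        {"us-east-1": {"ec2_instances": 1, "volume_snapshots": 2, "ebs_volumes": 1, "hosted_zones": 0, "s3_buckets": 3}}
      ],
      "messages": []
    }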
@iamkirkbater commented May 18, 2020

Add support for regions:

    parser.add_argument(
        '-r',
        '--region',
        type=str,
        default='us-east-1',
        help='AWS Region to look through.'
    )
    ec2_client = session.client('ec2', region_name=args.region)
    route53_client = session.client('route53', region_name=args.region)
    s3_client = session.client('s3', region_name=args.region)

And in the output maps:

    output['region'] = args.region

@clcollins (Author)

Added region support, fixed volume bugs.

@clcollins (Author)

Need to handle botocore.exceptions.ClientError (VolumeInUse) errors, which occur when terminating an instance takes a bit too long and the volume is still attached. Retry with backoff and a 30 second timeout?
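One possible approach, sketched below: wrap delete_volume in a small retry helper that catches ClientError, retries while the error code is VolumeInUse, and gives up after the 30 second timeout mentioned above. The helper name and the 5 second retry interval are illustrative, not part of the gist:

    import time

    import botocore.exceptions


    def delete_volume_with_retry(client, logger, volume_id, timeout=30, interval=5):
        # Retry delete_volume while the volume is still reported as in use.
        deadline = time.time() + timeout
        while True:
            try:
                return client.delete_volume(VolumeId=volume_id)
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] != 'VolumeInUse':
                    raise
                if time.time() >= deadline:
                    logger.critical(f"Timed out deleting in-use volume: {volume_id}")
                    raise
                logger.info(f"Volume still in use, retrying: {volume_id}")
                time.sleep(interval)

delete_ebs_volumes could then call this helper in place of the direct client.delete_volume call.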
