@garyellis
Created March 13, 2018 15:40
#!/usr/bin/env python
import csv
import os
import boto3
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
import logging
import sys
#import aws_pricing_api
logging.getLogger('botocore').setLevel(logging.CRITICAL)
log = logging.getLogger()
log.handlers = []
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
    '%(asctime)4s %(name)4s [%(filename)s:%(lineno)s - %(funcName)s()] %(levelname)4s %(message)4s')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)

def get_available_ebs_volumes(region):
    """Return the EBS volumes in the 'available' (unattached) state."""
    ebs_filter = [{'Name': 'status', 'Values': ['available']}]
    try:
        ec2 = boto3.resource('ec2', region_name=region)
        # materialize the collection so it can be counted and iterated again
        # without re-querying the API
        available_volumes = list(ec2.volumes.filter(Filters=ebs_filter))
    except ClientError as err:
        log.error(err.response['Error']['Message'])
        raise
    log.info('Found volumes in status available: {}'.format(len(available_volumes)))
    return available_volumes

def get_ebs_volume_idle_time(region, volume_id, threshold=2):
    """Return VolumeIdleTime datapoints for the volume over the last `threshold` days."""
    metrics = {}
    threshold = timedelta(days=threshold)
    # pad end_time by a day so the most recent datapoints are included
    end_time = datetime.now() + timedelta(days=1)
    start_time = end_time - threshold
    try:
        cw = boto3.client('cloudwatch', region_name=region)
        metrics = cw.get_metric_statistics(
            Namespace='AWS/EBS',
            MetricName='VolumeIdleTime',
            Dimensions=[{'Name': 'VolumeId', 'Value': volume_id}],
            Period=3600,
            StartTime=start_time,
            EndTime=end_time,
            Statistics=['Minimum'],
            Unit='Seconds'
        )
    except ClientError as err:
        log.error(err.response['Error']['Message'])
        raise
    return metrics['Datapoints']
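
# Each datapoint returned by get_metric_statistics is a dict; an illustrative
# shape (values hypothetical):
#   {'Timestamp': datetime(2018, 3, 12, 15, 0), 'Minimum': 300.0, 'Unit': 'Seconds'}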

def is_candidate(region, volume_id):
    """Return True when every idle-time datapoint shows the volume fully idle."""
    idle_time = get_ebs_volume_idle_time(region, volume_id)
    if not idle_time:
        # no metrics at all: be conservative and keep the volume
        return False
    for data_point in idle_time:
        # VolumeIdleTime is a 5 minute (300 second) interval aggregate, so a
        # Minimum below 299 means the volume saw some I/O in that period
        if data_point['Minimum'] < 299:
            return False
    # no datapoint fell below 299 seconds, so the volume is probably not
    # being used for anything and is a candidate for deletion
    return True
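
# Illustration of the candidate test (hypothetical datapoints): with 5-minute
# periods the metric tops out at 300 idle seconds per period, so
#   [{'Minimum': 300.0}, {'Minimum': 300.0}]  -> candidate (fully idle)
#   [{'Minimum': 142.7}, {'Minimum': 300.0}]  -> not a candidate (saw I/O)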

def tag_name(tags):
    """Return the value of the 'Name' tag, or '' if it is not set."""
    name = ''
    if tags:
        for tag in tags:
            # exact match, so keys that merely contain 'Name' don't match
            if tag['Key'] == 'Name':
                name = tag['Value']
    return name

def get_candidate_ebs_volumes(region, volumes):
    """Filter the available volumes down to deletion candidates."""
    candidate_volumes = [
        volume
        for volume in volumes
        if is_candidate(region, volume.volume_id)
    ]
    log.info('Found candidate volumes: {}'.format(len(candidate_volumes)))
    return candidate_volumes

def delete_ebs_volumes(volumes):
    """Delete each candidate volume, logging and continuing on per-volume errors."""
    for volume in volumes:
        try:
            volume.delete()
            log.info('Removed: {}'.format(volume.volume_id))
        except ClientError as err:
            log.error(err.response['Error']['Message'])

def get_regions():
    """Return the names of all regions visible to this account."""
    regions = []
    for region in boto3.client('ec2').describe_regions()['Regions']:
        regions.append(region['RegionName'])
    return regions

def get_aws_account():
    """Get the current account id."""
    sts_client = boto3.client('sts')
    account_id = sts_client.get_caller_identity()['Account']
    return account_id

def main():
    """Delete idle, unattached EBS volumes in every region."""
    profile = os.environ.get('AWS_DEFAULT_PROFILE', 'default')
    log.info('Start EBS cleanup for AWS account: {}/{}'.format(
        get_aws_account(), profile))
    for region in get_regions():
        log.info('EBS cleanup starting in region: {}'.format(region))
        available_volumes = get_available_ebs_volumes(region)
        candidate_volumes = get_candidate_ebs_volumes(region, available_volumes)
        delete_ebs_volumes(candidate_volumes)
        log.info('EBS cleanup completed in region: {}'.format(region))
    log.info('End EBS cleanup for AWS account: {}/{}'.format(
        get_aws_account(), profile))


if __name__ == '__main__':
    main()
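
# Example invocation (a sketch; assumes this file is saved as ebs_cleanup.py
# and AWS credentials for the target account are configured):
#
#   AWS_DEFAULT_PROFILE=my-profile python ebs_cleanup.py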

## Draft CSV export (left commented out; it depends on the aws_pricing_api
## module imported at the top, which is not included in this gist):
##
## def candidate_volumes_to_csv(region, candidate_volumes):
##     "export candidate_volumes to csv"
##     print('{0} exporting candidate volumes'.format(region))
##     out_filename = './available-ebs-volumes.csv'
##     out_fields = ['account_alias', 'region', 'volume_id', 'volume_type',
##                   'volume_size_gb', 'volume_create_time', 'volume_tags',
##                   'volume_GB_month_rate']
##     with open(out_filename, 'a') as f:
##         writer = csv.DictWriter(f, fieldnames=out_fields, lineterminator='\n')
##         writer.writeheader()
##         for volume in candidate_volumes:
##             row = {
##                 'account_alias': os.environ.get('AWS_DEFAULT_PROFILE', 'default'),
##                 'region': region,
##                 'volume_id': volume.volume_id,
##                 'volume_type': volume.volume_type,
##                 'volume_size_gb': volume.size,
##                 'volume_create_time': volume.create_time,
##                 'volume_tags': tag_name(volume.tags),
##                 'volume_GB_month_rate': aws_pricing_api.ebs_get_rate_gb_month(region, volume.volume_type),
##             }
##             writer.writerow(row)
##             print(row)