Skip to content

Instantly share code, notes, and snippets.

@diasjorge
Created August 9, 2021 14:24
Show Gist options
  • Save diasjorge/bf4e392b20beffbf46256d76ce5f879f to your computer and use it in GitHub Desktop.
Save diasjorge/bf4e392b20beffbf46256d76ce5f879f to your computer and use it in GitHub Desktop.
#!/usr/local/bin/python3
import csv
import re
from collections import OrderedDict
from pprint import pprint
import boto3
import click
from botocore.exceptions import ClientError
ec2 = None
exists_icon = '✅'
not_exists_icon = '❌'
@click.group()
def cli():
'''
Helper commands for Snapshots management.
'''
pass
@cli.command()
@click.option('-r', '--regions', required=True, help='Comma separated list')
def snapshot_report(regions):
'''
Find unreferenced snapshots.
'''
global ec2
with open('report.csv', 'w') as csv_file:
csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
csv_writer.writerow([
'id',
'volume_id',
'volume_exists',
'ami_id',
'ami_exists',
'instance_id',
'instance_exists',
'size',
'start_time',
'description' ])
for region in regions:
ec2 = boto3.client('ec2', region_name=region)
for snapshot in get_snapshots():
csv_writer.writerow([
snapshot['id'],
snapshot['volume_id'],
snapshot['volume_exists'],
snapshot['ami_id'],
snapshot['ami_exists'],
snapshot['instance_id'],
snapshot['instance_exists'],
str(snapshot['size']) + 'gb',
str(snapshot['start_time']),
snapshot['description']])
@cli.command()
@click.option('-f', '--force', is_flag=True)
@click.option('-r', '--regions', required=True, help='Comma separated list')
def snapshot_cleanup(force, regions):
'''
Find and delete unreferenced snapshots.
'''
global ec2
for region in regions.split(','):
ec2 = boto3.client('ec2', region_name=region)
print('region={}'.format(region))
print('{:22} {:23} {:23} {:23} {:>7} {:25} {:30}'.format('snapshot id', 'volume id', 'ami id',
'instance id', 'size', 'start time', 'description'))
for snapshot in get_snapshots():
volume_exists = exists_icon if snapshot['volume_exists'] else not_exists_icon
ami_exists = exists_icon if snapshot['ami_exists'] else not_exists_icon
instance_exists = exists_icon if snapshot['instance_exists'] else not_exists_icon
print('{:22} {:22} {:22} {:22} {:>7} {:25} {:30}'.format(
snapshot['id'],
snapshot['volume_id'] + volume_exists,
snapshot['ami_id'] + ami_exists,
snapshot['instance_id'] + instance_exists,
str(snapshot['size']) + 'gb',
str(snapshot['start_time']),
snapshot['description']
))
if not snapshot['volume_exists'] and not snapshot['ami_exists'] and not snapshot['instance_exists'] and (force or click.confirm('Delete?', default=True)):
snapshot_delete(snapshot['id'])
@cli.command()
@click.option('-f', '--force', is_flag=True)
@click.option('-r', '--regions', required=True, help='Comma separated list')
def volume_cleanup(force, regions):
'''
Find and delete unused volumes.
'''
global ec2
print('{:23} {:20} {:>7} {:10} {:23}'.format(
'volume id', 'status', 'size', 'created', 'snapshot id'))
for region in regions:
ec2 = boto3.client('ec2', region_name=region)
print('region={}'.format(region))
for volume in get_available_volumes():
snapshot_exists = exists_icon if volume['snapshot_exists'] else not_exists_icon
print('{:23} {:20} {:>7} {:10} {:22}'.format(
volume['id'],
volume['status'],
str(volume['size']) + 'gb',
volume['create_time'].strftime('%Y-%m-%d'),
volume['snapshot_id'] + snapshot_exists
))
if not volume['snapshot_exists']:
print('Tags:')
print(' '+('\n '.join(['{}={}'.format(click.style(key, fg='blue'), tag)
for key, tag in volume['tags'].items()])))
if force or click.confirm('Delete?', default=True):
ec2.delete_volume(VolumeId=volume['id'])
def snapshot_delete(snapshot_id):
'''
Delete single snapshot by id.
'''
try:
ec2.delete_snapshot(SnapshotId=snapshot_id)
print('Deleted ' + snapshot_id)
except ClientError as e:
print('Failed to delete ' + snapshot_id)
print(e)
def get_snapshots():
'''
Get all snapshots.
'''
paginator = ec2.get_paginator('describe_snapshots')
page_iterator = paginator.paginate(OwnerIds=['self'])
for page in page_iterator:
for snapshot in page['Snapshots']:
instance_id, image_id = parse_description(snapshot['Description'])
yield {
'id': snapshot['SnapshotId'],
'description': snapshot['Description'],
'start_time': snapshot['StartTime'],
'size': snapshot['VolumeSize'],
'volume_id': snapshot['VolumeId'],
'volume_exists': volume_exists(snapshot['VolumeId']),
'instance_id': instance_id,
'instance_exists': instance_exists(instance_id),
'ami_id': image_id,
'ami_exists': image_exists(image_id),
}
def get_available_volumes():
'''
Get all volumes in 'available' state. (Volumes not attached to any instance)
'''
paginator = ec2.get_paginator('describe_volumes')
page_iterator = paginator.paginate(Filters=[{'Name': 'status', 'Values': ['available']}])
for page in page_iterator:
for volume in page['Volumes']:
yield {
'id': volume['VolumeId'],
'create_time': volume['CreateTime'],
'status': volume['State'],
'size': volume['Size'],
'snapshot_id': volume['SnapshotId'],
'snapshot_exists': str(snapshot_exists(volume['SnapshotId'])),
'tags': OrderedDict(sorted([(tag['Key'], tag['Value']) for tag in volume['Tags']])),
}
snapshot_cache = {}
def snapshot_exists(snapshot_id):
global snapshot_cache
if not snapshot_id:
return ''
try:
if snapshot_id not in snapshot_cache:
ec2.describe_snapshots(SnapshotIds=[snapshot_id])
snapshot_cache[snapshot_id] = True
except ClientError:
snapshot_cache[snapshot_id] = False
return snapshot_cache[snapshot_id]
volume_cache = {}
def volume_exists(volume_id):
global volume_cache
if not volume_id:
return False
try:
if volume_id not in volume_cache:
ec2.describe_volumes(VolumeIds=[volume_id])
volume_cache[volume_id] = True
except ClientError:
volume_cache[volume_id] = False
return volume_cache[volume_id]
instance_cache = {}
def instance_exists(instance_id):
global instance_cache
if not instance_id:
return ''
try:
instance_cache[instance_id] = (len(ec2.describe_instances(InstanceIds=[instance_id])['Reservations']) != 0)
except ClientError:
instance_cache[instance_id] = False
return instance_cache[instance_id]
image_cache = {}
def image_exists(image_id):
global image_cache
if not image_id:
return ''
try:
image_cache[image_id] = (len(ec2.describe_images(ImageIds=[image_id])['Images']) != 0)
except ClientError:
image_cache[image_id] = False
return image_cache[image_id]
def parse_description(description):
regex = r"^Created by CreateImage\((.*?)\) for (.*?) "
matches = re.finditer(regex, description, re.MULTILINE)
for matchNum, match in enumerate(matches):
return match.groups()
return '', ''
if __name__ == '__main__':
cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment