Skip to content

Instantly share code, notes, and snippets.

@torian
Last active August 4, 2017 17:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save torian/e5a75611144870a2761912999d17c292 to your computer and use it in GitHub Desktop.
Save torian/e5a75611144870a2761912999d17c292 to your computer and use it in GitHub Desktop.
AWS Snapshot cleanup
#!/usr/bin/env python
"""
Usage:
aws_snaps.py amis [options]
aws_snaps.py volumes [options]
aws_snaps.py snapshots [options]
Options:
--iddqd God mode. By default, deletions are run in dry mode
-l,--loglevel LOGLVL Log level [default: INFO]
-p,--profile PROFILE AWS profile name [default: '']
-r,--region REGION AWS Region [default: us-east-1]
--ls List objects
-x,--exclude-tags TAGS Exclude objects by tags (json list '{ "key": "value": }')
-i,--include-tags TAGS Include objects by tags (json list '{ "key": "value": }')
Snapshots Options:
--cleanup Cleanup
"""
import sys
import docopt
import logging
import json
from boto.ec2 import connect_to_region
from datetime import datetime, timedelta
##############################################################################
def get_snapshots(c, owner='self'):
    """Return the snapshots owned by `owner` as a list of plain dicts.

    Calls `c.get_all_snapshots(owner=owner)` and copies the fields the
    rest of the script relies on out of every returned object.

    Args:
        c: connection object exposing `get_all_snapshots`.
        owner: owner filter forwarded to the API call.

    Returns:
        A list of dicts with the keys `id`, `volume_id`, `volume_size`,
        `tags` and `status`, one per snapshot, in API order.
    """
    records = []
    for snap in c.get_all_snapshots(owner=owner):
        record = {}
        record['id'] = snap.id
        record['volume_id'] = snap.volume_id
        record['volume_size'] = snap.volume_size
        record['tags'] = snap.tags
        record['status'] = snap.status
        records.append(record)
    return records
def get_volumes(c):
    """Return every volume reported by `c.get_all_volumes()` as a plain dict.

    Args:
        c: connection object exposing `get_all_volumes`.

    Returns:
        A list of dicts with the keys `id`, `size`, `status`,
        `snapshot_id` and `tags`, one per volume, in API order.
    """
    records = []
    for vol in c.get_all_volumes():
        records.append(dict(
            id=vol.id,
            size=vol.size,
            status=vol.status,
            snapshot_id=vol.snapshot_id,
            tags=vol.tags,
        ))
    return records
def get_amis(c, owner='self'):
    """Return the images owned by `owner` with the snapshots they reference.

    Calls `c.get_all_images(owners=owner)`. For each image, the
    `snapshot_id` of every block-device-mapping entry is collected;
    entries whose `snapshot_id` is falsy are skipped.

    Args:
        c: connection object exposing `get_all_images`.
        owner: owner filter forwarded to the API call as `owners`.

    Returns:
        A list of dicts with the keys `id`, `name` and `snapshots`
        (a list of snapshot ids), one per image.
    """
    records = []
    for image in c.get_all_images(owners=owner):
        backing = []
        for _device, mapping in image.block_device_mapping.items():
            if mapping.snapshot_id:
                backing.append(mapping.snapshot_id)
        records.append({
            'id': image.id,
            'name': image.name,
            'snapshots': backing,
        })
    return records
def filter_exclude_tags(objects, kv):
    """Remove every object whose tags match the first key/value pair of `kv`.

    Only the first pair of `kv` is used; any further pairs are ignored.
    An object is removed when its `tags` mapping contains that key with
    exactly that value.

    The list is filtered in place and the same list object is returned,
    so callers that keep a reference to `objects` see the result too.

    Args:
        objects: list of dicts, each carrying a `tags` mapping.
        kv: mapping holding the tag key/value to exclude. When empty,
            nothing is removed.

    Returns:
        `objects`, with the matching entries removed.
    """
    if not kv:
        # Nothing to exclude; indexing the first pair would otherwise fail.
        return objects

    key, value = next(iter(kv.items()))

    # Build the survivors in one pass and write them back through a slice
    # assignment. This keeps the in-place contract without relying on an
    # eagerly evaluated map() for its side effects.
    objects[:] = [
        obj for obj in objects
        if not (key in obj['tags'] and obj['tags'][key] == value)
    ]
    return objects
def filter_include_tags(objects, kv):
    """Keep only the objects tagged with at least one key/value pair of `kv`.

    An object is kept when, for any pair in `kv`, its `tags` mapping
    contains that key with exactly that value. The value is compared
    against the tag stored under the same key, so a matching value under
    an unrelated tag key does not count.

    Args:
        objects: list of dicts, each carrying a `tags` mapping.
        kv: mapping of tag keys to the values to include. When empty,
            no filtering is applied.

    Returns:
        A new list holding the matching objects in their original order;
        `objects` itself is not modified.
    """
    if not kv:
        # No criteria means no filtering; still hand back a fresh list.
        return list(objects)

    wanted = list(kv.items())
    return [
        obj for obj in objects
        if any(
            key in obj['tags'] and obj['tags'][key] == value
            for key, value in wanted
        )
    ]
##############################################################################
LOG_FORMAT = '[%(asctime)s] %(levelname)s: %(message)s'


def _load_tags(raw, option):
    """Parse the JSON text given to a tag option, exiting on malformed input."""
    try:
        return json.loads(raw)
    except ValueError as err:
        logging.error('Invalid JSON for {}: {}'.format(option, err))
        sys.exit(2)


def _apply_tag_filters(objects, opts):
    """Apply --exclude-tags or, when that is absent, --include-tags."""
    # Exclusion takes precedence: when both options are given, the include
    # filter is not applied.
    if opts['--exclude-tags'] is not None:
        x_tags_json = _load_tags(opts['--exclude-tags'], '--exclude-tags')
        return filter_exclude_tags(objects, x_tags_json)
    if opts['--include-tags'] is not None:
        i_tags_json = _load_tags(opts['--include-tags'], '--include-tags')
        return filter_include_tags(objects, i_tags_json)
    return objects


def _list_objects(objects):
    """Print every object as indented JSON."""
    for obj in objects:
        # Single-argument call form prints the same on Python 2 and 3.
        print(json.dumps(obj, indent = 4))


def _cleanup_snapshots(ec2_conn, snapshots, delete):
    """Delete (or, when `delete` is false, only report) snapshots no AMI uses."""
    amis = get_amis(ec2_conn)

    # Collect the ids of the snapshots under consideration and the ids of
    # every snapshot referenced by an AMI; the difference is what no AMI
    # depends on.
    snapshots_ids = [s['id'] for s in snapshots]
    snapshots_ami = set()
    for ami in amis:
        snapshots_ami.update(ami['snapshots'])
    snapshots_without_ami = set(snapshots_ids).difference(snapshots_ami)

    logging.info('Snapshot count: {}'.format(len(snapshots_ids)))
    logging.info('AMI snapshot count: {}'.format(len(snapshots_ami)))
    logging.info('Snapshot without AMI count: {}'.format(len(snapshots_without_ami)))

    # This should be the definitive list of snapshots to be removed
    snap_size = 0
    for s in snapshots:
        if s['id'] not in snapshots_without_ami:
            continue

        snap_size = snap_size + s['volume_size']

        if delete:
            logging.info('Removing snapshot id:{} - tags: {}'.format(
                s['id'], json.dumps(s['tags'])
            ))
            ec2_conn.delete_snapshot(s['id'])
        else:
            logging.info('Would remove snapshot id:{} - tags: {})'.format(
                s['id'], json.dumps(s['tags'])
            ))

    logging.info('Freed {}GB'.format(snap_size))


def main():
    """Parse the command line and run the requested sub-command.

    Sub-commands are `volumes`, `amis` and `snapshots`. Each one fetches
    its objects, logs how many were found and prints them when `--ls` is
    given. `volumes` and `snapshots` honour the tag filters. `snapshots
    --cleanup` removes the snapshots that no AMI references, and only
    really deletes them when `--iddqd` is given.

    Exits with status 1 when the EC2 connection cannot be established
    and with status 2 when a tag option is not valid JSON.
    """
    opts = docopt.docopt(__doc__)

    level_name = (opts.get('--loglevel') or 'INFO').upper()
    logging.basicConfig(
        format = LOG_FORMAT,
        level = getattr(logging, level_name, logging.INFO)
    )

    # The usage text declares `[default: '']` for --profile, and docopt
    # returns the default text verbatim, i.e. the two-character string
    # `''`. Treat that (and an empty string) as "no profile" so the default
    # credential lookup is used.
    profile = opts['--profile']
    if profile in ("''", '""', ''):
        profile = None

    try:
        ec2_conn = connect_to_region(opts['--region'], profile_name = profile)
    except Exception as err:
        logging.error('Exception: {}'.format(err))
        sys.exit(1)

    # boto's connect_to_region is documented to return None for a region
    # name it does not know.
    if ec2_conn is None:
        logging.error('Unknown region: {}'.format(opts['--region']))
        sys.exit(1)

    if opts['volumes']:
        volumes = _apply_tag_filters(get_volumes(ec2_conn), opts)
        logging.info('Found {} volumes'.format(len(volumes)))
        if opts['--ls']:
            _list_objects(volumes)

    elif opts['amis']:
        amis = get_amis(ec2_conn)
        logging.info('Found {} AMIs'.format(len(amis)))
        if opts['--ls']:
            _list_objects(amis)

    elif opts['snapshots']:
        snapshots = _apply_tag_filters(get_snapshots(ec2_conn), opts)
        logging.info('Found {} snapshots'.format(len(snapshots)))
        if opts['--ls']:
            _list_objects(snapshots)
        if opts['--cleanup']:
            _cleanup_snapshots(ec2_conn, snapshots, opts['--iddqd'])


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment