Skip to content

Instantly share code, notes, and snippets.

@gannino
Last active June 1, 2020 13:06
Show Gist options
  • Save gannino/231e9ad52571fdb095b8300eee292ba6 to your computer and use it in GitHub Desktop.
Save gannino/231e9ad52571fdb095b8300eee292ba6 to your computer and use it in GitHub Desktop.
Python script that tags AMIs, snapshots and volumes. Snapshots not referenced by any AMI and volumes not attached to any instance get their Name tag prefixed with "UNUSED" on the first run; on a later run, any such resource whose Name tag already starts with "UNUSED" is deleted.
#!/usr/bin/env python3
# most credit to the original: https://gist.github.com/brandond/6b4d22eaefbd66895f230f68f27ee586 & https://gist.github.com/danpritts/1089d878a76b14393478a7476476f97b
# Python script that tags AMIs, snapshots and volumes.
# Snapshots not referenced by any AMI and volumes not attached to any instance
# get their Name tag prefixed with "UNUSED" on the first run.
# WARNING: on a later run, any such resource whose Name tag already starts
# with "UNUSED" is deleted.
# Export the variables below to select the AWS profile and region:
# export AWS_PROFILE=aws-profile
# export AWS_DEFAULT_REGION=eu-west-3
# export AWS_DEFAULT_REGION=eu-west-2
# export AWS_DEFAULT_REGION=eu-central-1
# export AWS_DEFAULT_REGION=eu-west-1
# python3 tag-ami-snap-vol-and-clean-not-used-v2.py
import copy
import logging
import os
import boto3
import time
from ratelimiter import RateLimiter
from datetime import datetime
import datetime
# import dateparser?
# dateparser.parse('12-12-12')
#https://dateparser.readthedocs.io/en/latest/
# Log level is read from the LOG_LEVEL environment variable and defaults to INFO.
logging.basicConfig(level=os.environ.get('LOG_LEVEL', 'INFO'))
# Module-wide EC2 client shared by every function below. No region or
# credentials are passed explicitly here (see the AWS_PROFILE /
# AWS_DEFAULT_REGION notes in the header comments).
ec2 = boto3.client('ec2')
# Module-level logger used for all tagging / deletion messages.
logger = logging.getLogger(__name__)
def tag_ami():
    """Give every AMI owned by this account a ``Name`` tag of ``AMI:<image name>``.

    The image's existing tags (as returned by ``extract_tags``) are kept and
    only the ``Name`` entry is set.  ``create_tags`` is called only when the
    resulting tag set differs from the current one, so unchanged images
    cause no write.
    """
    for image in ec2.describe_images(Owners=['self'])['Images']:
        cur_tags = extract_tags(image.get('Tags', []))
        # Only a top-level key is reassigned below, so a shallow copy is
        # sufficient to keep cur_tags intact for the comparison.
        new_tags = dict(cur_tags)
        new_tags['Name'] = 'AMI:' + image['Name']
        if new_tags != cur_tags:
            logger.info('{0}: Tags changed to {1}'.format(image['ImageId'], new_tags))
            ec2.create_tags(Resources=[image['ImageId']], Tags=load_tags(new_tags))
def tag_snapshots_and_clean_not_used():
    """Tag snapshots that back an owned AMI and retire the ones nothing references.

    All snapshots owned by this account are loaded first.  Every snapshot
    referenced by a block device mapping of an owned AMI is marked as used
    and receives the AMI's tags plus an ``ImageId`` tag and a descriptive
    ``Name``.  Each remaining snapshot is handled in two stages:

    * if its ``Name`` (or, when untagged, its snapshot id) does not start
      with ``UNUSED``, the ``Name`` tag is set to ``UNUSED <name>``;
    * if its ``Name`` already starts with ``UNUSED`` the snapshot is deleted.
    """
    snapshots = {}
    for response in ec2.get_paginator('describe_snapshots').paginate(OwnerIds=['self']):
        for snapshot in response['Snapshots']:
            snapshots[snapshot['SnapshotId']] = snapshot

    for image in ec2.describe_images(Owners=['self'])['Images']:
        tags = extract_tags(image.get('Tags', []))
        mappings = image.get('BlockDeviceMappings', [])
        # The "(n/total)" suffix counts snapshot-backed devices against the
        # total number of mappings, matching the existing tag format.
        numberofdevs = len(mappings)
        devnum = 0
        for device in mappings:
            # Skip only this mapping.  Aborting the whole image here would
            # leave its later snapshots unmarked, and they would then be
            # tagged UNUSED and deleted on the following run.
            ebs = device.get('Ebs')
            if ebs is None:
                logger.info('Not an EBS Device! Skipping {0} for Image: {1} as {2}'.format(
                    device.get('DeviceName'), image['ImageId'], device.get('VirtualName')))
                continue
            snapshot_id = ebs.get('SnapshotId')
            if snapshot_id is None:
                continue
            devnum += 1
            snapshot = snapshots.get(snapshot_id)
            if snapshot is None:
                # Referenced by the AMI but absent from the owned-snapshot
                # listing above, so there is nothing here to tag.
                logger.info('Snapshot {0} for Image: {1} not found in owned snapshots, skipping'.format(
                    snapshot_id, image['ImageId']))
                continue
            # Mark before tagging so the snapshot is never treated as unused.
            snapshot['Used'] = True
            cur_tags = extract_tags(snapshot.get('Tags', []))
            new_tags = dict(cur_tags)
            new_tags.update(tags)
            new_tags['ImageId'] = image['ImageId']
            # here's where to change formatting
            new_tags['Name'] = ('AMI:' + image['Name'] + ' ' + device['DeviceName']
                                + ' (' + str(devnum) + '/' + str(numberofdevs) + ')')
            if new_tags != cur_tags:
                logger.info('{0}: Tags changed to {1}'.format(snapshot['SnapshotId'], new_tags))
                ec2.create_tags(Resources=[snapshot['SnapshotId']], Tags=load_tags(new_tags))

    for snapshot in snapshots.values():
        if 'Used' in snapshot:
            continue
        cur_tags = extract_tags(snapshot.get('Tags', []))
        name = cur_tags.get('Name', snapshot['SnapshotId'])
        if name.startswith('UNUSED'):
            # Already flagged by an earlier run: second stage, delete it.
            logger.warning('Deleting not used snapshot: {0}'.format(snapshot['SnapshotId']))
            ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
        else:
            # First stage: only flag it, deletion happens on a later run.
            logger.warning('{0} Unused! Updating tag'.format(snapshot['SnapshotId']))
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(Resources=[snapshot['SnapshotId']], Tags=load_tags(cur_tags))
def tag_volumes_and_clean_not_used():
    """Tag volumes attached to instances and retire the ones nothing references.

    All volumes are loaded first.  Every volume that appears in an
    instance's block device mappings is marked as used and receives the
    instance's tags plus a ``Name`` of
    ``<instance name> <device> (<n>/<total>)``.  Instances without a
    ``Name`` tag fall back to their instance id.  Each remaining volume is
    handled in two stages:

    * if its ``Name`` (or, when untagged, its volume id) does not start
      with ``UNUSED``, the ``Name`` tag is set to ``UNUSED <name>``;
    * if its ``Name`` already starts with ``UNUSED`` the volume is deleted.
    """
    volumes = {}
    for response in ec2.get_paginator('describe_volumes').paginate():
        for volume in response['Volumes']:
            volumes[volume['VolumeId']] = volume

    for response in ec2.get_paginator('describe_instances').paginate():
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                tags = extract_tags(instance.get('Tags', []))
                # An instance without a Name tag must not abort the whole run.
                instance_name = tags.get('Name', instance['InstanceId'])
                mappings = instance.get('BlockDeviceMappings', [])
                # to tag the number volumes on an instance, e.g., (1/3) (2/3) (3/3)
                numberofdevs = len(mappings)
                for devnum, device in enumerate(mappings, start=1):
                    volume_id = device.get('Ebs', {}).get('VolumeId')
                    volume = volumes.get(volume_id)
                    if volume is None:
                        # Mapping without a volume id, or a volume that was
                        # not part of the describe_volumes listing above.
                        logger.info('Volume {0} on {1} of instance {2} not found in volume list, skipping'.format(
                            volume_id, device.get('DeviceName'), instance['InstanceId']))
                        continue
                    # Mark before tagging so the volume is never treated as unused.
                    volume['Used'] = True
                    cur_tags = extract_tags(volume.get('Tags', []))
                    new_tags = dict(cur_tags)
                    new_tags.update(tags)
                    # here's where to change formatting
                    new_tags['Name'] = (instance_name + ' ' + device['DeviceName']
                                        + ' (' + str(devnum) + '/' + str(numberofdevs) + ')')
                    if new_tags != cur_tags:
                        logger.info('{0} Tags changed to {1}'.format(volume['VolumeId'], new_tags))
                        ec2.create_tags(Resources=[volume['VolumeId']], Tags=load_tags(new_tags))

    for volume in volumes.values():
        if 'Used' in volume:
            continue
        cur_tags = extract_tags(volume.get('Tags', []))
        name = cur_tags.get('Name', volume['VolumeId'])
        if name.startswith('UNUSED'):
            # Already flagged by an earlier run: second stage, delete it.
            logger.warning('Deleting not used volume: {0}'.format(volume['VolumeId']))
            ec2.delete_volume(VolumeId=volume['VolumeId'])
        else:
            # First stage: only flag it, deletion happens on a later run.
            logger.warning('{0} Unused! Updating tag'.format(volume['VolumeId']))
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(Resources=[volume['VolumeId']], Tags=load_tags(cur_tags))
@RateLimiter(max_calls=5, period=1)
def tag_everything():
    """Run the three passes in order: AMIs, then snapshots, then volumes.

    NOTE(review): the RateLimiter decorator is applied to this function
    itself, so it presumably limits how often ``tag_everything`` is invoked
    rather than the individual EC2 requests made by the passes -- confirm.
    """
    passes = (
        tag_ami,
        tag_snapshots_and_clean_not_used,
        tag_volumes_and_clean_not_used,
    )
    for run_pass in passes:
        run_pass()
def extract_tags(tags):
    """Convert a list of ``{"Key": ..., "Value": ...}`` dicts into a ``{key: value}`` dict.

    Entries whose key starts with ``aws:`` are left out.  When a key occurs
    more than once, the last occurrence wins.
    """
    result = {}
    for entry in tags:
        key = entry["Key"]
        if key.startswith("aws:"):
            continue
        result[key] = entry["Value"]
    return result
def load_tags(tags):
    """Convert a ``{key: value}`` dict into a list of ``{"Key": ..., "Value": ...}`` dicts.

    Keys starting with ``aws:`` are left out; the remaining entries keep the
    dict's iteration order.
    """
    pairs = []
    for key, value in tags.items():
        if not key.startswith("aws:"):
            pairs.append({"Key": key, "Value": value})
    return pairs
def handler(event, context):
    """Entry point taking ``(event, context)``; both arguments are ignored.

    Simply runs ``tag_everything()``.  Presumably intended as an AWS Lambda
    handler, judging by the signature -- confirm against the deployment.
    """
    tag_everything()
# Run the full tagging / cleanup pass when the file is executed as a script.
if __name__ == "__main__":
    tag_everything()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment