Skip to content

Instantly share code, notes, and snippets.

@danpritts
Last active October 14, 2022 07:50
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 9 You must be signed in to fork a gist
  • Save danpritts/1089d878a76b14393478a7476476f97b to your computer and use it in GitHub Desktop.
Save danpritts/1089d878a76b14393478a7476476f97b to your computer and use it in GitHub Desktop.
Automatically tag EC2 snapshots and volumes based on their attached AMIs/instances
# most credit to the original: https://gist.github.com/brandond/6b4d22eaefbd66895f230f68f27ee586
# Tag snapshots based on their associated AMI and volumes based on attached instance.
# format:
# (AMI:db5|db5) /dev/sda1 (1/4)
# (AMI:db5|db5) /dev/sdb (2/4)
# Best practice: create IAM user
# Simplest privilege to get it to work with reasonable security: use predefined policy "ReadOnlyAccess"
# and add your own custom policy that grants "ec2:CreateTags"
import copy
import logging
import os
import boto3
# Log level comes from the LOG_LEVEL environment variable and defaults to INFO.
logging.basicConfig(level=os.environ.get('LOG_LEVEL', 'INFO'))
# Module-wide EC2 client and logger shared by the functions below.
ec2 = boto3.client('ec2')
logger = logging.getLogger(__name__)
def tag_snapshots():
    """Tag owned snapshots after the AMI that uses them and flag the rest.

    Every snapshot returned by ``describe_snapshots(OwnerIds=['self'])`` is
    indexed by id.  For each owned AMI, every block device mapping that
    carries an EBS ``SnapshotId`` gets the AMI's tags merged over the
    snapshot's current tags, plus an ``ImageId`` tag and a ``Name`` tag of
    the form ``AMI:<image name> <device> (<n>/<total>)``.  ``create_tags``
    is only called when the resulting tag set differs from the current one.

    Snapshots that no AMI references have their ``Name`` tag prefixed with
    ``UNUSED`` (once; names already starting with ``UNUSED`` are left alone).

    Mappings without an ``Ebs`` entry (e.g. ephemeral/instance-store
    devices) are skipped instead of raising ``KeyError``, and ``<total>``
    counts only the snapshot-backed devices so the numbering can reach
    ``(total/total)``.  A snapshot id that is not in the owned-snapshot
    listing is logged and skipped.
    """
    snapshots = {}
    for response in ec2.get_paginator('describe_snapshots').paginate(OwnerIds=['self']):
        for snapshot in response['Snapshots']:
            snapshots[snapshot['SnapshotId']] = snapshot

    for image in ec2.describe_images(Owners=['self'])['Images']:
        tags = boto3_tag_list_to_ansible_dict(image.get('Tags', []))
        # Only snapshot-backed EBS mappings can be tagged; ephemeral mappings
        # have no 'Ebs' key at all.
        devices = [
            device for device in image.get('BlockDeviceMappings', [])
            if 'SnapshotId' in device.get('Ebs', {})
        ]
        # to tag volumes, e.g., (1/3) (2/3) (3/3)
        numberofdevs = len(devices)
        for devnum, device in enumerate(devices, start=1):
            snapshot_id = device['Ebs']['SnapshotId']
            snapshot = snapshots.get(snapshot_id)
            if snapshot is None:
                # The AMI references a snapshot that the owned-snapshot
                # listing did not return; nothing to tag here.
                logger.warning('{0}: referenced by {1} but not found among owned snapshots, skipping'.format(
                    snapshot_id, image['ImageId']))
                continue
            # Marker consumed by the "unused" pass below.
            snapshot['Used'] = True
            cur_tags = boto3_tag_list_to_ansible_dict(snapshot.get('Tags', []))
            new_tags = dict(cur_tags)
            new_tags.update(tags)
            new_tags['ImageId'] = image['ImageId']
            # here's where to change formatting
            new_tags['Name'] = 'AMI:' + image['Name'] + ' ' + device['DeviceName'] + ' (' + str(devnum) + '/' + str(numberofdevs) + ')'
            if new_tags != cur_tags:
                logger.info('{0}: Tags changed to {1}'.format(snapshot_id, new_tags))
                ec2.create_tags(Resources=[snapshot_id], Tags=ansible_dict_to_boto3_tag_list(new_tags))

    for snapshot in snapshots.values():
        if 'Used' in snapshot:
            continue
        cur_tags = boto3_tag_list_to_ansible_dict(snapshot.get('Tags', []))
        name = cur_tags.get('Name', snapshot['SnapshotId'])
        # Avoid stacking "UNUSED UNUSED ..." on repeated runs.
        if not name.startswith('UNUSED'):
            logger.warning('{0} Unused!'.format(snapshot['SnapshotId']))
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(Resources=[snapshot['SnapshotId']], Tags=ansible_dict_to_boto3_tag_list(cur_tags))
def tag_volumes():
    """Tag EBS volumes after the instance they are attached to and flag the rest.

    Every volume returned by ``describe_volumes`` is indexed by id.  For each
    instance, every block device mapping that carries an EBS ``VolumeId``
    gets the instance's tags merged over the volume's current tags, plus a
    ``Name`` tag of the form ``<instance name> <device> (<n>/<total>)``.
    ``create_tags`` is only called when the resulting tag set differs from
    the current one.

    Volumes not referenced by any instance have their ``Name`` tag prefixed
    with ``UNUSED`` (once).

    Instances without a ``Name`` tag fall back to their instance id rather
    than raising ``KeyError``; mappings without an EBS volume id, and volume
    ids missing from the volume listing, are skipped.
    """
    volumes = {}
    for response in ec2.get_paginator('describe_volumes').paginate():
        for volume in response['Volumes']:
            volumes[volume['VolumeId']] = volume

    for response in ec2.get_paginator('describe_instances').paginate():
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                tags = boto3_tag_list_to_ansible_dict(instance.get('Tags', []))
                # Untagged instances are still worth labelling; use their id.
                instance_name = tags.get('Name', instance['InstanceId'])
                devices = [
                    device for device in instance.get('BlockDeviceMappings', [])
                    if 'VolumeId' in device.get('Ebs', {})
                ]
                # to tag the number volumes on an instance, e.g., (1/3) (2/3) (3/3)
                numberofdevs = len(devices)
                for devnum, device in enumerate(devices, start=1):
                    volume_id = device['Ebs']['VolumeId']
                    volume = volumes.get(volume_id)
                    if volume is None:
                        # The instance references a volume that the earlier
                        # describe_volumes listing did not return.
                        logger.warning('{0}: attached to {1} but not found in volume listing, skipping'.format(
                            volume_id, instance['InstanceId']))
                        continue
                    # Marker consumed by the "unused" pass below.
                    volume['Used'] = True
                    cur_tags = boto3_tag_list_to_ansible_dict(volume.get('Tags', []))
                    new_tags = dict(cur_tags)
                    new_tags.update(tags)
                    # here's where to change formatting
                    new_tags['Name'] = instance_name + ' ' + device['DeviceName'] + ' (' + str(devnum) + '/' + str(numberofdevs) + ')'
                    if new_tags != cur_tags:
                        logger.info('{0} Tags changed to {1}'.format(volume_id, new_tags))
                        ec2.create_tags(Resources=[volume_id], Tags=ansible_dict_to_boto3_tag_list(new_tags))

    for volume in volumes.values():
        if 'Used' in volume:
            continue
        cur_tags = boto3_tag_list_to_ansible_dict(volume.get('Tags', []))
        name = cur_tags.get('Name', volume['VolumeId'])
        # Avoid stacking "UNUSED UNUSED ..." on repeated runs.
        if not name.startswith('UNUSED'):
            logger.warning('{0} Unused!'.format(volume['VolumeId']))
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(Resources=[volume['VolumeId']], Tags=ansible_dict_to_boto3_tag_list(cur_tags))
def tag_everything():
    """Run both tagging passes: snapshots first, then volumes."""
    for tagging_pass in (tag_snapshots, tag_volumes):
        tagging_pass()
def boto3_tag_list_to_ansible_dict(tags_list):
    """Flatten a list of tag dicts into a plain ``{key: value}`` dict.

    Each entry may use lowercase ``key``/``value`` or capitalized
    ``Key``/``Value`` field names; the lowercase pair is tried first.
    Tags whose key starts with ``aws:`` are left out.
    """
    flattened = {}
    for entry in tags_list:
        for key_field, value_field in (('key', 'value'), ('Key', 'Value')):
            if key_field in entry and not entry[key_field].startswith('aws:'):
                flattened[entry[key_field]] = entry[value_field]
                break
    return flattened
def ansible_dict_to_boto3_tag_list(tags_dict):
    """Expand a ``{key: value}`` dict into a list of ``{'Key', 'Value'}`` dicts.

    Entries appear in the dict's iteration order.
    """
    return [{'Key': key, 'Value': value} for key, value in tags_dict.items()]
def handler(event, context):
    """Entry point taking ``(event, context)``; both arguments are ignored.

    NOTE(review): the signature looks like an AWS Lambda handler -- confirm
    against the deployment configuration.
    """
    tag_everything()
# Run the full tagging pass when executed as a script.
if __name__ == '__main__':
    tag_everything()
@sansane123
Copy link

Can you help me understand why you are using Ansible here?

@danpritts
Copy link
Author

Ansible functions were in the original code I copied (see url in comments at top). At a guess, the original author knew about this method from other work and reused it.

@jester-vergara
Copy link

image

image

Same script is working on another environment and on my own personal AWS account, just wondering what is causing this error on this environment. Thanks in advance for any help!

@danpritts
Copy link
Author

danpritts commented Oct 22, 2019 via email

@jester-vergara
Copy link

Not sure if it is access or credentials, but it is using a predefined CI/CD deployment to different environments. It works on the other one; only on this environment is it not working. Thanks for the feedback as well.

Copy link

ghost commented Oct 23, 2019

Looks like the key "Ebs" is not present in all BlockDeviceMappings, as some might be ephemeral volumes.

I added a try/except around that to see what the error is
image
and the output of the exception block is
image

Having a try/except block around this kind of thing helps narrow down the error and lets the script continue executing.

@jester-vergara
Copy link

Thanksssss a lot mate! Cheers, owe you one! @yyashwanth

@wwwsilvathiago
Copy link

Thanks, your script helped me a lot...

@hellupline
Copy link

I made some small improvements to this gist

#!/usr/bin/env python3

import logging
import os

import boto3


# Log level comes from the LOG_LEVEL environment variable and defaults to INFO.
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))

# Module-wide EC2 client and logger shared by the functions below.
ec2 = boto3.client("ec2")
logger = logging.getLogger(__name__)


def tag_snapshots():
    """Tag owned snapshots after the AMI that uses them and flag the rest.

    Every snapshot returned by ``describe_snapshots(OwnerIds=["self"])`` is
    indexed by id.  For each owned AMI, every block device mapping that
    carries an EBS ``SnapshotId`` gets the AMI's tags merged over the
    snapshot's current tags, plus ``ImageId`` and a ``Name`` tag of the form
    ``AMI: <image name> <device> (<n>/<total>)``.

    ``create_tags`` is called only when the merged tag set differs from the
    snapshot's *current* tags (comparing against the image's tags, as
    before, was effectively always true and re-tagged on every run).
    ``<total>`` counts only snapshot-backed devices, and a snapshot id that
    is missing from the owned-snapshot listing is logged and skipped rather
    than raising ``KeyError``.

    Snapshots that no AMI references have their ``Name`` tag prefixed with
    ``UNUSED`` (once; names already starting with ``UNUSED`` are left alone).
    """
    snapshots = {
        snapshot["SnapshotId"]: snapshot
        for page in ec2.get_paginator("describe_snapshots").paginate(OwnerIds=["self"])
        for snapshot in page["Snapshots"]
    }
    for image in ec2.describe_images(Owners=["self"])["Images"]:
        tags = extract_tags(image.get("Tags", []))
        # Ephemeral mappings have no "Ebs" key; only snapshot-backed ones count.
        devices = [
            device
            for device in image.get("BlockDeviceMappings", [])
            if "SnapshotId" in device.get("Ebs", {})
        ]
        numberofdevs = len(devices)
        for devnum, device in enumerate(devices, start=1):
            snapshot_id = device["Ebs"]["SnapshotId"]
            snapshot = snapshots.get(snapshot_id)
            if snapshot is None:
                logger.warning(
                    "%s: referenced by %s but not found among owned snapshots, skipping",
                    snapshot_id,
                    image["ImageId"],
                )
                continue
            # Marker consumed by the "unused" pass below.
            snapshot["Used"] = True
            name = f"AMI: {image['Name']} {device['DeviceName']} ({devnum}/{numberofdevs})"
            cur_tags = extract_tags(snapshot.get("Tags", []))
            new_tags = {
                **cur_tags,
                **tags,
                "Name": name,
                "ImageId": image["ImageId"],
            }
            if new_tags != cur_tags:
                logger.info("%s: Tags changed to %s", snapshot_id, repr(new_tags))
                ec2.create_tags(Resources=[snapshot_id], Tags=load_tags(new_tags))

    for snapshot in snapshots.values():
        if "Used" in snapshot:
            continue
        tags = extract_tags(snapshot.get("Tags", []))
        name = tags.get("Name", snapshot["SnapshotId"])
        # Avoid stacking "UNUSED UNUSED ..." on repeated runs.
        if not name.startswith("UNUSED"):
            logger.warning("%s: Unused", snapshot["SnapshotId"])
            tags["Name"] = f"UNUSED {name}"
            ec2.create_tags(Resources=[snapshot["SnapshotId"]], Tags=load_tags(tags))


def tag_volumes():
    """Tag EBS volumes after the instance they are attached to and flag the rest.

    Every volume returned by ``describe_volumes`` is indexed by id.  For each
    instance, every block device mapping that carries an EBS ``VolumeId``
    gets the instance's tags merged over the volume's current tags, plus a
    ``Name`` tag of the form ``<instance name> <device> (<n>/<total>)``.

    ``create_tags`` is called only when the merged tag set differs from the
    volume's *current* tags (comparing against the instance's tags, as
    before, was effectively always true and re-tagged on every run).
    Instances without a ``Name`` tag fall back to their instance id rather
    than raising ``KeyError``; volume ids missing from the volume listing
    are logged and skipped.

    Volumes not referenced by any instance have their ``Name`` tag prefixed
    with ``UNUSED`` (once).
    """
    volumes = {
        volume["VolumeId"]: volume
        for page in ec2.get_paginator("describe_volumes").paginate()
        for volume in page["Volumes"]
    }
    for page in ec2.get_paginator("describe_instances").paginate():
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                tags = extract_tags(instance.get("Tags", []))
                # Untagged instances are still worth labelling; use their id.
                instance_name = tags.get("Name", instance["InstanceId"])
                devices = [
                    device
                    for device in instance.get("BlockDeviceMappings", [])
                    if "VolumeId" in device.get("Ebs", {})
                ]
                numberofdevs = len(devices)
                for devnum, device in enumerate(devices, start=1):
                    volume_id = device["Ebs"]["VolumeId"]
                    volume = volumes.get(volume_id)
                    if volume is None:
                        logger.warning(
                            "%s: attached to %s but not found in volume listing, skipping",
                            volume_id,
                            instance["InstanceId"],
                        )
                        continue
                    # Marker consumed by the "unused" pass below.
                    volume["Used"] = True
                    name = f"{instance_name} {device['DeviceName']} ({devnum}/{numberofdevs})"
                    cur_tags = extract_tags(volume.get("Tags", []))
                    new_tags = {
                        **cur_tags,
                        **tags,
                        "Name": name,
                    }
                    if new_tags != cur_tags:
                        logger.info("%s: Tags changed to %s", volume_id, repr(new_tags))
                        ec2.create_tags(Resources=[volume_id], Tags=load_tags(new_tags))

    for volume in volumes.values():
        if "Used" in volume:
            continue
        tags = extract_tags(volume.get("Tags", []))
        name = tags.get("Name", volume["VolumeId"])
        # Avoid stacking "UNUSED UNUSED ..." on repeated runs.
        if not name.startswith("UNUSED"):
            logger.warning("%s: Unused", volume["VolumeId"])
            tags["Name"] = f"UNUSED {name}"
            ec2.create_tags(Resources=[volume["VolumeId"]], Tags=load_tags(tags))


def tag_everything():
    """Run both tagging passes: snapshots first, then volumes."""
    for tagging_pass in (tag_snapshots, tag_volumes):
        tagging_pass()


def extract_tags(tags):
    """Turn a list of ``{"Key", "Value"}`` dicts into a plain dict.

    Entries whose key starts with ``aws:`` are dropped.
    """
    plain = {}
    for entry in tags:
        if entry["Key"].startswith("aws:"):
            continue
        plain[entry["Key"]] = entry["Value"]
    return plain


def load_tags(tags):
    """Turn a plain dict into a list of ``{"Key", "Value"}`` dicts.

    Keys starting with ``aws:`` are dropped; order follows the dict.
    """
    tag_list = []
    for key, value in tags.items():
        if key.startswith("aws:"):
            continue
        tag_list.append({"Key": key, "Value": value})
    return tag_list


# Run the full tagging pass when executed as a script.
if __name__ == "__main__":
    tag_everything()

@gannino
Copy link

gannino commented May 12, 2020

Hello everyone, and thanks for this thread.
I've made some further changes, tried to include as many improvements as possible, and added the ability to delete resources, as I needed to delete resources that are not used by AMIs or instances. I have published a gist here in case it is useful to anyone: https://gist.github.com/gannino/17b41c316f874af5d4068c2674a54048

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment