Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Automatically tag EC2 snapshots and volumes based on their attached AMIs/instances
# most credit to the original: https://gist.github.com/brandond/6b4d22eaefbd66895f230f68f27ee586
# Tag snapshots based on their associated AMI and volumes based on attached instance.
# format:
# (AMI:db5|db5) /dev/sda1 (1/4)
# (AMI:db5|db5) /dev/sdb (2/4)
# Best practice: create IAM user
# Simplest privilege to get it to work with reasonable security: use predefined policy "ReadOnlyAccess"
# and add your own custom policy that grants "ec2:CreateTags"
import copy
import logging
import os
import boto3
# Log verbosity comes from the LOG_LEVEL environment variable (INFO if unset).
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))
# Module-level logger used by the tagging functions.
logger = logging.getLogger(__name__)
# One EC2 client shared by every function in this script.
ec2 = boto3.client('ec2')
def tag_snapshots():
    """Tag snapshots owned by this account after the AMI that references them.

    All self-owned snapshots are loaded, then the block device mappings of
    every self-owned image are walked. A snapshot backing a mapping gets the
    image's tags, an ``ImageId`` tag, and a ``Name`` of the form
    ``AMI:<image name> <device> (<n>/<total>)``. Snapshots that no image
    references get ``UNUSED `` prepended to their name, once.

    ``create_tags`` is only called when the computed tags differ from the
    snapshot's current tags.
    """
    snapshots = {}
    for response in ec2.get_paginator('describe_snapshots').paginate(OwnerIds=['self']):
        for snapshot in response['Snapshots']:
            snapshots[snapshot['SnapshotId']] = snapshot

    for image in ec2.describe_images(Owners=['self'])['Images']:
        tags = boto3_tag_list_to_ansible_dict(image.get('Tags', []))
        mappings = image.get('BlockDeviceMappings', [])
        # Counters for the "(1/3) (2/3) (3/3)" suffix; the total is the number
        # of block device mappings on the image.
        devnum = 0
        numberofdevs = len(mappings)
        for device in mappings:
            # Not every mapping has an 'Ebs' entry (e.g. ephemeral devices),
            # and an 'Ebs' entry does not always carry a SnapshotId.
            snapshot_id = device.get('Ebs', {}).get('SnapshotId')
            if snapshot_id is None:
                continue
            devnum += 1
            snapshot = snapshots.get(snapshot_id)
            if snapshot is None:
                # The image points at a snapshot that was not returned for
                # OwnerIds=['self']; there is nothing we can look up or mark.
                logger.warning('%s: referenced by %s but not among owned snapshots, skipping',
                               snapshot_id, image['ImageId'])
                continue
            snapshot['Used'] = True
            cur_tags = boto3_tag_list_to_ansible_dict(snapshot.get('Tags', []))
            new_tags = dict(cur_tags)
            new_tags.update(tags)
            new_tags['ImageId'] = image['ImageId']
            # here's where to change formatting
            new_tags['Name'] = 'AMI:{0} {1} ({2}/{3})'.format(
                image['Name'], device['DeviceName'], devnum, numberofdevs)
            if new_tags != cur_tags:
                logger.info('%s: Tags changed to %s', snapshot_id, new_tags)
                ec2.create_tags(Resources=[snapshot_id],
                                Tags=ansible_dict_to_boto3_tag_list(new_tags))

    # Anything never marked 'Used' above is not referenced by any owned image.
    for snapshot in snapshots.values():
        if 'Used' in snapshot:
            continue
        cur_tags = boto3_tag_list_to_ansible_dict(snapshot.get('Tags', []))
        name = cur_tags.get('Name', snapshot['SnapshotId'])
        # Skip snapshots already flagged on a previous run.
        if not name.startswith('UNUSED'):
            logger.warning('%s Unused!', snapshot['SnapshotId'])
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(Resources=[snapshot['SnapshotId']],
                            Tags=ansible_dict_to_boto3_tag_list(cur_tags))
def tag_volumes():
    """Tag EBS volumes after the instance they are attached to.

    All volumes are loaded, then the block device mappings of every instance
    are walked. An attached volume gets the instance's tags and a ``Name`` of
    the form ``<instance name> <device> (<n>/<total>)``; when the instance has
    no ``Name`` tag its instance id is used instead. Volumes that no instance
    references get ``UNUSED `` prepended to their name, once.

    ``create_tags`` is only called when the computed tags differ from the
    volume's current tags.
    """
    volumes = {}
    for response in ec2.get_paginator('describe_volumes').paginate():
        for volume in response['Volumes']:
            volumes[volume['VolumeId']] = volume

    for response in ec2.get_paginator('describe_instances').paginate():
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                tags = boto3_tag_list_to_ansible_dict(instance.get('Tags', []))
                # An untagged instance has no 'Name'; fall back to its id
                # instead of raising KeyError.
                instance_name = tags.get('Name', instance['InstanceId'])
                mappings = instance.get('BlockDeviceMappings', [])
                # to tag the number of volumes on an instance, e.g., (1/3) (2/3) (3/3)
                devnum = 0
                numberofdevs = len(mappings)
                for device in mappings:
                    # Only mappings with an 'Ebs' entry reference a volume.
                    if 'Ebs' not in device:
                        continue
                    devnum += 1
                    volume_id = device['Ebs']['VolumeId']
                    volume = volumes.get(volume_id)
                    if volume is None:
                        # The volume was not in the describe_volumes listing
                        # (e.g. attached after the listing was taken).
                        logger.warning('%s: attached to %s but not in volume listing, skipping',
                                       volume_id, instance['InstanceId'])
                        continue
                    volume['Used'] = True
                    cur_tags = boto3_tag_list_to_ansible_dict(volume.get('Tags', []))
                    new_tags = dict(cur_tags)
                    new_tags.update(tags)
                    # here's where to change formatting
                    new_tags['Name'] = '{0} {1} ({2}/{3})'.format(
                        instance_name, device['DeviceName'], devnum, numberofdevs)
                    if new_tags != cur_tags:
                        logger.info('%s Tags changed to %s', volume_id, new_tags)
                        ec2.create_tags(Resources=[volume_id],
                                        Tags=ansible_dict_to_boto3_tag_list(new_tags))

    # Anything never marked 'Used' above is not attached to any instance.
    for volume in volumes.values():
        if 'Used' in volume:
            continue
        cur_tags = boto3_tag_list_to_ansible_dict(volume.get('Tags', []))
        name = cur_tags.get('Name', volume['VolumeId'])
        # Skip volumes already flagged on a previous run.
        if not name.startswith('UNUSED'):
            logger.warning('%s Unused!', volume['VolumeId'])
            cur_tags['Name'] = 'UNUSED ' + name
            ec2.create_tags(Resources=[volume['VolumeId']],
                            Tags=ansible_dict_to_boto3_tag_list(cur_tags))
def tag_everything():
    """Run both tagging passes: snapshots first, then volumes."""
    tag_snapshots()
    tag_volumes()
def boto3_tag_list_to_ansible_dict(tags_list):
    """Convert a boto3-style tag list into a plain ``{key: value}`` dict.

    Each tag may be spelled ``{'Key': ..., 'Value': ...}`` or
    ``{'key': ..., 'value': ...}``. Tags whose key starts with ``aws:`` are
    dropped.

    :param tags_list: iterable of tag dicts; ``None`` is treated as empty.
    :return: dict mapping tag keys to tag values.
    """
    tags_dict = {}
    for tag in tags_list or []:
        if 'key' in tag and not tag['key'].startswith('aws:'):
            tags_dict[tag['key']] = tag['value']
        elif 'Key' in tag and not tag['Key'].startswith('aws:'):
            tags_dict[tag['Key']] = tag['Value']
    return tags_dict
def ansible_dict_to_boto3_tag_list(tags_dict):
    """Convert a ``{key: value}`` dict into a boto3-style tag list.

    :param tags_dict: dict mapping tag keys to tag values.
    :return: list of ``{'Key': key, 'Value': value}`` dicts, in the dict's
        iteration order.
    """
    return [{'Key': k, 'Value': v} for k, v in tags_dict.items()]
def handler(event, context):
    """Entry point with an ``(event, context)`` signature (presumably for AWS Lambda).

    Both arguments are ignored; the call simply runs the full tagging pass.
    """
    tag_everything()
if __name__ == '__main__':
    # Run the full tagging pass when executed as a script.
    tag_everything()
@sansane123

This comment has been minimized.

Copy link

@sansane123 sansane123 commented Jan 29, 2018

Can you help me understand why you are using Ansible here?

@danpritts

This comment has been minimized.

Copy link
Owner Author

@danpritts danpritts commented Feb 2, 2018

Ansible functions were in the original code I copied (see url in comments at top). At a guess, the original author knew about this method from other work and reused it.

@jester-vergara

This comment has been minimized.

Copy link

@jester-vergara jester-vergara commented Oct 22, 2019

image

image

Same script is working on another environment and on my own personal AWS account, just wondering what is causing this error on this environment. Thanks in advance for any help!

@danpritts

This comment has been minimized.

Copy link
Owner Author

@danpritts danpritts commented Oct 22, 2019

@jester-vergara

This comment has been minimized.

Copy link

@jester-vergara jester-vergara commented Oct 22, 2019

Not sure if it is access or credentials, but it is using a predefined CI/CD deployment to different environments. It works on the other one; only on this one environment is it not working. Thanks for the feedback as well.

@YYashwanth

This comment has been minimized.

Copy link

@YYashwanth YYashwanth commented Oct 23, 2019

Looks like the key "Ebs" is not present in all BlockDeviceMappings as some might be ephemeral volumes.

I added a try catch around that to see what the error is
image
and the output of the exception block is
image

Having a try/except block around this kind of thing helps narrow down the error and lets the script continue executing.

@jester-vergara

This comment has been minimized.

Copy link

@jester-vergara jester-vergara commented Oct 23, 2019

Thanksssss a lot mate! Cheers, owe you one! @YYashwanth

@wwwsilvathiago

This comment has been minimized.

Copy link

@wwwsilvathiago wwwsilvathiago commented Nov 12, 2019

Thanks, your script helped me a lot...

@hellupline

This comment has been minimized.

Copy link

@hellupline hellupline commented Apr 16, 2020

I made some small improvements to this gist:

#!/usr/bin/env python3

import logging
import os

import boto3


# Log verbosity comes from the LOG_LEVEL environment variable (INFO if unset).
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))

# Module-level logger used by the tagging functions.
logger = logging.getLogger(__name__)
# One EC2 client shared by every function in this script.
ec2 = boto3.client("ec2")


def tag_snapshots():
    """Tag snapshots owned by this account after the AMI that references them.

    All self-owned snapshots are loaded, then the block device mappings of
    every self-owned image are walked. A snapshot backing a mapping gets the
    image's tags, an ``ImageId`` tag, and a ``Name`` of the form
    ``AMI: <image name> <device> (<n>/<total>)``. Snapshots that no image
    references get ``UNUSED `` prepended to their name, once.

    ``create_tags`` is only called when the computed tags differ from the
    snapshot's current tags.
    """
    snapshots = {
        snapshot["SnapshotId"]: snapshot
        for page in ec2.get_paginator("describe_snapshots").paginate(OwnerIds=["self"])
        for snapshot in page["Snapshots"]
    }
    for image in ec2.describe_images(Owners=["self"])["Images"]:
        mappings = image.get("BlockDeviceMappings", [])
        # Counters for the "(n/total)" suffix; the total is the number of
        # block device mappings on the image.
        numberofdevs = len(mappings)
        devnum = 0
        tags = extract_tags(image.get("Tags", []))
        for device in mappings:
            # Not every mapping has an "Ebs" entry, and an "Ebs" entry does
            # not always carry a SnapshotId.
            snapshot_id = device.get("Ebs", {}).get("SnapshotId")
            if snapshot_id is None:
                continue
            devnum += 1
            snapshot = snapshots.get(snapshot_id)
            if snapshot is None:
                # The image points at a snapshot that was not returned for
                # OwnerIds=["self"]; there is nothing we can look up or mark.
                logger.warning(
                    "%s: referenced by %s but not among owned snapshots, skipping",
                    snapshot_id,
                    image["ImageId"],
                )
                continue
            snapshot["Used"] = True
            name = f"AMI: {image['Name']} {device['DeviceName']} ({devnum}/{numberofdevs})"
            cur_tags = extract_tags(snapshot.get("Tags", []))
            new_tags = {
                **cur_tags,
                **tags,
                "Name": name,
                "ImageId": image["ImageId"],
            }
            # Compare against the snapshot's own current tags (not the
            # image's) so an already up-to-date snapshot is not re-tagged.
            if new_tags != cur_tags:
                logger.info("%s: Tags changed to %r", snapshot_id, new_tags)
                ec2.create_tags(Resources=[snapshot_id], Tags=load_tags(new_tags))

    # Anything never marked "Used" above is not referenced by any owned image.
    for snapshot in snapshots.values():
        if "Used" in snapshot:
            continue
        tags = extract_tags(snapshot.get("Tags", []))
        name = tags.get("Name", snapshot["SnapshotId"])
        # Skip snapshots already flagged on a previous run.
        if not name.startswith("UNUSED"):
            logger.warning("%s: Unused", snapshot["SnapshotId"])
            tags["Name"] = f"UNUSED {name}"
            ec2.create_tags(Resources=[snapshot["SnapshotId"]], Tags=load_tags(tags))


def tag_volumes():
    """Tag EBS volumes after the instance they are attached to.

    All volumes are loaded, then the block device mappings of every instance
    are walked. An attached volume gets the instance's tags and a ``Name`` of
    the form ``<instance name> <device> (<n>/<total>)``; when the instance has
    no ``Name`` tag its instance id is used instead. Volumes that no instance
    references get ``UNUSED `` prepended to their name, once.

    ``create_tags`` is only called when the computed tags differ from the
    volume's current tags.
    """
    volumes = {
        volume["VolumeId"]: volume
        for page in ec2.get_paginator("describe_volumes").paginate()
        for volume in page["Volumes"]
    }
    for page in ec2.get_paginator("describe_instances").paginate():
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                mappings = instance.get("BlockDeviceMappings", [])
                numberofdevs = len(mappings)
                devnum = 0
                tags = extract_tags(instance.get("Tags", []))
                # An untagged instance has no "Name"; fall back to its id
                # instead of raising KeyError.
                instance_name = tags.get("Name", instance["InstanceId"])
                for device in mappings:
                    # Only mappings with an "Ebs" entry reference a volume.
                    if "Ebs" not in device:
                        continue
                    devnum += 1
                    volume_id = device["Ebs"]["VolumeId"]
                    volume = volumes.get(volume_id)
                    if volume is None:
                        # The volume was not in the describe_volumes listing
                        # (e.g. attached after the listing was taken).
                        logger.warning(
                            "%s: attached to %s but not in volume listing, skipping",
                            volume_id,
                            instance["InstanceId"],
                        )
                        continue
                    volume["Used"] = True
                    name = f"{instance_name} {device['DeviceName']} ({devnum}/{numberofdevs})"
                    cur_tags = extract_tags(volume.get("Tags", []))
                    new_tags = {
                        **cur_tags,
                        **tags,
                        "Name": name,
                    }
                    # Compare against the volume's own current tags (not the
                    # instance's) so an up-to-date volume is not re-tagged.
                    if new_tags != cur_tags:
                        logger.info("%s: Tags changed to %r", volume_id, new_tags)
                        ec2.create_tags(Resources=[volume_id], Tags=load_tags(new_tags))

    # Anything never marked "Used" above is not attached to any instance.
    for volume in volumes.values():
        if "Used" in volume:
            continue
        tags = extract_tags(volume.get("Tags", []))
        name = tags.get("Name", volume["VolumeId"])
        # Skip volumes already flagged on a previous run.
        if not name.startswith("UNUSED"):
            logger.warning("%s: Unused", volume["VolumeId"])
            tags["Name"] = f"UNUSED {name}"
            ec2.create_tags(Resources=[volume["VolumeId"]], Tags=load_tags(tags))


def tag_everything():
    """Run both tagging passes: snapshots first, then volumes."""
    for tagging_pass in (tag_snapshots, tag_volumes):
        tagging_pass()


def extract_tags(tags):
    """Turn a list of ``{"Key": ..., "Value": ...}`` tags into a plain dict.

    Tags whose key starts with ``aws:`` are left out.
    """
    result = {}
    for tag in tags:
        if tag["Key"].startswith("aws:"):
            continue
        result[tag["Key"]] = tag["Value"]
    return result


def load_tags(tags):
    """Turn a ``{key: value}`` dict into a list of ``{"Key", "Value"}`` tags.

    Entries whose key starts with ``aws:`` are left out.
    """
    tag_list = []
    for key, value in tags.items():
        if key.startswith("aws:"):
            continue
        tag_list.append({"Key": key, "Value": value})
    return tag_list


# Run the full tagging pass when executed as a script.
if __name__ == "__main__":
    tag_everything()
@gannino

This comment has been minimized.

Copy link

@gannino gannino commented May 12, 2020

Hello everyone, and thanks for this thread.
I've made some further changes, trying to include as many improvements as possible, and added a feature for deleting resources, since I needed to delete resources that are not used by any AMI or instance. I have published a gist here in case it is useful to anyone: https://gist.github.com/gannino/17b41c316f874af5d4068c2674a54048

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.