Skip to content

Instantly share code, notes, and snippets.

@mlapida
Last active January 17, 2024 08:10
Show Gist options
  • Save mlapida/931c03cce1e9e43f147b to your computer and use it in GitHub Desktop.
Save mlapida/931c03cce1e9e43f147b to your computer and use it in GitHub Desktop.
A lambda function that will copy EC2 tags to all related Volumes and Network Interfaces. A full writeup can be found on my site https://empty.coffee/tagging-and-snapshotting-with-lambda/ - Thank you to the community for keeping this updated!
from __future__ import print_function
import json
import boto3
import logging
#setup simple logging for INFO
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
#define the connection region
ec2 = boto3.resource('ec2', region_name="us-west-2")
#Set this to True if you don't want the function to perform any actions
debugMode = False
def lambda_handler(event, context):
#List all EC2 instances
base = ec2.instances.all()
#loop through by running instances
for instance in base:
#Tag the Volumes
for vol in instance.volumes.all():
#print(vol.attachments[0]['Device'])
if debugMode == True:
print("[DEBUG] " + str(vol))
tag_cleanup(instance, vol.attachments[0]['Device'])
else:
tag = vol.create_tags(Tags=tag_cleanup(instance, vol.attachments[0]['Device']))
print("[INFO]: " + str(tag))
#Tag the Network Interfaces
for eni in instance.network_interfaces:
#print(eni.attachment['DeviceIndex'])
if debugMode == True:
print("[DEBUG] " + str(eni))
tag_cleanup(instance, "eth"+str(eni.attachment['DeviceIndex']))
else:
tag = eni.create_tags(Tags=tag_cleanup(instance, "eth"+str(eni.attachment['DeviceIndex'])))
print("[INFO]: " + str(tag))
#------------- Functions ------------------
#returns the type of configuration that was performed
def tag_cleanup(instance, detail):
tempTags=[]
v={}
for t in instance.tags:
#pull the name tag
if t['Key'] == 'Name':
v['Value'] = t['Value'] + " - " + str(detail)
v['Key'] = 'Name'
tempTags.append(v)
#Set the important tags that should be written here
elif t['Key'] == 'Application Owner':
print("[INFO]: Application Owner Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'Cost Center':
print("[INFO]: Cost Center Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'Date Created':
print("[INFO]: Date Created Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'Requestor':
print("[INFO]: Requestor Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'System Owner':
print("[INFO]: System Owner Tag " + str(t))
tempTags.append(t)
else:
print("[INFO]: Skip Tag - " + str(t))
print("[INFO] " + str(tempTags))
return(tempTags)
@sriharipakalapati
Copy link

@n2taylor - thanks, this is great. but

Can I add a condition to check if the volume has any existing tag's before assigning the instance tags?
I don't want to take out existing tags on any of the EBS volumes, just want to assign ec2 tags incase if the volume doesn't has any.

thanks

@jtwp
Copy link

jtwp commented Aug 4, 2020

Hi,

Is it possible to extend the copying of tags to Snapshots?

@jamesduffy
Copy link

jamesduffy commented Feb 10, 2021

@n2taylor - thanks, this is great. but

Can I add a condition to check if the volume has any existing tag's before assigning the instance tags?
I don't want to take out existing tags on any of the EBS volumes, just want to assign ec2 tags incase if the volume doesn't has any.

thanks

I updated my copy of this with this ec2 function, not sure if this works for you. I was specifically looking to avoid overwriting the Name tag as that was set by a persistent volume claim in EKS and didn't want to lose the relation.

COPYABLE = [
    "Name",
    "role",
    "environment",
    "cost_category",
]

NO_NAME_COPYABLE = [
    "role",
    "environment",
    "cost_category",
]

def ec2():
    print('Processing EC2 Instances')

    instances = boto3.resource('ec2').instances.all()
    for instance in instances:
        tags = [t for t in instance.tags or [] if t['Key'] in COPYABLE]
        tags_no_name = [t for t in instance.tags or [] if t['Key'] in NO_NAME_COPYABLE]

        if not tags:
            continue

        # Tag the EBS Volumes
        for vol in instance.volumes.all():
            print(f'Updating tags for {vol.id}')

            # Don't replace existing Name for volumes
            existing_volume_tag_keys = [t['Key'] for t in vol.tags or []]
            if "Name" in existing_volume_tag_keys:
                vol.create_tags(Tags=tags_no_name)
            else:
                vol.create_tags(Tags=tags)

        # Tag the Elastic Network Interfaces
        for eni in instance.network_interfaces:
            print('Updating tags for {}'.format(eni.id))
            eni.create_tags(Tags=tags)

@jamesduffy
Copy link

I added the ability to tag network interfaces for ElastiCache and Elasticsearch.

def cache():
    print('Processing ElasticCache Instances')

    def filter(i):
        return (i.get('RequesterId') == 'amazon-elasticache' and
                i['Description'].startswith('ElastiCache'))

    tags = _get_elasticcache_tags()
    pprint(tags)
    for interface in _network_interfaces(filter):
        name = interface['Description'].split(' ')[1]
        print(name)
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def _get_elasticcache_tags():
    tags = {}
    client = boto3.client('elasticache')
    clusters = client.describe_cache_clusters()
    for cluster in clusters['CacheClusters']:
        name = cluster['CacheClusterId']
        response = client.list_tags_for_resource(ResourceName=cluster['ARN'])
        cache = response.get('TagList', [])
        tags[name] = [t for t in cache if t['Key'] in COPYABLE]
        tags[name].append({'Key': 'Name', 'Value': name})
    return tags


def es():
    print('Processing Elasticsearch clusters')

    def filter(i):
        return (i.get('RequesterId') == 'amazon-elasticsearch' and
                i['Description'].startswith('ES'))

    tags = _get_es_tags()
    for interface in _network_interfaces(filter):
        name = interface['Description'].split(' ')[1]
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def _get_es_tags():
    tags = {}
    client = boto3.client('es')
    domain_names = client.list_domain_names()
    for domain in domain_names['DomainNames']:
        domain_name = domain['DomainName']
        domain_details = client.describe_elasticsearch_domain(DomainName=domain_name)
        response = client.list_tags(ARN=domain_details['DomainStatus']['ARN'])
        es = response.get('TagList', [])
        tags[domain_name] = [t for t in es if t['Key'] in COPYABLE]
        tags[domain_name].append({'Key': 'Name', 'Value': domain_name})
    return tags

@jamesduffy
Copy link

And EKS:

def eks():
    print('Processing EKS Clusters')

    def filter(i):
        return (i['Description'].startswith('Amazon EKS'))

    tags = _get_eks_tags()
    for interface in _network_interfaces(filter):
        name = interface['Description'].split(' ')[2]
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def _get_eks_tags():
    tags = {}
    client = boto3.client('eks')
    paginator = client.get_paginator('list_clusters')
    clusters = [c for p in paginator.paginate() for c in p['clusters']]

    for cluster in clusters:
        response = client.describe_cluster(name=cluster)
        cluster_details = response['cluster']
        cluster_tags = cluster_details.get('tags', [])

        tags[cluster] = [{'Key': k, 'Value': v} for k, v in cluster_tags.items() if k in COPYABLE]

    return tags

@Rodrigrogu
Copy link

i got this:

import boto3

def lambda_handler(event, context):
is_test = context.function_name == 'test' # this value is injected by SAM local
instances = boto3.resource('ec2').instances.all()

volume = ["Volume"]
eni = ["ENI"]
copyable_tag_keys = ["Hardening", "appname", "appdescription", "Owner", "env", "Name",]
copyable_tag_keysv = ["Hardening", "appname", "appdescription", "Owner", "env", "Name",]
copyable_tag_keyseni = ["Hardening", "appname", "appdescription", "Owner", "env", "Name",]

for instance in instances:
    copyable_tags = [t for t in instance.tags
                     if t["Key"] in copyable_tag_keys] if instance.tags else []
    if not copyable_tags:
        continue

    # Tag the EBS Volumes
    print(f"{instance.instance_id}: {instance.tags}")
    for vol in instance.volumes.all():
        print(f"Volume{vol.attachments[0]['Device']}: %volume,{copyable_tags}")
        if not is_test:
            vol.create_tags(Tags=copyable_tags)

    # Tag the Elastic Network Interfaces
    for eni in instance.network_interfaces:
        print(f"eth{str(eni.attachment['DeviceIndex'])}: %eni,{copyable_tags}")
        if not is_test:
            eni.create_tags(Tags=copyable_tags) 
            
            
       ------------

But actually i need to exclude some ec2 instances and volumes from tagging, how can i do that?

@Rodrigrogu
Copy link

@jamesduffy i want to exclude some ec2 instances and volumes from tagging, how can i do that?

@jamesduffy
Copy link

@jamesduffy i want to exclude some ec2 instances and volumes from tagging, how can i do that?

I don't know if negative filters work in boto3. I haven't tried it, but I would start there. I would try to add a tag to the instances/resources that tell the script not to copy the tags. For example my ideal setup I would create a copy-tags-ignore=true on anything I wanted to ignore. Then if I can't do a negative filter to replace boto3.resource('ec2').instances.all(). If I can't use a negative filter than if you can check for that tag in the for loop and check if it exists and continue if it is exists and set to true.

Not ideal instead of using a tag named something like copy-tags-ignore you could create a list of resource names you don't want the script to copy and in the loop do a check like if instance.tags['Name'] in do_not_copy continue instead of copying.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment