Skip to content

Instantly share code, notes, and snippets.

@mlapida
Last active January 17, 2024 08:10
Show Gist options
  • Save mlapida/931c03cce1e9e43f147b to your computer and use it in GitHub Desktop.
Save mlapida/931c03cce1e9e43f147b to your computer and use it in GitHub Desktop.
A lambda function that will copy EC2 tags to all related Volumes and Network Interfaces. A full writeup can be found on my site https://empty.coffee/tagging-and-snapshotting-with-lambda/ - Thank you to the community for keeping this updated!
from __future__ import print_function
import json
import boto3
import logging
#setup simple logging for INFO
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
#define the connection region
ec2 = boto3.resource('ec2', region_name="us-west-2")
#Set this to True if you don't want the function to perform any actions
debugMode = False
def lambda_handler(event, context):
#List all EC2 instances
base = ec2.instances.all()
#loop through by running instances
for instance in base:
#Tag the Volumes
for vol in instance.volumes.all():
#print(vol.attachments[0]['Device'])
if debugMode == True:
print("[DEBUG] " + str(vol))
tag_cleanup(instance, vol.attachments[0]['Device'])
else:
tag = vol.create_tags(Tags=tag_cleanup(instance, vol.attachments[0]['Device']))
print("[INFO]: " + str(tag))
#Tag the Network Interfaces
for eni in instance.network_interfaces:
#print(eni.attachment['DeviceIndex'])
if debugMode == True:
print("[DEBUG] " + str(eni))
tag_cleanup(instance, "eth"+str(eni.attachment['DeviceIndex']))
else:
tag = eni.create_tags(Tags=tag_cleanup(instance, "eth"+str(eni.attachment['DeviceIndex'])))
print("[INFO]: " + str(tag))
#------------- Functions ------------------
#returns the type of configuration that was performed
def tag_cleanup(instance, detail):
tempTags=[]
v={}
for t in instance.tags:
#pull the name tag
if t['Key'] == 'Name':
v['Value'] = t['Value'] + " - " + str(detail)
v['Key'] = 'Name'
tempTags.append(v)
#Set the important tags that should be written here
elif t['Key'] == 'Application Owner':
print("[INFO]: Application Owner Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'Cost Center':
print("[INFO]: Cost Center Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'Date Created':
print("[INFO]: Date Created Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'Requestor':
print("[INFO]: Requestor Tag " + str(t))
tempTags.append(t)
elif t['Key'] == 'System Owner':
print("[INFO]: System Owner Tag " + str(t))
tempTags.append(t)
else:
print("[INFO]: Skip Tag - " + str(t))
print("[INFO] " + str(tempTags))
return(tempTags)
@gmr
Copy link

gmr commented Jan 8, 2019

And another variation adding elb and elbv2 eni tagging. I took out the lambda bits, but it'd be easy to add them back.

import boto3

COPYABLE = ["Service", "Environment", "Team", "Name"]


def ec2():
    print('Processing EC2 Instances')

    instances = boto3.resource('ec2').instances.all()
    for instance in instances:
        tags = [t for t in instance.tags or [] if t['Key'] in COPYABLE]
        if not tags:
            continue

        # Tag the EBS Volumes
        for vol in instance.volumes.all():
            print('Updating tags for {}'.format(vol.id))
            vol.create_tags(Tags=tags)

        # Tag the Elastic Network Interfaces
        for eni in instance.network_interfaces:
            print('Updating tags for {}'.format(eni.id))
            eni.create_tags(Tags=tags)


def elb():
    print('Processing ELB Instances')

    def filter(i):
        return (i.get('RequesterId') == 'amazon-elb' and
                i['Description'].startswith('ELB') and
                '/' not in i['Description'])

    tags = _get_elb_tags('elb')
    for interface in _network_interfaces(filter):
        name = interface['Description'].split(' ')[1]
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def elbv2():
    print('Processing ELBv2 Instances')

    def filter(i):
        return (i.get('RequesterId') == 'amazon-elb' and
                i['Description'].startswith('ELB') and
                '/' in i['Description'])

    tags = _get_elb_tags('elbv2')
    for interface in _network_interfaces(filter):
        name = interface['Description'].split('/')[1]
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def _get_elb_tags(name='elb'):
    if name == 'elb':
        page_name = 'LoadBalancerDescriptions'
        key = 'LoadBalancerName'
        kwname = 'LoadBalancerNames'
    elif name == 'elbv2':
        page_name = 'LoadBalancers'
        key = 'LoadBalancerArn'
        kwname = 'ResourceArns'
    else:
        raise ValueError('Invalid name: {}'.format(name))

    tags = {}
    client = boto3.client(name)
    paginator = client.get_paginator('describe_load_balancers')
    for page in paginator.paginate():
        for lb in page[page_name]:
            response = client.describe_tags(**{kwname: [lb[key]]})
            lb_tags = [item for sublist in
                       [r.get('Tags', []) for r in response['TagDescriptions']]
                       for item in sublist]
            tags[lb['LoadBalancerName']] = [t for t in lb_tags if
                                            t['Key'] in COPYABLE]
            tags[lb['LoadBalancerName']].append(
                {'Key': 'Name', 'Value': lb['LoadBalancerName']})
    return tags


def _network_interfaces(filter=None):
    client = boto3.client('ec2')
    paginator = client.get_paginator('describe_network_interfaces')
    for page in paginator.paginate():
        for interface in page['NetworkInterfaces']:
            if filter and not filter(interface):
                continue
            yield interface


def _tag_network_interface(eni_id, tags):
    print('Updating tags for {}'.format(eni_id))
    ec2 = boto3.resource('ec2')
    eni = ec2.NetworkInterface(eni_id)
    eni.create_tags(Tags=tags)


def main():
    ec2()
    elb()
    elbv2()


if __name__ == '__main__':
    main()

@jutler
Copy link

jutler commented Apr 9, 2019

@gmr this is awesome, just the thing i was looking for. I'm very new to python so still learning. Just 2 things i wanted to ask. Is there a way to include Elastic IP's in this code? Also how could you implement a dryrun in this so we can see what changes would me made without actually making them in the first instance?

@tomekklas
Copy link

tomekklas commented Jun 3, 2019

I packaged the script from @gmr into self-contained CloudFormation for ease of use.

https://gist.github.com/tomekklas/f11172a0a2cdd5888c66ddb7f6d6c214

@retpolanne
Copy link

retpolanne commented Jun 28, 2019

Even simpler. Thanks for your example.

import boto3

tags_to_use = ['SomeTag']

def lambda_handler(event, context):
    instances = boto3.resource('ec2').instances.all()
    for instance in instances:
        tags = instance.tags
        to_tag = [t for t in tags if t['Key'] in tags_to_use]
        for vol in instance.volumes.all():
            print(f"Tagging volume {vol.id} from instance {instance.id}")
            vol.create_tags(Tags=to_tag)

You can attach a CloudWatch event that listen to new running instances. This way, you can run your script when a new instance starts.

@sriharipakalapati
Copy link

@n2taylor - thanks, this is great. but

Can I add a condition to check if the volume has any existing tag's before assigning the instance tags?
I don't want to take out existing tags on any of the EBS volumes, just want to assign ec2 tags incase if the volume doesn't has any.

thanks

@jtwp
Copy link

jtwp commented Aug 4, 2020

Hi,

Is it possible to extend the copying of tags to Snapshots?

@jamesduffy
Copy link

jamesduffy commented Feb 10, 2021

@n2taylor - thanks, this is great. but

Can I add a condition to check if the volume has any existing tag's before assigning the instance tags?
I don't want to take out existing tags on any of the EBS volumes, just want to assign ec2 tags incase if the volume doesn't has any.

thanks

I updated my copy of this with this ec2 function, not sure if this works for you. I was specifically looking to avoid overwriting the Name tag as that was set by a persistent volume claim in EKS and didn't want to lose the relation.

COPYABLE = [
    "Name",
    "role",
    "environment",
    "cost_category",
]

NO_NAME_COPYABLE = [
    "role",
    "environment",
    "cost_category",
]

def ec2():
    print('Processing EC2 Instances')

    instances = boto3.resource('ec2').instances.all()
    for instance in instances:
        tags = [t for t in instance.tags or [] if t['Key'] in COPYABLE]
        tags_no_name = [t for t in instance.tags or [] if t['Key'] in NO_NAME_COPYABLE]

        if not tags:
            continue

        # Tag the EBS Volumes
        for vol in instance.volumes.all():
            print(f'Updating tags for {vol.id}')

            # Don't replace existing Name for volumes
            existing_volume_tag_keys = [t['Key'] for t in vol.tags or []]
            if "Name" in existing_volume_tag_keys:
                vol.create_tags(Tags=tags_no_name)
            else:
                vol.create_tags(Tags=tags)

        # Tag the Elastic Network Interfaces
        for eni in instance.network_interfaces:
            print('Updating tags for {}'.format(eni.id))
            eni.create_tags(Tags=tags)

@jamesduffy
Copy link

I added the ability to tag network interfaces for ElastiCache and Elasticsearch.

def cache():
    print('Processing ElasticCache Instances')

    def filter(i):
        return (i.get('RequesterId') == 'amazon-elasticache' and
                i['Description'].startswith('ElastiCache'))

    tags = _get_elasticcache_tags()
    pprint(tags)
    for interface in _network_interfaces(filter):
        name = interface['Description'].split(' ')[1]
        print(name)
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def _get_elasticcache_tags():
    tags = {}
    client = boto3.client('elasticache')
    clusters = client.describe_cache_clusters()
    for cluster in clusters['CacheClusters']:
        name = cluster['CacheClusterId']
        response = client.list_tags_for_resource(ResourceName=cluster['ARN'])
        cache = response.get('TagList', [])
        tags[name] = [t for t in cache if t['Key'] in COPYABLE]
        tags[name].append({'Key': 'Name', 'Value': name})
    return tags


def es():
    print('Processing Elasticsearch clusters')

    def filter(i):
        return (i.get('RequesterId') == 'amazon-elasticsearch' and
                i['Description'].startswith('ES'))

    tags = _get_es_tags()
    for interface in _network_interfaces(filter):
        name = interface['Description'].split(' ')[1]
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def _get_es_tags():
    tags = {}
    client = boto3.client('es')
    domain_names = client.list_domain_names()
    for domain in domain_names['DomainNames']:
        domain_name = domain['DomainName']
        domain_details = client.describe_elasticsearch_domain(DomainName=domain_name)
        response = client.list_tags(ARN=domain_details['DomainStatus']['ARN'])
        es = response.get('TagList', [])
        tags[domain_name] = [t for t in es if t['Key'] in COPYABLE]
        tags[domain_name].append({'Key': 'Name', 'Value': domain_name})
    return tags

@jamesduffy
Copy link

And EKS:

def eks():
    print('Processing EKS Clusters')

    def filter(i):
        return (i['Description'].startswith('Amazon EKS'))

    tags = _get_eks_tags()
    for interface in _network_interfaces(filter):
        name = interface['Description'].split(' ')[2]
        if name not in tags:
            continue
        _tag_network_interface(interface['NetworkInterfaceId'], tags[name])


def _get_eks_tags():
    tags = {}
    client = boto3.client('eks')
    paginator = client.get_paginator('list_clusters')
    clusters = [c for p in paginator.paginate() for c in p['clusters']]

    for cluster in clusters:
        response = client.describe_cluster(name=cluster)
        cluster_details = response['cluster']
        cluster_tags = cluster_details.get('tags', [])

        tags[cluster] = [{'Key': k, 'Value': v} for k, v in cluster_tags.items() if k in COPYABLE]

    return tags

@Rodrigrogu
Copy link

i got this:

import boto3

def lambda_handler(event, context):
is_test = context.function_name == 'test' # this value is injected by SAM local
instances = boto3.resource('ec2').instances.all()

volume = ["Volume"]
eni = ["ENI"]
copyable_tag_keys = ["Hardening", "appname", "appdescription", "Owner", "env", "Name",]
copyable_tag_keysv = ["Hardening", "appname", "appdescription", "Owner", "env", "Name",]
copyable_tag_keyseni = ["Hardening", "appname", "appdescription", "Owner", "env", "Name",]

for instance in instances:
    copyable_tags = [t for t in instance.tags
                     if t["Key"] in copyable_tag_keys] if instance.tags else []
    if not copyable_tags:
        continue

    # Tag the EBS Volumes
    print(f"{instance.instance_id}: {instance.tags}")
    for vol in instance.volumes.all():
        print(f"Volume{vol.attachments[0]['Device']}: %volume,{copyable_tags}")
        if not is_test:
            vol.create_tags(Tags=copyable_tags)

    # Tag the Elastic Network Interfaces
    for eni in instance.network_interfaces:
        print(f"eth{str(eni.attachment['DeviceIndex'])}: %eni,{copyable_tags}")
        if not is_test:
            eni.create_tags(Tags=copyable_tags) 
            
            
       ------------

But actually i need to exclude some ec2 instances and volumes from tagging, how can i do that?

@Rodrigrogu
Copy link

@jamesduffy i want to exclude some ec2 instances and volumes from tagging, how can i do that?

@jamesduffy
Copy link

@jamesduffy i want to exclude some ec2 instances and volumes from tagging, how can i do that?

I don't know if negative filters work in boto3. I haven't tried it, but I would start there. I would try to add a tag to the instances/resources that tell the script not to copy the tags. For example my ideal setup I would create a copy-tags-ignore=true on anything I wanted to ignore. Then if I can't do a negative filter to replace boto3.resource('ec2').instances.all(). If I can't use a negative filter than if you can check for that tag in the for loop and check if it exists and continue if it is exists and set to true.

Not ideal instead of using a tag named something like copy-tags-ignore you could create a list of resource names you don't want the script to copy and in the loop do a check like if instance.tags['Name'] in do_not_copy continue instead of copying.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment