@Eyjafjallajokull
Last active December 25, 2023 02:53
AWS EBS - Find unused snapshots

This script can help you find and remove unused AWS EBS snapshots and volumes.

The list of regions it searches is hardcoded; adjust the regions value to suit your needs.

Use snapshots.py snapshot-report to generate report.csv, containing information about all snapshots.

snapshots.py snapshot-cleanup lets you interactively delete a snapshot when the resources it references no longer exist.

./snapshots.py --help
Usage: snapshots.py [OPTIONS] COMMAND [ARGS]...

  Helper commands for Snapshots management.

Options:
  --help  Show this message and exit.

Commands:
  snapshot-cleanup  Find and delete unreferenced snapshots.
  snapshot-delete   Delete single snapshot by id.
  snapshot-report   Find unreferenced snapshots.
  volume-cleanup    Find and delete unused volumes.
#!/usr/local/bin/python3
import csv
import re
from collections import OrderedDict
from pprint import pprint

import boto3
import click
from botocore.exceptions import ClientError

regions = ['us-west-1', 'eu-central-1']
ec2 = None
exists_icon = '✅'
not_exists_icon = '❌'


@click.group()
def cli():
    '''
    Helper commands for Snapshots management.
    '''
    pass

@cli.command()
def snapshot_report():
    '''
    Find unreferenced snapshots.
    '''
    global ec2
    with open('report.csv', 'w') as csv_file:
        csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow([
            'id',
            'volume_id',
            'volume_exists',
            'ami_id',
            'ami_exists',
            'instance_id',
            'instance_exists',
            'size',
            'start_time',
            'description'])
        for region in regions:
            ec2 = boto3.client('ec2', region_name=region)
            for snapshot in get_snapshots():
                csv_writer.writerow([
                    snapshot['id'],
                    snapshot['volume_id'],
                    snapshot['volume_exists'],
                    snapshot['ami_id'],
                    snapshot['ami_exists'],
                    snapshot['instance_id'],
                    snapshot['instance_exists'],
                    str(snapshot['size']) + 'gb',
                    str(snapshot['start_time']),
                    snapshot['description']])

@cli.command()
def snapshot_cleanup():
    '''
    Find and delete unreferenced snapshots.
    '''
    global ec2
    print('{:22} {:23} {:23} {:23} {:>7} {:25} {:30}'.format('snapshot id', 'volume id', 'ami id',
                                                             'instance id', 'size', 'start time', 'description'))
    for region in regions:
        ec2 = boto3.client('ec2', region_name=region)
        print('region={}'.format(region))
        for snapshot in get_snapshots():
            volume_exists = exists_icon if snapshot['volume_exists'] else not_exists_icon
            ami_exists = exists_icon if snapshot['ami_exists'] else not_exists_icon
            instance_exists = exists_icon if snapshot['instance_exists'] else not_exists_icon
            print('{:22} {:22} {:22} {:22} {:>7} {:25} {:30}'.format(
                snapshot['id'],
                snapshot['volume_id'] + volume_exists,
                snapshot['ami_id'] + ami_exists,
                snapshot['instance_id'] + instance_exists,
                str(snapshot['size']) + 'gb',
                str(snapshot['start_time']),
                snapshot['description']
            ))
            if not snapshot['volume_exists'] and not snapshot['ami_exists'] and not snapshot['instance_exists'] and click.confirm('Delete?', default=True):
                ec2.delete_snapshot(SnapshotId=snapshot['id'])

@cli.command()
def volume_cleanup():
    '''
    Find and delete unused volumes.
    '''
    global ec2
    print('{:23} {:20} {:>7} {:10} {:23}'.format(
        'volume id', 'status', 'size', 'created', 'snapshot id'))
    for region in regions:
        ec2 = boto3.client('ec2', region_name=region)
        print('region={}'.format(region))
        for volume in get_available_volumes():
            snapshot_exists = exists_icon if volume['snapshot_exists'] else not_exists_icon
            print('{:23} {:20} {:>7} {:10} {:22}'.format(
                volume['id'],
                volume['status'],
                str(volume['size']) + 'gb',
                volume['create_time'].strftime('%Y-%m-%d'),
                volume['snapshot_id'] + snapshot_exists
            ))
            if not volume['snapshot_exists']:
                print('Tags:')
                print(' ' + ('\n '.join(['{}={}'.format(click.style(key, fg='blue'), tag)
                                         for key, tag in volume['tags'].items()])))
                if click.confirm('Delete?', default=True):
                    ec2.delete_volume(VolumeId=volume['id'])

@cli.command()
@click.argument('snapshot_id')
def snapshot_delete(snapshot_id):
    '''
    Delete single snapshot by id.
    '''
    # The global ec2 client is never initialised for this command, so create a
    # client here; snapshot ids are region-scoped, so try each configured region.
    for region in regions:
        try:
            boto3.client('ec2', region_name=region).delete_snapshot(SnapshotId=snapshot_id)
            print('Deleted ' + snapshot_id)
            return
        except ClientError as e:
            print('Failed to delete ' + snapshot_id + ' in ' + region)
            print(e)

def get_snapshots():
    '''
    Get all snapshots.
    '''
    for snapshot in ec2.describe_snapshots(OwnerIds=['self'])['Snapshots']:
        instance_id, image_id = parse_description(snapshot['Description'])
        yield {
            'id': snapshot['SnapshotId'],
            'description': snapshot['Description'],
            'start_time': snapshot['StartTime'],
            'size': snapshot['VolumeSize'],
            'volume_id': snapshot['VolumeId'],
            'volume_exists': volume_exists(snapshot['VolumeId']),
            'instance_id': instance_id,
            'instance_exists': instance_exists(instance_id),
            'ami_id': image_id,
            'ami_exists': image_exists(image_id),
        }

def get_available_volumes():
    '''
    Get all volumes in 'available' state. (Volumes not attached to any instance)
    '''
    for volume in ec2.describe_volumes(Filters=[{'Name': 'status', 'Values': ['available']}])['Volumes']:
        yield {
            'id': volume['VolumeId'],
            'create_time': volume['CreateTime'],
            'status': volume['State'],
            'size': volume['Size'],
            'snapshot_id': volume['SnapshotId'],
            # bool, or '' when the volume has no source snapshot, so the truthiness checks above work
            'snapshot_exists': snapshot_exists(volume['SnapshotId']),
            # volumes without tags have no 'Tags' key at all
            'tags': OrderedDict(sorted([(tag['Key'], tag['Value']) for tag in volume.get('Tags', [])])),
        }

def snapshot_exists(snapshot_id):
    if not snapshot_id:
        return ''
    try:
        ec2.describe_snapshots(SnapshotIds=[snapshot_id])
        return True
    except ClientError:
        return False


def volume_exists(volume_id):
    if not volume_id:
        return False
    try:
        ec2.describe_volumes(VolumeIds=[volume_id])
        return True
    except ClientError:
        return False


def instance_exists(instance_id):
    if not instance_id:
        return ''
    try:
        return len(ec2.describe_instances(InstanceIds=[instance_id])['Reservations']) != 0
    except ClientError:
        return False


def image_exists(image_id):
    if not image_id:
        return ''
    try:
        return len(ec2.describe_images(ImageIds=[image_id])['Images']) != 0
    except ClientError:
        return False


def parse_description(description):
    regex = r"^Created by CreateImage\((.*?)\) for (.*?) "
    matches = re.finditer(regex, description, re.MULTILINE)
    for matchNum, match in enumerate(matches):
        return match.groups()
    return '', ''


if __name__ == '__main__':
    cli()
@nicbor

nicbor commented Jul 18, 2017

Thanks for the warning @jordanmance. Forked to include your changes.

@niknkson

Thanks for this lovely script, it really saves a lot of manual effort. Can you help me modify the script so that the oldest snapshots are queried first? I am totally new to Python.

@eran658

eran658 commented Aug 6, 2018

This is very useful, thank you!
What would be really great is to also include the following:
Instance Name, Volume Name, Image Name

The IDs alone lack context when you are working through a large backlog of old images to clean up; without the names it is very difficult to understand what you are looking at.
Any idea how to improve the code to also include Name columns for those three objects?

Thanks!

@eran658

eran658 commented Aug 6, 2018

I was able to add the names with these three functions:

def get_instance_name(instance_id):
    try:
        reservations = ec2_client.describe_instances(InstanceIds=[instance_id])['Reservations']
        for tag in reservations[0]['Instances'][0].get('Tags', []):
            if tag['Key'] == 'Name':
                return tag['Value']
        return ''
    except (ClientError, IndexError):
        return 'Instance does not exist'

def get_volume_name(volume_id):
    try:
        volume = ec2_client.describe_volumes(VolumeIds=[volume_id])['Volumes'][0]
        for tag in volume.get('Tags', []):
            if tag['Key'] == 'Name':
                return tag['Value']
        return ''
    except (ClientError, IndexError):
        return 'Volume does not exist'

def get_image_name(image_id):
    try:
        image = ec2_client.describe_images(ImageIds=[image_id])
        return image['Images'][0]['Name']
    except (ClientError, IndexError):
        return 'AMI does not exist'
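
A small sketch of how these helpers could be wired into the gist (assuming ec2_client is the same per-region client the script keeps in the global ec2; the snapshot_names helper below is illustrative, not part of the original script or the comment above):

def snapshot_names(snapshot):
    '''Return (instance_name, volume_name, image_name) for one dict
    yielded by get_snapshots(), using the helpers above.'''
    return (
        get_instance_name(snapshot['instance_id']) if snapshot['instance_id'] else '',
        get_volume_name(snapshot['volume_id']) if snapshot['volume_id'] else '',
        get_image_name(snapshot['ami_id']) if snapshot['ami_id'] else '',
    )

snapshot_report() could then append these three values to each csv_writer.writerow() call, and snapshot_cleanup() could print them next to the ids.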

@hiripitiyage

I am new to scripting. I tried this script in our test environment and got the following error. I ran it on a Windows 10 machine with the AWS CLI configured.
Traceback (most recent call last):
File "C:\AWS\snapshots.py", line 72, in
main()
File "C:\AWS\snapshots.py", line 56, in main
for snap in get_snapshots():
File "C:\AWS\snapshots.py", line 9, in get_snapshots
return ec2.describe_snapshots(OwnerIds=['self'])['Snapshots']
File "C:\Users\roshan.hiripitiyaga\AppData\Local\Programs\Python\Python37\lib\site-packages\botocore\client.py", line 314, in _api_call
return self._make_api_call(operation_name, kwargs)
File "C:\Users\roshan.hiripitiyaga\AppData\Local\Programs\Python\Python37\lib\site-packages\botocore\client.py", line 612, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AuthFailure) when calling the DescribeSnapshots operation: AWS was not able to validate the provided access credentials

@torrencer

torrencer commented Oct 9, 2018

How could I convert this to PowerShell?

@uolter

uolter commented Nov 14, 2018

Could it be that the CSV file name should be report.csv instead of raport.csv? :-)

@ramuvvy

ramuvvy commented Jun 30, 2019

Hi,

I have a Lambda function that reports unused EIPs and EBS volumes with pricing details. I would like to modify it to report snapshot details as well. Can anyone help me here, please? Thanks in advance.

import json
import boto3
import os
import argparse
import inspect

functions = { 'ec2' : [
               {'name' : 'get_ec2_instances', 'title' : 'Running EC2 Instances' },
               {'name' : 'get_ebs_volumes', 'title' : 'Un-attached EBS Volumes' },
               {'name' : 'get_eip', 'title' : 'Un-attached EIPs' }
              ]
            }

def html_header_2(title):
  return "<h2>" + title + "</h2>\n"
def html_header_3(title):
  return "<h3>" + title + "</h3>\n"
def html_para(para):
  return "<p>" + para + "</p>\n"
def html_table(columns, data):
  c=1
  #Add header row
  table="<table border=1 CELLPADDING=1 CELLSPACING=0 width=\"86%\">"
  table += "<tr bgcolor=\"#0099cc\">"
  for column in columns:
    table += "<th><font color=\"#fff\">" + column + "</font></th>"
  table += "</tr>"
  #Add data rows
  for row in data:
    table += "<tr>"
    for item in row:
      table += "<td>" + str(item) + "</td>"
    table += "</tr>"
  table += "</table>"
  return table

#TODO Add paginations for all functions
def price_cache_update(itype, region, cost):
  if itype in price_cache:
    if region in price_cache[itype]:
      return
    else:
      price_cache[itype][region] = cost
  else:
    price_cache[itype] = { region : cost }

def saving_update(type, cost):
  if type in savings:
    savings[type] += cost
  else:
    savings[type] = cost

def clear_cache():
  global price_cache
  price_cache = {}
  #Lambda execution caching global data
  savings['ec2'] = 0
  savings['ebs'] = 0
  savings['eip'] = 0

def get_ebs_price(pricing, region, vtype, vsize):
  #check if price is there in cache
  if vtype in price_cache and region in price_cache[vtype]:
    return price_cache[vtype][region]
  
  #print "VolumeType=%s location=%s" %(vtype, RegionMapping[region])
  price_detail = pricing.get_products(ServiceCode='AmazonEC2',
    Filters=[
    {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'Storage'},
    {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': RegionMapping[region]}
  ])
  #print json.dumps(price_detail, indent=2)
  UsageData = {}
  for item in price_detail['PriceList']:
    data = json.loads(item)
    if 'EBS:VolumeUsage.' + vtype not in data['product']['attributes']['usagetype']:
      continue
    else:
      UsageData = data
  if not UsageData:
    return ''
  #print json.dumps(UsageData, indent=2)
  sku_code = UsageData['product']['sku']
  sku_offer_term = sku_code + '.' + OfferTermCode
  values = UsageData['terms']['OnDemand'][sku_offer_term]['priceDimensions']
  for key in values:
    if 'rateCode' in values[key]:
      ratecode = values[key]['rateCode']
      cost = float(values[key]['pricePerUnit']['USD'])
      volume_cost = round((cost * vsize), 2)
      price_cache_update(vtype, region, volume_cost)
  return volume_cost

    
def get_ec2_price(pricing, region, itype):
  #check if price is there in cache
  if itype in price_cache and region in price_cache[itype]:
    return price_cache[itype][region]
  
  #print "InstanceType=%s location=%s" %(itype, RegionMapping[region])
  price_detail = pricing.get_products(ServiceCode='AmazonEC2',
    Filters=[
    {'Type': 'TERM_MATCH', 'Field': 'instanceType', 'Value': itype},
    {'Type': 'TERM_MATCH', 'Field': 'operatingSystem', 'Value': 'Linux'},
    {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'Shared'},
    {'Type': 'TERM_MATCH', 'Field': 'preInstalledSw', 'Value': 'NA'},
    {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': RegionMapping[region]}
  ])
  #print json.dumps(price_detail, indent=2)
  UsageData = {}
  for item in price_detail['PriceList']:
    data = json.loads(item)
    if 'BoxUsage' not in data['product']['attributes']['usagetype']:
      continue
    else:
      UsageData = data
  if not UsageData:
    return ''
  #print json.dumps(UsageData, indent=2)
  sku_code = UsageData['product']['sku']
  sku_offer_term = sku_code + '.' + OfferTermCode
  values = UsageData['terms']['OnDemand'][sku_offer_term]['priceDimensions']
  for key in values:
    if 'rateCode' in values[key]:
      ratecode = values[key]['rateCode']
      cost = float(values[key]['pricePerUnit']['USD'])
      instance_cost = round((cost * 24 * 30.5), 2)
      price_cache_update(itype, region, instance_cost)
  return instance_cost
 
def get_eip_price(pricing, region, etype):
  #check if price is there in cache
  if etype in price_cache and region in price_cache[etype]:
    return price_cache[etype][region]
  
  #print "IPType=%s location=%s" %(etype, RegionMapping[region])
  price_detail = pricing.get_products(ServiceCode='AmazonEC2',
    Filters=[
    {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'IP Address'},
    {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': RegionMapping[region]}
  ])
  #print json.dumps(price_detail, indent=2)
  UsageData = {}
  for item in price_detail['PriceList']:
    data = json.loads(item)
    if 'ElasticIP:AdditionalAddress' not in data['product']['attributes']['usagetype']:
      continue
    else:
      UsageData = data
  if not UsageData:
    return ''
  #print json.dumps(UsageData, indent=2)
  sku_code = UsageData['product']['sku']
  sku_offer_term = sku_code + '.' + OfferTermCode
  values = UsageData['terms']['OnDemand'][sku_offer_term]['priceDimensions']
  for key in values:
    if 'rateCode' in values[key]:
      ratecode = values[key]['rateCode']
      cost = float(values[key]['pricePerUnit']['USD'])
      eip_cost = round((cost * 24 * 30.5), 2)
      price_cache_update(etype, region, eip_cost)
  return eip_cost
	
def get_ec2_instances(service, pricing, regions, body):
  header = ['Sr.','Region','InstanceId','Name', 'State','InstanceType','LaunchTime', 'Monthly cost']
  table = []
  c=1
  found = False
  for region in regions:
    ec2client = boto3.client('ec2', region_name=region)
    data = ec2client.describe_instances(Filters=[{'Name': 'instance-state-name', 'Values': ['running']}])
    if not data or not data['Reservations'] or not data['Reservations'][0]['Instances']:
      continue
	  
    for r in data['Reservations']:
      for i in r['Instances']:
        iname = ''
        if 'Tags' in i:
          tag = [x for x in i['Tags'] if x['Key'] == 'Name']
          iname = tag[0]['Value'] if tag else ''
        try:
          instance_cost = get_ec2_price(pricing, region, i['InstanceType'])
          #print "Calling saving_update for %s %s %s" %(region, i['InstanceId'], i['InstanceType'])
          saving_update('ec2', instance_cost)
          instance_cost_dollar = '$ ' + str(instance_cost) if instance_cost else ''
        except Exception as e:
          print("Could not determine cost: %s" % (str(e)))
          instance_cost_dollar = ''
        table.append([c, region, i['InstanceId'], iname, i['State']['Name'],
                      i['InstanceType'], str(i['LaunchTime']), instance_cost_dollar])
        c += 1
        found = True
      
  if found:
    fdetails = [ f for f in functions[service] if f['name'] == inspect.stack()[0][3]]
    body += html_header_2(fdetails[0]['title'])
    body += html_header_3('Total EC2 spending approx <font color="red">$' + str(savings['ec2']) + '</font>. Save cost by identifying and stopping unused EC2 instances (if any)')
    body += html_table(header, table)
  return body

def get_ebs_volumes(service, pricing, regions, body):
  header = ['Sr.','Region', 'Volume Id', 'Name', 'Type', 'State', 'IOPS', 'Size', 'Creation Time', 'Monthly Cost']
  table = []
  c=1
  found = False
  for region in regions:
    ec2client = boto3.client('ec2', region_name=region)
    data = ec2client.describe_volumes()
    if not data or not data['Volumes']:
      continue
    for v in data['Volumes']:
      if v['State'] != 'available':
        continue
      vname = ''
      if 'Tags' in v:
        tag = [x for x in v['Tags'] if x['Key'] == 'Name']
        vname = tag[0]['Value'] if tag else ''
      try:
        volume_cost = get_ebs_price(pricing, region, v['VolumeType'], v['Size'])
        saving_update('ebs', volume_cost)
        volume_cost_dollar = '$ ' + str(volume_cost) if volume_cost else ''
      except Exception as e:
        print("Could not determine cost. %s" % (str(e)))
        volume_cost_dollar = ''
      table.append([c, region, v['VolumeId'], vname, v['VolumeType'], v['State'], v['Iops'], v['Size'], v['CreateTime'], volume_cost_dollar])
      c += 1
      found = True
  if found:
    fdetails = [ f for f in functions[service] if f['name'] == inspect.stack()[0][3]]
    body += html_header_2(fdetails[0]['title'])
    body += html_header_3('*save monthly <font color="red">$' + str(savings['ebs']) + '</font> by deleting following EBS volumes')
    body += html_table(header, table)
  return body

def get_eip(service, pricing, regions, body):
  header = ['Sr.','Region', 'EIP', 'InstanceId', 'InstanceState', 'Monthly Cost']
  table = []
  c=1
  found = False
  for region in regions:
    ec2client = boto3.client('ec2', region_name=region)
    data = ec2client.describe_addresses()
    if not data or not data['Addresses']:
      continue
    for e in data['Addresses']:
      iid=''
      istate=''
      if 'AssociationId' in e:
        if 'InstanceId' in e:
          idata = ec2client.describe_instances(InstanceIds=[e['InstanceId']],
                                                Filters=[{'Name':'instance-state-name', 'Values': ['stopped']}]
                                                )
          if not idata['Reservations']:
            continue
          else:
            iid =  idata['Reservations'][0]['Instances'][0]['InstanceId']
            istate = idata['Reservations'][0]['Instances'][0]['State']['Name']
        else:#If EIP is not attached to instance e.g NAT
          continue
      try:
        eip_cost = get_eip_price(pricing, region, 'eip')
        saving_update('eip', eip_cost)
        eip_cost_dollar = '$ ' + str(eip_cost) if eip_cost else ''
      except Exception as exc:  # distinct name so the address dict `e` stays available below
        print("Could not determine cost. %s" % (str(exc)))
        eip_cost_dollar = ''
      table.append([c, region, e['PublicIp'], iid, istate, eip_cost_dollar])
      c += 1
      found = True
  if found:
    fdetails = [ f for f in functions[service] if f['name'] == inspect.stack()[0][3]]
    body += html_header_2(fdetails[0]['title'])
    body += html_header_3('*save monthly <font color="red">$' + str(savings['eip']) + '</font> by releasing following EIP addresses')
    body += html_table(header, table)
  return body

####### START ###########

parser = argparse.ArgumentParser()
parser.add_argument('--service', '-s', help='service', required=False)
parser.add_argument('--region', '-r', help='Region', required=False)
args = parser.parse_args()

RegionMapping = {
  'us-east-1': 'US East (N. Virginia)',
  'us-east-2': 'US East (Ohio)',
  'us-west-1': 'US West (N. California)',
  'us-west-2': 'US West (Oregon)',
  'ap-south-1': 'Asia Pacific (Mumbai)',
  'ap-northeast-2': 'Asia Pacific (Seoul)',
  'ap-southeast-1': 'Asia Pacific (Singapore)',
  'ap-southeast-2': 'Asia Pacific (Sydney)',
  'ap-northeast-1': 'Asia Pacific (Tokyo)',
  'ca-central-1': 'Canada (Central)',
  'eu-central-1': 'EU (Frankfurt)',
  'eu-west-1': 'EU (Ireland)',
  'eu-west-2': 'EU (London)',
  'eu-west-3': 'EU (Paris)',
  'sa-east-1': 'South America (Sao Paulo)',
}
OfferTermCode = 'JRTCKXETXF'
price_cache = {}
savings = {}

def lambda_handler(event, context):
  #all_regions = [ {'RegionName' : 'ap-south-1'},{'RegionName' : 'us-east-1'}]
  pricing = boto3.client('pricing', region_name='us-east-1') #Always N.Virginia
  regions = []
  body = "<!DOCTYPE html><html><body>"
  all_regions = get_supported_regions('ec2')
  clear_cache()
  for service in functions.keys():
    if args.service and args.service != service:
      continue
    if args.region:
      regions = [args.region]
    else:
      for region in all_regions:
        regions.append(region['RegionName'])
    for f in functions[service]:
      body = globals()[f['name']](service, pricing, regions, body)
  
  if not savings:
    body += html_para('Looks like there are no AWS resources (EC2, EBS, EIP) in your AWS account')
  else:
    body += html_para('Note: The EC2 monthly cost is calculated considering on-demand pricing for Linux instances in given AWS Region.')
  body += "</html></body>"
  send_email(body)
  return 0

def send_email(body):
  #print body
  ses = boto3.client('ses', region_name='us-east-1')
  #Check if Sender and Receiver emails are already registered
  vemails = ses.list_verified_email_addresses()
  reqemails = ses.list_identities(IdentityType='EmailAddress')
  sender = os.environ['SENDER_EMAIL']
  sender_verified = False
  if sender not in vemails['VerifiedEmailAddresses']:
    if sender in reqemails['Identities']:
      print("Sender Email %s verification is pending .." % (sender))
    else:
      print("Sending the verification for sender %s.." % (sender))
      ses.verify_email_address(EmailAddress=sender)
  else:
    sender_verified = True

  emailids = os.environ['RECV_EMAIL'].strip().replace("\t","").replace(" ","").split(",")
  verified = []
  for email in emailids:
    if email in vemails['VerifiedEmailAddresses']:
      verified.append(email)
    else:
      if email in reqemails['Identities'] and email not in vemails['VerifiedEmailAddresses']:
        print("%s is requested to verify the email" % (email))
      else:
        print("Sending the verification for receiver %s" % (email))
        ses.verify_email_address(EmailAddress=email)
  try:
    #Send email to only verified email addresses
    if verified and sender_verified:
      print "Sending emails to: ", verified
      ses.send_raw_email(
        Source=sender,
        Destinations=verified,
        RawMessage= {'Data': 'Subject: AWS Usage Daily Report\nMIME-Version: 1.0 \nContent-Type: text/html;\n\n' + body }
      )
    clear_cache()
  except Exception as e:
    print "Error sending email: %s" %(str(e))

def get_supported_regions(service):
  response = []
  if service == 'ec2':
    ec2_c = boto3.client('ec2')
    response = ec2_c.describe_regions()
  return response['Regions'] if response['Regions'] else []

#lambda_handler(None, None)
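
To go a step toward the snapshot details asked about above, a helper in the same style as get_ebs_price could query the price list for snapshot storage. This is only a sketch: the 'Storage Snapshot' / 'EBS:SnapshotUsage' filter values are assumptions to verify against the price list, and snapshots are billed per GB-month of stored (incremental) data, not per source-volume size.

def get_snapshot_price_per_gb(pricing, region):
  # Sketch only: returns the on-demand snapshot storage price per GB-month.
  if 'snapshot' in price_cache and region in price_cache['snapshot']:
    return price_cache['snapshot'][region]
  price_detail = pricing.get_products(ServiceCode='AmazonEC2',
    Filters=[
      {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'Storage Snapshot'},
      {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': RegionMapping[region]}
    ])
  for item in price_detail['PriceList']:
    data = json.loads(item)
    if 'EBS:SnapshotUsage' not in data['product']['attributes']['usagetype']:
      continue
    sku_offer_term = data['product']['sku'] + '.' + OfferTermCode
    values = data['terms']['OnDemand'][sku_offer_term]['priceDimensions']
    for key in values:
      cost = float(values[key]['pricePerUnit']['USD'])
      price_cache_update('snapshot', region, cost)
      return cost
  return ''

A get_ebs_snapshots(service, pricing, regions, body) reporter could then follow the same pattern as get_ebs_volumes, calling ec2client.describe_snapshots(OwnerIds=['self']) and multiplying the per-GB price by each snapshot's VolumeSize as a rough upper bound.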

@Eyjafjallajokull
Author

Updated script to include referenced resource names, also added command to delete snapshots.

@ramuvvy

ramuvvy commented Jun 30, 2019 via email

@Eyjafjallajokull
Author

Sorry, I cannot help you with that.

@ramuvvy

ramuvvy commented Jul 1, 2019 via email

@smartkriash

I am running the script to fetch the snapshot report (before running delete) with the command below on Windows:

python snapshots.py snapshot-report

Where does the report get saved?

If I run the same script on Linux with the command below:

python /home/ec2-user/snapshots.py snapshot-report

it generates an empty CSV file.

Please help me generate a report of orphaned snapshots.

@MarkZagorski

Did you modify the script to change the regions variable to reflect the regions you actually use? If you don't have anything in the two regions defined by default, it's not going to return anything, because there is nothing there to report:

regions = ['us-west-1', 'eu-central-1']

Works out of the box for me.

@deeco

deeco commented Nov 29, 2019

How does it handle a snapshot whose description has a different value than what regex = r"^Created by CreateImage\((.*?)\) for (.*?) " expects?
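
For reference, a non-matching description simply yields empty strings, so the ami/instance columns stay blank and only the volume check then decides whether snapshot-cleanup offers the snapshot for deletion. A quick sketch with the regex from the script above (the ids are made-up examples):

print(parse_description('Created by CreateImage(i-0aaa) for ami-0bbb from vol-0ccc'))
# -> ('i-0aaa', 'ami-0bbb')
print(parse_description('weekly backup of the data volume'))
# -> ('', '') : ami_id and instance_id stay empty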

@harishnehru

Hi Eyjafjallajokull

I am not able to get snapshot-cleanup working. Only the report is working; none of the other functions work.

I am getting the error below. Could you please help? I am running Python 3.8.2.

$ python snapshots.py snapshot-cleanup
b'snapshot id volume id ami id instance id size start time description '
region=eu-west-1
Traceback (most recent call last):
File "snapshots.py", line 217, in
cli()
File "C:\Users\alan\AppData\Roaming\Python\Python38\site-packages\click\core.py", line 828, in call
return self.main(*args, **kwargs)
File "C:\Users\alan\AppData\Roaming\Python\Python38\site-packages\click\core.py", line 781, in main
rv = self.invoke(ctx)
File "C:\Users\alan\AppData\Roaming\Python\Python38\site-packages\click\core.py", line 1227, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "C:\Users\alan\AppData\Roaming\Python\Python38\site-packages\click\core.py", line 1046, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "C:\Users\alan\AppData\Roaming\Python\Python38\site-packages\click\core.py", line 590, in invoke
return callback(*args, **kwargs)
File "snapshots.py", line 77, in snapshot_cleanup
print('{:22} {:22} {:22} {:22} {:>7} {:25} {:30}'.format(
File "C:\Program Files\Python38\lib\encodings\cp1252.py", line 19, in encode
return codecs.charmap_encode(input,self.errors,encoding_table)[0]
UnicodeEncodeError: 'charmap' codec can't encode character '\u274c' in position 35: character maps to

Looking forward to your advice.

Thank you

@harishnehru

I was able to fix this by encoding with utf-8 on Windows 10. Use utf-8 in the code as shown below.

print('{:22} {:22} {:22} {:22} {:>7} {:25} {:30}'.format(
    snapshot['id'],
    snapshot['volume_id'] + volume_exists,
    snapshot['ami_id'] + ami_exists,
    snapshot['instance_id'] + instance_exists,
    str(snapshot['size']) + 'gb',
    str(snapshot['start_time']),
    snapshot['description']
).encode('utf8'))
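
A possible alternative on Python 3.7+, assuming you would rather reconfigure stdout once than encode every print (printing bytes as above also adds a b'...' prefix to each line):

import sys

# Make stdout UTF-8 so the ✅ / ❌ icons print on Windows consoles (Python 3.7+).
sys.stdout.reconfigure(encoding='utf-8')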

@x86nick

x86nick commented May 11, 2020

Hello Eyjafjallajokull

Another helpful feature might be distinguishing shared AMIs from non-shared AMIs. Deleting non-shared AMIs will save cost, and so would deleting AMIs older than a given age.

@vickypiet

I ran the Python script to list snapshots via AWS Lambda. However, I am getting the error below. Has anyone faced this issue yet?
Unable to import module 'lambda_function': No module named 'click'

@dinhhuy258

Install the click module into your Lambda deployment package (for example pip3 install click -t . before zipping), then everything will be fine.

@Praba-N

Praba-N commented Sep 21, 2021

Hi,

I'm getting the following error while running the script on Linux. Could someone help fix it?

File "snap.py", line 15
SyntaxError: Non-ASCII character '\xe2' in file snap.py on line 15, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details

Thanks!

@apio-sys

There seems to be an issue with the regex in parse_description(description): it does not return the AMI id from a description like "Created by CreateImage(i-01010101010101010) for ami-01010101010101010", because there is no trailing text after the AMI id for the pattern to match. As a result, the script will propose deleting a snapshot whose AMI is still registered, rendering the remaining image useless without its corresponding snapshot.

@bgth

bgth commented May 7, 2022

Please check the regex.
regex = r"^Created by CreateImage\((.*?)\) for (.*)"

@TheDauntless

A fixed parse_description that handles more "Created by" variants:

def parse_description(description):
    if "from" in description:
        regex = r"^Created by CreateImage\((.*?)\) for (.*)(?: from .*)"
    else:
        regex = r"^Created by CreateImage\((.*?)\) for (ami.*)"
    matches = re.finditer(regex, description, re.MULTILINE)
    for matchNum, match in enumerate(matches):
        return match.groups()
    return '', ''
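
A quick sanity check of this version against both description forms (the ids are made-up examples):

for desc in [
    'Created by CreateImage(i-0aaa) for ami-0bbb',
    'Created by CreateImage(i-0aaa) for ami-0bbb from vol-0ccc',
]:
    print(parse_description(desc))
# both lines print ('i-0aaa', 'ami-0bbb')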

@ocordovez

One question: how can I run this script for several accounts in an org?
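
One way this could work (a sketch, not something from this gist; the role name and session name are placeholders): assume a role in each member account with STS and build the per-region EC2 client from the temporary credentials, then run the same loops with that client.

import boto3

def ec2_client_for_account(account_id, region, role_name='OrganizationAccountAccessRole'):
    # Assume a role in the member account and return an EC2 client that uses
    # the temporary credentials. role_name is a placeholder; use whatever
    # cross-account role your organization actually provisions.
    sts = boto3.client('sts')
    creds = sts.assume_role(
        RoleArn='arn:aws:iam::{}:role/{}'.format(account_id, role_name),
        RoleSessionName='snapshot-cleanup',
    )['Credentials']
    return boto3.client(
        'ec2',
        region_name=region,
        aws_access_key_id=creds['AccessKeyId'],
        aws_secret_access_key=creds['SecretAccessKey'],
        aws_session_token=creds['SessionToken'],
    )

The snapshot-report / snapshot-cleanup loops would then set the global ec2 from this helper for each (account, region) pair instead of calling boto3.client('ec2', ...) directly.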
