Skip to content

Instantly share code, notes, and snippets.

@zrzka
Created October 10, 2017 18:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zrzka/53631c4833b7bbc2c7635346c1184767 to your computer and use it in GitHub Desktop.
Save zrzka/53631c4833b7bbc2c7635346c1184767 to your computer and use it in GitHub Desktop.
Quick hack to remove all lamba function versions except X latest ones
#!/usr/bin/env python3
import os
import boto3
from boto3.session import Session
import argparse
session = Session()
client = session.client('lambda')
def gen_functions():
marker = None
while True:
args = {
'MaxItems': 20
}
if marker:
args['Marker'] = marker
r = client.list_functions(**args)
for fn in r.get('Functions', []):
yield fn
marker = r.get('NextMarker', None)
if not marker:
break
def get_function_aliases(name):
aliases = []
marker = None
while True:
args = {
'FunctionName': name
}
if marker:
args['Marker'] = marker
r = client.list_aliases(**args)
aliases.extend(r.get('Aliases', []))
marker = r.get('NextMarker', None)
if not marker:
break
return aliases
def get_function_versions(name):
versions = []
marker = None
while True:
args = {
'FunctionName': name
}
if marker:
args['Marker'] = marker
r = client.list_versions_by_function(**args)
versions.extend(r.get('Versions', []))
marker = r.get('NextMarker', None)
if not marker:
break
return versions
def remove_versions(keep_latest_count):
for fn in gen_functions():
name = fn['FunctionName']
print('Function {}'.format(name))
versions = [
v['Version']
for v in get_function_versions(name)
]
if len(versions) <= keep_latest_count:
print(' - found {} versions, skipping'.format(len(versions)))
continue
versions = sorted(versions)
if not versions[0] == '$LATEST':
print(' - no $LATEST as first version, skipping')
continue
aliases = get_function_aliases(name)
# Drop '$LATEST'
versions_to_delete = versions[1:]
# Drop last (keep_latest_count - 1) versions
versions_to_delete = versions_to_delete[:-keep_latest_count+1]
aliases_to_delete = [
a
for a in aliases
if a['FunctionVersion'] in versions_to_delete
]
if aliases_to_delete:
print(' - deleting {} aliases'.format(len(aliases_to_delete)))
for alias in aliases_to_delete:
r = client.delete_alias(
FunctionName=name, Name=alias['Name']
)
if not r.get('ResponseMetadata', {}).get('HTTPStatusCode', 0) == 204:
print(' - failed to delete alias {}'.format(alias['Name']))
print(' - deleting {} versions'.format(len(versions_to_delete)))
for version in versions_to_delete:
r = client.delete_function(
FunctionName=name, Qualifier=version
)
if not r.get('ResponseMetadata', {}).get('HTTPStatusCode', 0) == 204:
print(' - failed to delete version {}'.format(version))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-k", "--keep-latest-count", dest="keep_latest_count", metavar="N", type=int)
parser.set_defaults(keep_latest_count=5)
args = parser.parse_args()
remove_versions(args.keep_latest_count)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment