Skip to content

Instantly share code, notes, and snippets.

@alexmojaki
Created November 23, 2015 13:13
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save alexmojaki/faca5bf3b1712ec9b692 to your computer and use it in GitHub Desktop.
Save alexmojaki/faca5bf3b1712ec9b692 to your computer and use it in GitHub Desktop.
Clean out old SQS queues
"""
This script iterates through all your SQS queues (based on your default
AWS credentials), shows a few helpful attributes of each one, and asks
whether you want to delete it. It's meant as an easy way to clean out old
queues that are no longer needed.
Requires boto3 (pip install boto3).
The names of queues are derived from their URLs, which are assumed to share
a common prefix: replace the 12345 in the URL below. If you don't know what
the value is, just run the script and the error message will show you.
"""
import boto3
from datetime import datetime
def strip_prefix(string, prefix):
    """Return *string* with the leading *prefix* removed.

    Raises:
        ValueError: If *string* does not begin with *prefix*. The message
            includes both values.
    """
    # Guard clause: reject the mismatch first so the happy path is last.
    if not string.startswith(prefix):
        raise ValueError('%s does not start with %s' % (string, prefix))
    remainder = string[len(prefix):]
    return remainder
def queue_name(queue, prefix='https://queue.amazonaws.com/12345/'):
    """Return the name of *queue*: its URL with *prefix* stripped off.

    Args:
        queue: An object exposing the queue URL as ``queue.url``.
        prefix: The part of the URL that precedes the queue name. Either
            fill in the 12345 in the default value or pass your own prefix.

    Raises:
        ValueError: If the queue URL does not start with *prefix*. The
            message contains the full URL.
    """
    return strip_prefix(queue.url, prefix)
def select_items(dictionary, keys):
    """Return a lazy iterable of ``(key, dictionary[key])`` for each of *keys*.

    The pairs come out in the order of *keys*. Lookups happen only while the
    result is being iterated, so a missing key raises ``KeyError`` at that
    point rather than when this function is called.
    """
    pairs = ((name, dictionary[name]) for name in keys)
    return pairs
def important_queue_attributes(queue):
    """Return an iterable of (name, value) pairs for the attributes to display.

    The values are read from ``queue.attributes``; the message counters are
    listed first, followed by the two timestamps.
    """
    attribute_names = [
        'ApproximateNumberOfMessages',
        'ApproximateNumberOfMessagesDelayed',
        'ApproximateNumberOfMessagesNotVisible',
        'CreatedTimestamp',
        'LastModifiedTimestamp',
    ]
    return select_items(queue.attributes, attribute_names)
def timestamp_to_date(timestamp):
    """Convert a Unix timestamp to a UTC date string formatted ``YYYY-MM-DD``.

    Args:
        timestamp: Seconds since the epoch, given either as a number or as
            a numeric string.

    Returns:
        The UTC calendar date of *timestamp* as a string.

    Raises:
        ValueError: If *timestamp* is a string that is not a valid number.
    """
    # float() accepts both numeric strings and numbers, so no type check is
    # needed. The previous isinstance(..., basestring) test raised NameError
    # on Python 3, where basestring no longer exists.
    seconds = float(timestamp)
    return datetime.utcfromtimestamp(seconds).strftime('%Y-%m-%d')
def ask_with_default_no(question):
    """Ask a yes/no *question* on the console, treating anything but yes as no.

    Args:
        question: Text shown to the user; ``? (y/[N])`` is appended to it.

    Returns:
        True if the typed answer starts with ``y`` or ``Y``, otherwise False
        (including when the user just presses Enter).
    """
    # raw_input only exists on Python 2; on Python 3 it was renamed to input.
    # Python 2's own input() evaluates what is typed, so it must not be used
    # there -- hence the lookup order.
    try:
        read_line = raw_input  # noqa: F821
    except NameError:
        read_line = input
    answer = read_line('%s? (y/[N])' % question)
    return answer.lower().startswith('y')
def main():
    """Walk through every SQS queue and interactively offer to delete it.

    For each queue this prints a blank line, the queue name and the
    attributes chosen by important_queue_attributes (attributes whose name
    contains 'Timestamp' are shown as dates), then deletes the queue if the
    user answers yes to the prompt.
    """
    sqs = boto3.resource('sqs')
    for queue in sqs.queues.all():
        # Every print() call takes exactly one argument so the output is the
        # same under Python 2's print statement and Python 3's print function.
        # A bare `print` would silently do nothing on Python 3.
        print('')
        print('Name: %s' % queue_name(queue))
        for attribute_name, attribute_value in important_queue_attributes(queue):
            if 'Timestamp' in attribute_name:
                attribute_value = timestamp_to_date(attribute_value)
            print('%s: %s' % (attribute_name, attribute_value))
        if ask_with_default_no('Delete'):
            queue.delete()
# Run the interactive cleanup only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment