Clean out old SQS queues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script iterates through all your SQS queues (based on your default | |
AWS credentials), shows you a few helpful attributes, and asks if you want | |
to delete it. It's meant as an easy way to clean out old queues that are | |
no longer needed. | |
Requires boto3 (pip install boto3). | |
The names of queues are derived from their URLs which are assumed to have | |
a common prefix: replace the 12345 in the URL below. If you don't know what | |
value it is just run the script and an error message will show you. | |
""" | |
import boto3 | |
from datetime import datetime | |
def strip_prefix(string, prefix): | |
if string.startswith(prefix): | |
return string[len(prefix):] | |
raise ValueError('%s does not start with %s' % (string, prefix)) | |
def queue_name(queue): | |
# Fill in the 12345 | |
return strip_prefix(queue.url, 'https://queue.amazonaws.com/12345/') | |
def select_items(dictionary, keys): | |
return ((key, dictionary[key]) for key in keys) | |
def important_queue_attributes(queue): | |
return select_items(queue.attributes, | |
['ApproximateNumberOfMessages', | |
'ApproximateNumberOfMessagesDelayed', | |
'ApproximateNumberOfMessagesNotVisible', | |
'CreatedTimestamp', | |
'LastModifiedTimestamp', | |
]) | |
def timestamp_to_date(timestamp): | |
if isinstance(timestamp, basestring): | |
timestamp = float(timestamp) | |
return datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d') | |
def ask_with_default_no(question): | |
return raw_input('%s? (y/[N])' % question).lower().startswith('y') | |
def main(): | |
sqs = boto3.resource('sqs') | |
queues = sqs.queues.all() | |
for queue in queues: | |
print 'Name:', queue_name(queue) | |
attributes = important_queue_attributes(queue) | |
for attribute_name, attribute_value in attributes: | |
if 'Timestamp' in attribute_name: | |
attribute_value = timestamp_to_date(attribute_value) | |
print '%s: %s' % (attribute_name, attribute_value) | |
if ask_with_default_no('Delete'): | |
queue.delete() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment