Skip to content

Instantly share code, notes, and snippets.

@baztian
Last active June 27, 2024 15:02
Show Gist options
  • Save baztian/4cb5cf0b9b654cfcc8d5fd8f628ebe88 to your computer and use it in GitHub Desktop.
Save baztian/4cb5cf0b9b654cfcc8d5fd8f628ebe88 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import boto3
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import sys
import time
import argparse
# Initialize boto3 client
# Module-level SQS client; region and credentials are resolved from the
# standard AWS environment/config chain.
# NOTE(review): this single client instance is shared by the worker threads
# in get_queue_infos — boto3 clients are generally safe to share for reads,
# but confirm against the boto3 thread-safety documentation.
sqs = boto3.client('sqs')
def check_queue(queue_url):
    """Return (message_count, queue_url) if the queue holds any messages.

    Returns None for an empty queue. ApproximateNumberOfMessages is an
    eventually-consistent estimate, so the count may lag reality slightly.
    Uses the module-level `sqs` client.
    """
    response = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['ApproximateNumberOfMessages'],
    )
    message_count = int(response['Attributes']['ApproximateNumberOfMessages'])
    # Fix: the script's stated purpose (see the argparse description,
    # "queues that have at least one message") requires > 0; the original
    # `> 1` silently skipped queues holding exactly one message.
    if message_count > 0:
        return (message_count, queue_url)
    # Explicit None for empty queues (callers filter on truthiness).
    return None
# ANSI escape codes for styles (used for terminal output formatting below)
BOLD = "\033[1m"    # bold text
GREEN = "\033[92m"  # bright green foreground
RESET = "\033[0m"   # reset all attributes to default
def display_results(results):
    """Pretty-print queues sorted by message count, descending.

    `results` is a list of (message_count, queue_url) tuples as produced
    by check_queue(). Each entry is printed as a bold, aligned queue name,
    a green message count, and a deep link into the AWS console.
    """
    session = boto3.session.Session()
    current_region = session.region_name
    console_link = f"https://console.aws.amazon.com/sqs/v2/home?region={current_region}"
    displayable_results = []
    for message_count, queue_url in results:
        base_name = queue_url.split('/')[-1]
        # The console deep link requires the full queue URL percent-encoded,
        # including '/' (hence safe='').
        encoded_url = urllib.parse.quote(queue_url, safe='')
        console_queue_link = f"{console_link}#/queues/{encoded_url}"
        displayable_results.append((base_name, message_count, console_queue_link))
    print("\r" + " " * 60, end='\r')  # Clear the progress line
    if not displayable_results:
        # Fix: the max() calls below raise ValueError on an empty sequence.
        print("No queues with messages found.")
        return
    # Fix: sort numerically on the message count. The original key was
    # f"{x[1]}:{base_name}", which (a) compared counts as strings, so
    # "9" sorted above "10", and (b) referenced the stale loop variable
    # `base_name` left over from the loop above instead of the tuple's own.
    sorted_display_results = sorted(displayable_results, key=lambda x: x[1], reverse=True)
    max_base_name_length = max(len(r[0]) for r in sorted_display_results)
    max_message_count_length = len(str(max(r[1] for r in sorted_display_results)))
    # Use a distinct name for the per-queue link so the outer console_link
    # is not shadowed.
    for base_name, message_count, queue_link in sorted_display_results:
        print(f"""{BOLD}{base_name.ljust(max_base_name_length)}{RESET}: {GREEN}{str(message_count).rjust(max_message_count_length)} msgs{RESET}
{queue_link}""")
def get_queue_infos(queue_urls):
    """Check every queue in parallel and collect those holding messages.

    Returns a list of (message_count, queue_url) tuples. While the checks
    run, an in-place progress counter is written to stdout.
    """
    total = len(queue_urls)
    collected = []
    # Fan the per-queue attribute lookups out across a thread pool; each
    # call is network-bound, so threads overlap the waits.
    with ThreadPoolExecutor() as pool:
        pending = [pool.submit(check_queue, url) for url in queue_urls]
        for done, future in enumerate(as_completed(pending), start=1):
            print(f"\rProcessed {BOLD}{done}{RESET} out of {BOLD}{total}{RESET} queues...", end='', flush=True)
            info = future.result()
            # check_queue yields a falsy value for queues with no messages.
            if info:
                collected.append(info)
    return collected
def main():
    """Entry point: list SQS queues with pending messages.

    Without -s/--schedule, prints the results once and exits. With it,
    clears the screen and re-checks every N seconds (default 60) until
    interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(description='List SQS queues that have at least one message and update periodically.')
    parser.add_argument('-s', '--schedule', nargs="?", const=60, type=int,
                        help="Update every [n] seconds. Default is 60 seconds if no value is provided.")
    args = parser.parse_args()
    try:
        print("Reading list of queues...", end='', flush=True)
        # Fix: a bare list_queues() call returns at most 1000 queue URLs;
        # paginate so accounts with more queues are fully covered.
        queue_urls = []
        for page in sqs.get_paginator('list_queues').paginate():
            queue_urls.extend(page.get('QueueUrls', []))
        if args.schedule is None:
            # One-shot mode.
            display_results(get_queue_infos(queue_urls))
            return
        while True:
            # NOTE(review): the queue list is fetched once, not per cycle —
            # queues created after startup won't appear until restart.
            results = get_queue_infos(queue_urls)
            os.system('clear' if os.name == 'posix' else 'cls')  # Clear the console
            display_results(results)
            time.sleep(args.schedule)
    except KeyboardInterrupt:
        print(f"{BOLD}{GREEN}Program terminated by user.{RESET}")
        sys.exit(0)  # Exit the program gracefully
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment