Last active
June 27, 2024 15:02
-
-
Save baztian/4cb5cf0b9b654cfcc8d5fd8f628ebe88 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import boto3 | |
import urllib.parse | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import os | |
import sys | |
import time | |
import argparse | |
# Shared SQS client; every function in this script talks to SQS through it.
sqs = boto3.client(service_name="sqs")
def check_queue(queue_url):
    """Return ``(message_count, queue_url)`` for a queue that holds messages.

    Reads the queue's ``ApproximateNumberOfMessages`` attribute through the
    module-level ``sqs`` client.

    Args:
        queue_url: URL of the queue to inspect.

    Returns:
        A ``(message_count, queue_url)`` tuple when the queue reports at
        least one message, otherwise ``None``.
    """
    response = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['ApproximateNumberOfMessages'],
    )
    # SQS returns attribute values as strings, hence the int() conversion.
    message_count = int(response['Attributes']['ApproximateNumberOfMessages'])
    # The script is described as listing queues with "at least one message",
    # so a queue holding exactly one message must be reported too; the
    # previous `> 1` comparison silently dropped it.
    if message_count >= 1:
        return (message_count, queue_url)
    return None
# Terminal styling: ANSI escape sequences wrapped around printed text.
BOLD = "\x1b[1m"    # switch on bold
GREEN = "\x1b[92m"  # bright green foreground
RESET = "\x1b[0m"   # clear all styling
def display_results(results):
    """Print one two-line entry per queue, largest message count first.

    Each entry shows the queue name (last path segment of its URL) and its
    message count on the first line, and a link into the AWS SQS console on
    the second line.

    Args:
        results: Iterable of ``(message_count, queue_url)`` tuples.

    Returns:
        None. Output is written to stdout.
    """
    region = boto3.session.Session().region_name
    console_home = f"https://console.aws.amazon.com/sqs/v2/home?region={region}"

    rows = []
    for message_count, queue_url in results:
        queue_name = queue_url.split('/')[-1]
        # The console addresses a queue by its fully percent-encoded URL.
        encoded_url = urllib.parse.quote(queue_url, safe='')
        rows.append((queue_name, message_count, f"{console_home}#/queues/{encoded_url}"))

    # Blank out the in-place progress line before printing the report.
    print("\r" + " " * 60, end='\r')

    if not rows:
        # max() below would raise ValueError on an empty sequence.
        print("No queues with messages found.")
        return

    # Sort numerically by count (descending), then by name for stable ties.
    # The count must stay an int: comparing it as a string ranks "9" above
    # "10", and the key must use each row's own name, not a leftover loop
    # variable.
    rows.sort(key=lambda row: (-row[1], row[0]))

    name_width = max(len(row[0]) for row in rows)
    count_width = len(str(max(row[1] for row in rows)))
    for queue_name, message_count, queue_link in rows:
        print(
            f"{BOLD}{queue_name.ljust(name_width)}{RESET}: "
            f"{GREEN}{str(message_count).rjust(count_width)} msgs{RESET}\n"
            f"{queue_link}"
        )
def get_queue_infos(queue_urls):
    """Run ``check_queue`` on every URL concurrently and collect the hits.

    A progress counter is rewritten in place on stdout as each check
    finishes.

    Args:
        queue_urls: Sequence of queue URLs to check.

    Returns:
        List of the truthy values returned by ``check_queue``, in the order
        the checks completed.
    """
    queue_total = len(queue_urls)
    hits = []
    # Each check is a network call, so fan them out over a thread pool.
    with ThreadPoolExecutor() as pool:
        pending = [pool.submit(check_queue, queue_url) for queue_url in queue_urls]
        for done_count, finished in enumerate(as_completed(pending), start=1):
            print(f"\rProcessed {BOLD}{done_count}{RESET} out of {BOLD}{queue_total}{RESET} queues...", end='', flush=True)
            outcome = finished.result()
            if not outcome:
                # check_queue returned nothing reportable for this queue.
                continue
            hits.append(outcome)
    return hits
def main():
    """Command-line entry point: report SQS queues that contain messages.

    Without ``-s/--schedule`` the report is printed once. With it, the
    console is cleared and the report refreshed every ``n`` seconds (60 when
    the flag is given without a value) until interrupted with Ctrl-C.

    The queue list is read once at start-up and reused for every refresh.
    """
    parser = argparse.ArgumentParser(description='List SQS queues that have at least one message and update periodically.')
    parser.add_argument('-s', '--schedule', nargs="?", const=60, type=int,
                        help="Update every [n] seconds. Default is 60 seconds if no value is provided.")
    args = parser.parse_args()
    try:
        print("Reading list of queues...", end='', flush=True)
        # A single ListQueues call returns at most 1,000 URLs and only hands
        # back a continuation token when MaxResults is set, so page through
        # explicitly to avoid silently truncating large accounts.
        queue_urls = []
        paginator = sqs.get_paginator('list_queues')
        for page in paginator.paginate(PaginationConfig={'PageSize': 1000}):
            queue_urls.extend(page.get('QueueUrls', []))

        if args.schedule is None:
            results = get_queue_infos(queue_urls)
            display_results(results)
            return

        while True:
            results = get_queue_infos(queue_urls)
            os.system('clear' if os.name == 'posix' else 'cls')  # Clear the console
            display_results(results)
            time.sleep(args.schedule)
    except KeyboardInterrupt:
        print(f"{BOLD}{GREEN}Program terminated by user.{RESET}")
        sys.exit(0)  # Ctrl-C is the expected way to stop; exit cleanly
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment