-
-
Save bidhanahdib/2768ad2d1870b8514e18fb6ab0697fc2 to your computer and use it in GitHub Desktop.
rabbitmq_queue_metrics.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
from datetime import datetime, timezone
from urllib.parse import quote

import requests
from dotenv import load_dotenv
# RabbitMQ management API base URLs; each one is queried in turn.
RABBIT_HOSTS = [
    "http://srv01.abc.com:15672",
    "http://srv02.abc.com:15672",
    "http://srv03.abc.com:15672",
    "http://srv04:15672",
    "http://srv05.com:15672",
]

# Elasticsearch nodes; indexing tries them in this order until one accepts.
ES_NODES = [
    "http://es01.abc.com:9200",
    "http://es02.abc.com:9200",
]

# Index / data stream name used in the Elasticsearch document URL.
DATA_STREAM = "logs-rabbitmq-prod"

# Credentials are read from the environment. load_dotenv() runs first so the
# lookups below happen after it (presumably it populates the environment from
# a .env file -- see python-dotenv). Each value is None when the variable is
# not set.
load_dotenv()
RABBIT_USER = os.getenv("RABBIT_USER")
RABBIT_PASS = os.getenv("RABBIT_PASS")
ES_API_KEY = os.getenv("ES_API_KEY")

# Logging setup: INFO and above on the root logger, timestamped lines.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
# Raise the threshold for urllib3's logger so only WARNING and above appear.
logging.getLogger("urllib3").setLevel(logging.WARNING)

# Headers sent with every Elasticsearch REST API request.
ES_HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {ES_API_KEY}",
}
def fetch_queues(host):
    """Return the queue list reported by one RabbitMQ management API host.

    Args:
        host: Base URL of the management API, e.g. ``http://srv01:15672``.

    Returns:
        The decoded JSON list from ``GET {host}/api/queues``. An empty list
        is returned (and an error is logged) when the request fails, the
        server answers with an error status, the body is not valid JSON, or
        the body is not a JSON list.
    """
    url = f"{host}/api/queues"
    logging.info(f"Fetching queues from {host}")
    try:
        response = requests.get(url, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        response.raise_for_status()
        queues = response.json()
    # ValueError covers requests releases whose Response.json() raises a
    # plain JSON decode error that is not a RequestException subclass.
    except (requests.RequestException, ValueError) as e:
        logging.error(f"Failed to fetch queues from {host}: {e}")
        return []

    # Callers iterate the result and call .get() on each entry, so anything
    # other than a list (e.g. an error object) is treated as a failed fetch.
    if not isinstance(queues, list):
        logging.error(
            f"Failed to fetch queues from {host}: "
            f"unexpected response type {type(queues).__name__}"
        )
        return []

    logging.info(f"Successfully fetched {len(queues)} queues from {host}")
    return queues
def fetch_queue_details(host, vhost, queue_name):
    """Fetch the detailed description of a single queue.

    Args:
        host: Base URL of the management API, e.g. ``http://srv01:15672``.
        vhost: Raw (not yet URL-encoded) virtual host name, e.g. ``"/"``.
        queue_name: Raw (not yet URL-encoded) queue name.

    Returns:
        The decoded JSON body of
        ``GET {host}/api/queues/{vhost}/{queue_name}``, or ``None`` (after
        logging an error) when the request fails, the server answers with an
        error status, or the body is not valid JSON.
    """
    # Percent-encode both path segments. safe="" makes quote() encode "/" as
    # well, so the default vhost "/" still becomes "%2F", and names that
    # contain "/", "#", "?", "%" or spaces no longer corrupt the URL.
    # str() keeps a missing (None) value from raising inside quote().
    encoded_vhost = quote(str(vhost), safe="")
    encoded_queue = quote(str(queue_name), safe="")
    url = f"{host}/api/queues/{encoded_vhost}/{encoded_queue}"
    try:
        response = requests.get(url, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        response.raise_for_status()
        return response.json()
    # ValueError covers requests releases whose Response.json() raises a
    # plain JSON decode error that is not a RequestException subclass.
    except (requests.RequestException, ValueError) as e:
        logging.error(f"Failed to fetch queue detail {queue_name} from {host}: {e}")
        return None
def _node_host(node):
    """Return the segment after the first "@" in *node*, or *node* itself."""
    return node.split("@")[1] if "@" in node else node


def clean_queue_data(data):
    """Add derived fields to a queue-detail document before indexing.

    The dict is modified in place and the same object is returned.

    Fields written:
        ``@timestamp``: current UTC time in ISO 8601 format (always set).
        ``node_name``: ``node`` with everything up to the first "@" removed
            (only when ``node`` is a string).
        ``queue_name``: copy of ``name`` (only when ``name`` is present).
        ``slave_nodes_name``: ``slave_nodes`` with the same "@" stripping
            applied to each entry (only when ``slave_nodes`` is a list).
        ``consumer_ip``: list of every truthy
            ``consumer_details[*].channel_details.peer_host`` (only when at
            least one is found).
        ``backing_queue_status.delta``: replaced by its ``str()`` form.

    Args:
        data: Queue detail dict as decoded from the management API.

    Returns:
        The same ``data`` dict, updated.
    """
    data["@timestamp"] = datetime.now(timezone.utc).isoformat()

    node = data.get("node")
    if isinstance(node, str):
        data["node_name"] = _node_host(node)

    if "name" in data:
        data["queue_name"] = data["name"]

    slave_nodes = data.get("slave_nodes")
    if isinstance(slave_nodes, list):
        data["slave_nodes_name"] = [_node_host(entry) for entry in slave_nodes]

    consumer_ips = []
    consumer_details = data.get("consumer_details")
    if isinstance(consumer_details, list):
        for consumer in consumer_details:
            if not isinstance(consumer, dict):
                continue
            # The key can be present but hold a non-dict value (e.g. [] or
            # null); skip such entries instead of crashing on .get().
            channel_details = consumer.get("channel_details")
            if not isinstance(channel_details, dict):
                continue
            peer_host = channel_details.get("peer_host")
            if peer_host:
                consumer_ips.append(peer_host)
    if consumer_ips:
        data["consumer_ip"] = consumer_ips

    # delta is stored as a string so the indexed field has a single type
    # regardless of the structure reported -- NOTE(review): presumably to
    # avoid a mapping conflict in Elasticsearch; confirm.
    backing_queue_status = data.get("backing_queue_status")
    if isinstance(backing_queue_status, dict) and "delta" in backing_queue_status:
        backing_queue_status["delta"] = str(backing_queue_status["delta"])

    return data
def index_to_es(document):
    """Post one document to Elasticsearch, trying each node in ES_NODES.

    Nodes are attempted in list order. The first response with status 200 or
    201 ends the attempt. A connection error or any other status is logged
    as a warning and the next node is tried. If no node accepts the
    document, a single error is logged.

    Args:
        document: JSON-serializable object to index.
    """
    for node in ES_NODES:
        endpoint = f"{node}/{DATA_STREAM}/_doc"
        try:
            reply = requests.post(
                endpoint,
                headers=ES_HEADERS,
                data=json.dumps(document),
                timeout=10,
            )
        except requests.RequestException as e:
            logging.warning(f"Error connecting to {node}: {e}")
            continue

        if reply.status_code in (200, 201):
            # Indexed successfully; no need to contact the remaining nodes.
            return
        logging.warning(f"Failed to index to {node}: {reply.status_code} {reply.text}")

    # Reached only when every node failed (or ES_NODES is empty).
    logging.error("Failed to index document to all ES hosts")
def metrics_rabbitmq():
    """Collect queue details from every RabbitMQ host and index them.

    For each host in RABBIT_HOSTS the queue list is fetched, then each
    queue's details are fetched individually, cleaned, and sent to
    Elasticsearch. Queues whose details could not be fetched are skipped.
    """
    for host in RABBIT_HOSTS:
        for queue in fetch_queues(host):
            details = fetch_queue_details(host, queue.get("vhost"), queue.get("name"))
            if not details:
                continue
            index_to_es(clean_queue_data(details))


# Run a single collection pass when executed as a script.
if __name__ == "__main__":
    metrics_rabbitmq()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment