Skip to content

Instantly share code, notes, and snippets.

@bidhanahdib
Last active December 11, 2025 19:12
Show Gist options
  • Select an option

  • Save bidhanahdib/2768ad2d1870b8514e18fb6ab0697fc2 to your computer and use it in GitHub Desktop.

Select an option

Save bidhanahdib/2768ad2d1870b8514e18fb6ab0697fc2 to your computer and use it in GitHub Desktop.
rabbitmq_queue_metrics.py
import json
import logging
import os
from datetime import datetime, timezone
from urllib.parse import quote

import requests
from dotenv import load_dotenv
# RabbitMQ hosts
# Management-API endpoints (port 15672) of every broker node to poll.
RABBIT_HOSTS = [
"http://srv01.abc.com:15672",
"http://srv02.abc.com:15672",
"http://srv03.abc.com:15672",
"http://srv04:15672",
"http://srv05.com:15672"
]
# Elasticsearch hosts
# Tried in order by index_to_es until one accepts the document.
ES_NODES = [
"http://es01.abc.com:9200",
"http://es02.abc.com:9200"
]
# Load credentials from a local .env file BEFORE reading them below.
load_dotenv()
RABBIT_USER = os.getenv("RABBIT_USER")
RABBIT_PASS = os.getenv("RABBIT_PASS")
ES_API_KEY = os.getenv("ES_API_KEY")
# Elasticsearch data stream that receives the queue-metric documents.
DATA_STREAM = "logs-rabbitmq-prod"
# Logging setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Silence per-request connection chatter from requests' transport layer.
logging.getLogger("urllib3").setLevel(logging.WARNING)
# Headers for Elasticsearch REST API
# NOTE(review): ES_API_KEY may be None if the .env entry is missing, which
# would produce the literal header "ApiKey None" — verify env is populated.
ES_HEADERS = {
"Content-Type": "application/json",
"Authorization": f"ApiKey {ES_API_KEY}"
}
def fetch_queues(host):
    """Return the list of queues reported by one RabbitMQ management node.

    Args:
        host: Base URL of the management API (e.g. "http://srv:15672").

    Returns:
        A list of queue dicts parsed from /api/queues, or an empty list
        when the request or JSON decoding fails.
    """
    endpoint = f"{host}/api/queues"
    logging.info(f"Fetching queues from {host}")
    try:
        # json() stays inside the try: requests' JSON decode error is a
        # RequestException subclass and must also fall back to [].
        resp = requests.get(endpoint, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        resp.raise_for_status()
        payload = resp.json()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch queues from {host}: {e}")
        return []
    logging.info(f"Successfully fetched {len(payload)} queues from {host}")
    return payload
def fetch_queue_details(host, vhost, queue_name):
    """Fetch full details for a single queue from the RabbitMQ management API.

    Args:
        host: Base URL of the management API (e.g. "http://srv:15672").
        vhost: Virtual host containing the queue (e.g. "/").
        queue_name: Name of the queue.

    Returns:
        Parsed JSON dict of queue details, or None on any request failure.
    """
    # Percent-encode BOTH path segments. The original code only mapped the
    # default vhost "/" to "%2F"; any other vhost or queue name containing
    # reserved characters ("/", space, "%", non-ASCII) built a broken URL.
    # quote("/", safe="") still yields "%2F", so behavior is unchanged for
    # the default vhost.
    url = f"{host}/api/queues/{quote(vhost, safe='')}/{quote(queue_name, safe='')}"
    try:
        response = requests.get(url, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch queue detail {queue_name} from {host}: {e}")
        return None
def clean_queue_data(data):
    """Normalize a raw RabbitMQ queue-detail dict in place for indexing.

    Adds an ISO-8601 UTC "@timestamp", derives short host names from
    Erlang-style "rabbit@host" identifiers (node and mirror nodes), copies
    the queue name to "queue_name", collects consumer peer-host IPs into
    "consumer_ip", and stringifies backing_queue_status["delta"].

    Args:
        data: Queue-detail dict from the management API; mutated in place.

    Returns:
        The same (mutated) dict.
    """
    data["@timestamp"] = datetime.now(timezone.utc).isoformat()

    def _short(node):
        # "rabbit@srv01" -> "srv01"; names without "@" pass through unchanged.
        return node.split("@")[1] if "@" in node else node

    node = data.get("node")
    if isinstance(node, str):
        data["node_name"] = _short(node)

    if "name" in data:
        data["queue_name"] = data["name"]

    mirrors = data.get("slave_nodes")
    if isinstance(mirrors, list):
        data["slave_nodes_name"] = [_short(m) for m in mirrors]

    consumers = data.get("consumer_details", [])
    if isinstance(consumers, list):
        # Keep only truthy peer_host values, preserving consumer order.
        ips = [
            c.get("channel_details", {}).get("peer_host")
            for c in consumers
        ]
        ips = [ip for ip in ips if ip]
        if ips:
            data["consumer_ip"] = ips

    # Stringify "delta" so Elasticsearch doesn't reject the mixed-type list.
    if "backing_queue_status" in data and "delta" in data["backing_queue_status"]:
        data["backing_queue_status"]["delta"] = str(data["backing_queue_status"]["delta"])
    return data
def index_to_es(document):
    """POST one document to the data stream, trying each ES node in order.

    Stops at the first node that accepts the document (HTTP 200/201). A
    non-2xx response or connection error logs a warning and moves on to
    the next node; if every node fails, logs a single error.

    Args:
        document: JSON-serializable dict to index.
    """
    # Serialize once, outside the retry loop.
    payload = json.dumps(document)
    for es_host in ES_NODES:
        target = f"{es_host}/{DATA_STREAM}/_doc"
        try:
            response = requests.post(target, headers=ES_HEADERS, data=payload, timeout=10)
        except requests.RequestException as e:
            logging.warning(f"Error connecting to {es_host}: {e}")
            continue
        if response.status_code in (200, 201):
            return
        logging.warning(f"Failed to index to {es_host}: {response.status_code} {response.text}")
    logging.error("Failed to index document to all ES hosts")
def metrics_rabbitmq():
    """Poll every RabbitMQ host, clean each queue's details, index to ES.

    Best-effort pipeline: hosts that fail to list queues contribute no
    documents, and queues whose detail fetch returns None are skipped.
    """
    for host in RABBIT_HOSTS:
        for queue in fetch_queues(host):
            details = fetch_queue_details(host, queue.get("vhost"), queue.get("name"))
            if details:
                index_to_es(clean_queue_data(details))


if __name__ == "__main__":
    metrics_rabbitmq()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment