Skip to content

Instantly share code, notes, and snippets.

@kubosuke
Last active January 6, 2024 14:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kubosuke/c8bbc83367d1084ae26e07bc2487e0d5 to your computer and use it in GitHub Desktop.
Save kubosuke/c8bbc83367d1084ae26e07bc2487e0d5 to your computer and use it in GitHub Desktop.
Delete stale metrics from pushgateway
from datetime import datetime, timedelta
import os
import requests
from prometheus_client.parser import text_string_to_metric_families
PUSHGATEWAY_METRICS_URL = "https://xxx.com/metrics"
PUSH_TIME_METRICS_NAME = "push_time_seconds"
def is_metrics_stale(timestamp: float, retention_period_days: int = 7) -> bool:
"""Return True if the metrics is stale.
Args:
timestamp (float): epoch timestamp in prom format. ex: 1701841866.00605802
Returns:
bool: True if the stale than RETENTION_PERIOD_DAYS(days).
"""
return datetime.fromtimestamp(timestamp) < datetime.now() - timedelta(days=retention_period_days)
if __name__ == "__main__":
with requests.session() as session:
metrics_list_response = session.get(PUSHGATEWAY_METRICS_URL)
metrics_list_response.raise_for_status()
for family in text_string_to_metric_families(metrics_list_response.text):
for sample in family.samples:
if sample.name.startswith(PUSH_TIME_METRICS_NAME):
if is_metrics_stale(sample.value):
url = f"{PUSHGATEWAY_METRICS_URL}/job/{sample.labels['job']}/instance/{sample.labels['instance']}"
res = session.delete(url)
res.raise_for_status()
print(f"DELETE {url}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment