Skip to content

Instantly share code, notes, and snippets.

@ondrejmo
Created February 6, 2023 21:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ondrejmo/beeb4276b0b72b3ff1b867a2e35529a3 to your computer and use it in GitHub Desktop.
Save ondrejmo/beeb4276b0b72b3ff1b867a2e35529a3 to your computer and use it in GitHub Desktop.
Simple library I used for querying Prometheus API and storing the results in Polars dataframes
import json
import dateparser
import polars as pl
from datetime import datetime
from requests import get
from typing import Tuple
# https://prometheus.io/docs/prometheus/latest/querying/api/
def ptime(input: str) -> datetime:
return dateparser.parse(input).astimezone().isoformat()
class Prometheus:
"""
Class for querying Prometheus
"""
def __init__(self, url: str = "http://localhost:9090", auth: Tuple = ("", "")) -> None:
self.url = url
self.auth = auth
def labels(self, start: str = "", end: str = "") -> list:
"""
List all labels
"""
params = {} if start == "" or end == "" else {"start": start, "end": end}
res = get(url=self.url + "/api/v1/labels", auth=self.auth, params=params)
if res.status_code != 200 or res.json()["status"] != "success":
raise Exception("Well, fuck...")
return res.json()["data"]
def values(self, label: str, start: str = "now - 1d", end: str = "now") -> list:
"""
List all values of given label
"""
params = {} if start == "" or end == "" else {"start": start, "end": end}
res = get(url=self.url + "/api/v1/label/" + label + "/values", auth=self.auth, params=params)
if res.status_code != 200 or res.json()["status"] != "success":
raise Exception("Well, fuck...")
return res.json()["data"]
def query(self, expression: str = "{job=~\".+\"}", time: str = "now") -> Tuple[str, list]:
"""
An instant query expression
"""
res = get(url=self.url + "/api/v1/query", auth=self.auth, params={
"query": expression,
"time": time,
})
if res.status_code != 200 or res.json()["status"] != "success":
# print(res.text)
raise Exception("Well, fuck...")
return res.json()["data"]["resultType"], res.json()["data"]["result"]
def queryRange(self, expression: str = "{job=~\".+\"}", start: str = "now - 1d", end: str = "now") -> Tuple[str, list]:
"""
A range query expression
"""
res = get(url=self.url + "/api/v1/query_range", auth=self.auth, params={
"query": expression,
"start": start,
"end": end,
"step": int((dateparser.parse(end) - dateparser.parse(start)).total_seconds()) // 10000 # this is hardcoded limit
})
if res.status_code != 200 or res.json()["status"] != "success":
print(res.text)
raise Exception("Well, fuck...")
return res.json()["data"]["resultType"], res.json()["data"]["result"]
def queryName(p: Prometheus, name: str) -> Tuple[str, list]:
"""
Instant, return raw metrics from type "vector", based on provided __name__ label
"""
res = p.query(expression="{__name__=\"%s\"}" % name)
if res[0] != "vector":
raise Exception("Well, fuck...")
return res[1]
def queryRangeName(p: Prometheus, name: str, start: str, end: str) -> list:
"""
Range, return raw metrics from type "matrix", based on provided __name__ label
"""
res = p.queryRange(expression="{__name__=\"%s\"}" % name, start=start, end=end)
if res[0] != "matrix":
raise Exception("Well, fuck...")
return res[1]
def metrics2df(metrics: list = [], labels: list = []) -> pl.DataFrame:
"""
Takes output of queryName() or queryRangeName() and list of labels, returns DataFrame with labels as columns
"""
data = dict(map(lambda x: (x, []), labels + ["time", "value"]))
for mtr in metrics:
for val in mtr["values"]:
data["time"].append(datetime.fromtimestamp(val[0]))
data["value"].append(int(val[1]))
for lbl in labels:
if lbl in mtr["metric"].keys():
data[lbl].append(mtr["metric"][lbl])
else:
data[lbl].append("")
return pl.DataFrame(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment