#!/usr/bin/env python3.9
""" Rewriting proxy to reduce cardinality of Prometheus node-exporter counter metrics.
This small proxy service is intended to run alongside the Prometheus
node-exporter, with Prometheus fetching the metrics from the proxy instead of
from the exporter directly. The proxy will then apply user-defined rewriting
rules to the metrics' labels, currently either by wholesale removal of certain
labels or by rewriting certain label values. If, after rewriting, there are
several time series with identical names and labels, they are summed to form a
single counter metric.
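Prometheus then scrapes the proxy's address in place of the exporter's. A
minimal scrape config for this setup might look as follows (job name, host
and port are placeholders):
```yaml
scrape_configs:
  - job_name: "node"
    static_configs:
      - targets: ["nodehost:9101"]
```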
As an example, consider the notoriously high-cardinality (and by default
disabled) `mountstats` collector, which includes an `export` and a `protocol`
label for each mounted NFS share. If we are only interested in metrics
aggregated by the address of the remote NFS server and mount all of our shares
with the same protocol, we can simply remove those two labels to significantly
cut down on the cardinality of the exported metrics before they even reach
Prometheus. Here's a sample rewriting rule that does just this:
```json
{
  "metric_regex": "node_mountstats_nfs_.+",
  "action": "remove",
  "label_regex": "(export|protocol)"
}
```
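
Applied to the raw exporter output, such a rule collapses the per-export
series into a single summed series. Schematically (metric names and values
here are purely illustrative, not real exporter output):
```
# before rewriting
node_mountstats_nfs_read_bytes_total{export="/srv/a",protocol="tcp",address="10.0.0.1"} 100
node_mountstats_nfs_read_bytes_total{export="/srv/b",protocol="tcp",address="10.0.0.1"} 250
# after rewriting: labels removed, identical series summed
node_mountstats_nfs_read_bytes_total{address="10.0.0.1"} 350
```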
Similarly, we could decide that we do want to keep a certain label, but are
only interested in a subset of its possible values and want the remaining
values lumped into a single aggregated one. This could again be the case for
the aforementioned `mountstats` collector, which has a whole bunch of metrics
that expose an `operation` label alongside the usual labels. Let's say we're
only interested in detailed metrics for the `READ`, `WRITE`, `ACCESS` and
`CREATE` operations and want to group the rest under an aggregated `OTHER`
operation. Here's what a rule for this could look like (using a negative
lookahead (`(?!...)`) in the regular expression):
```json
{
  "metric_regex": "node_mountstats_nfs_operations_.+",
  "action": "replace",
  "label_regex": "operation",
  "value_regex": "^(?!(READ|WRITE|ACCESS|CREATE)$).+",
  "replacement": "OTHER"
}
```
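
Schematically (again with purely illustrative names and values), such a rule
maps every non-listed operation value to `OTHER` and sums the series that
become identical:
```
# before rewriting
node_mountstats_nfs_operations_requests_total{operation="READ"} 10
node_mountstats_nfs_operations_requests_total{operation="GETATTR"} 5
node_mountstats_nfs_operations_requests_total{operation="LOOKUP"} 7
# after rewriting
node_mountstats_nfs_operations_requests_total{operation="READ"} 10
node_mountstats_nfs_operations_requests_total{operation="OTHER"} 12
```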
The two examples above are included as the default configuration, which is
used in the absence of a user-defined configuration (passed as the path to a
JSON file via the `--config` command line argument). Note that the config file
must contain a plain JSON array of such rule objects, without comments.
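
Assuming the script was saved as `rewrite_proxy.py` (the file name, port
numbers and config path below are just examples), a typical invocation lets
the proxy listen on a port of its own and scrape the exporter on its default
port:
```
./rewrite_proxy.py \
    --node-exporter-url http://localhost:9100/metrics \
    --port 9101 \
    --config rewrite_rules.json
```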
"""
# MIT License
#
# Copyright (c) 2022 Johannes Baiter <johannes.baiter@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations
from dataclasses import dataclass
import argparse
import hashlib
import itertools
import json
import re
import urllib.request
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from typing import Iterator, List, Literal, Mapping, Optional
from urllib.parse import urlparse
VERSION = "0.1"

# Matches a single sample line in the Prometheus text format,
# e.g. `node_cpu_seconds_total{cpu="0",mode="idle"} 1.5`
METRIC_PAT = re.compile(
    r"^(?P<name>[a-zA-Z_:][a-zA-Z0-9_]*)(?:{(?P<labels>.*?)})? (?P<value>.+)$"
)

DEFAULT_METRICS_CONFIG = [
    {
        # Remove `export` and `protocol` labels from all NFS mountstats
        # metrics, they're too fine-grained for us
        "metric_regex": "node_mountstats_nfs_.+",
        "action": "remove",
        "label_regex": "(export|protocol)",
    },
    {
        # Reduce granularity of NFS operations metrics to track only the most
        # important operations separately
        "metric_regex": "node_mountstats_nfs_operations_.+",
        "action": "replace",
        "label_regex": "operation",
        "value_regex": "^(?!(READ|WRITE|ACCESS|CREATE)$).+",
        "replacement": "OTHER",
    },
]


@dataclass
class RewriteConfig:
    """Configuration for a single rewrite rule."""

    metric_regex: re.Pattern
    action: Literal["remove", "replace"]
    label_regex: re.Pattern
    value_regex: Optional[re.Pattern]
    replacement: Optional[str]


@dataclass
class PrometheusMetric:
    """A parsed Prometheus metric."""

    name: str
    value: float
    labels: Mapping[str, str]
    help: Optional[str]
    type: Optional[str]

    @classmethod
    def parse(
        cls, metric_line: str, help: Optional[str] = None, type: Optional[str] = None
    ) -> Optional[PrometheusMetric]:
        """Parse a PrometheusMetric from a line in the Prometheus text format."""
        match = METRIC_PAT.match(metric_line)
        if not match:
            return None
        name = match.group("name")
        if match.group("labels") is None:
            labels = {}
        else:
            # Naive label parsing: assumes that label values contain no commas
            # or escaped quotes, which holds for the node-exporter's output.
            labels = {
                k: v.strip('"')
                for k, v in [l.split("=", 1) for l in match.group("labels").split(",")]
            }
        try:
            value = float(match.group("value"))
            return PrometheusMetric(name, value, labels, help, type)
        except ValueError:
            return None

    def serialize(self, with_comments=True) -> str:
        """Serialize a PrometheusMetric to a line in the Prometheus text format."""
        out: List[str] = []
        if with_comments:
            if self.help:
                out.append(f"# HELP {self.name} {self.help}")
            if self.type:
                out.append(f"# TYPE {self.name} {self.type}")
        # Render integral values without a trailing `.0`
        val = int(self.value) if self.value.is_integer() else self.value
        if len(self.labels) > 0:
            label_str = ",".join(f'{k}="{v}"' for k, v in self.labels.items())
            out.append(f"{self.name}{{{label_str}}} {val}")
        else:
            out.append(f"{self.name} {val}")
        return "\n".join(out)

    def matches(self, other: PrometheusMetric) -> bool:
        """Check if this metric matches another metric, i.e. whether the metric name and the labels are identical."""
        return self.name == other.name and self.labels == other.labels

    @property
    def series_id(self) -> str:
        """An identifier that uniquely identifies a time series (hash of name, labels and their values)."""
        h = hashlib.sha256()
        h.update(self.name.encode("utf-8"))
        for k, v in sorted(self.labels.items()):
            h.update(k.encode("utf-8"))
            h.update(v.encode("utf-8"))
        return h.hexdigest()
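
# Illustrative round-trip of the parser/serializer above (the sample line is
# hypothetical, not taken from a live exporter):
#
#     m = PrometheusMetric.parse('node_cpu_seconds_total{cpu="0",mode="idle"} 1.5')
#     assert m is not None and m.labels == {"cpu": "0", "mode": "idle"}
#     assert m.serialize(with_comments=False) == 'node_cpu_seconds_total{cpu="0",mode="idle"} 1.5'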

def fetch_metrics(url: str) -> Iterator[PrometheusMetric]:
    """Fetch unfiltered metrics from the upstream exporter."""
    with urllib.request.urlopen(url) as resp:
        current_metric: Optional[str] = None
        current_help: Optional[str] = None
        current_type: Optional[str] = None
        for line in resp:
            line = line.strip().decode("utf8")
            if line.startswith("# HELP"):
                current_metric, current_help = line.split(" ", 3)[2:]
            elif line.startswith("# TYPE"):
                current_metric, current_type = line.split(" ", 3)[2:]
            elif line:
                metric = PrometheusMetric.parse(line, current_help, current_type)
                if metric is None:
                    continue
                if metric.name != current_metric:
                    # The tracked HELP/TYPE comments belong to a different
                    # metric, so don't attach them to this one.
                    metric.help = None
                    metric.type = None
                    current_metric = None
                    current_help = None
                    current_type = None
                yield metric

def rewrite_labels(
    metrics: List[PrometheusMetric], rewrite_cfgs: List[RewriteConfig]
) -> List[PrometheusMetric]:
    """Rewrite Prometheus labels in counter metrics according to the configuration."""
    metrics_by_name = {
        name: list(grp)
        for name, grp in itertools.groupby(metrics, key=lambda m: m.name)
    }
    for cfg in rewrite_cfgs:
        for metric_name in metrics_by_name:
            if not cfg.metric_regex.match(metric_name):
                continue
            metric_type = next(
                (m.type for m in metrics_by_name[metric_name] if m.type is not None),
                None,
            )
            # We only support rewriting/removing labels for counters, since only
            # these can be safely aggregated; if the type is unknown, fall back
            # to the `_total` naming convention for counters.
            if metric_type is not None:
                if metric_type != "counter":
                    continue
            elif not metric_name.endswith("_total"):
                continue
            if cfg.action == "remove":
                for metric in metrics_by_name[metric_name]:
                    metric.labels = {
                        label: value
                        for label, value in metric.labels.items()
                        if not cfg.label_regex.match(label)
                    }
            elif cfg.action == "replace":
                for metric in metrics_by_name[metric_name]:
                    # Only rewrite the values of labels that match the
                    # configured label regex.
                    metric.labels = {
                        k: cfg.value_regex.sub(cfg.replacement, v)
                        if cfg.label_regex.match(k)
                        else v
                        for k, v in metric.labels.items()
                    }
    # `itertools.groupby` only groups *consecutive* items, but series that
    # become identical only after rewriting are not guaranteed to be adjacent,
    # so aggregate via a dict that preserves the original order instead.
    aggregated: dict[str, PrometheusMetric] = {}
    for metric in metrics:
        existing = aggregated.get(metric.series_id)
        if existing is None:
            aggregated[metric.series_id] = metric
        else:
            existing.value += metric.value
    return list(aggregated.values())

def run_proxy(
    nodeexporter_url: str, host: str, port: int, cfg: List[RewriteConfig], quiet: bool
) -> None:
    metrics_path = urlparse(nodeexporter_url).path

    class RewritingHandler(BaseHTTPRequestHandler):
        server_version = f"prometheus-exporter-rewrite-proxy/{VERSION}"

        def do_GET(self):
            if self.path != metrics_path:
                self.send_error(404)
                return
            upstream_metrics = list(fetch_metrics(nodeexporter_url))
            rewritten = rewrite_labels(upstream_metrics, cfg)
            self.send_response(200)
            self.send_header(
                "Content-Type", "text/plain; version=0.0.4; charset=utf-8"
            )
            self.end_headers()
            prev_name = None
            for metric in rewritten:
                # Only emit HELP/TYPE comments for the first sample of a metric
                self.wfile.write(
                    metric.serialize(with_comments=prev_name != metric.name).encode(
                        "utf-8"
                    )
                )
                self.wfile.write(b"\n")
                prev_name = metric.name

        def log_request(self, code: int | str = "-", size: int | str = "-") -> None:
            if not quiet:
                super().log_request(code, size)

    httpd = ThreadingHTTPServer((host, port), RewritingHandler)
    httpd.serve_forever()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--node-exporter-url",
        required=True,
        help="URL of the node_exporter",
    )
    parser.add_argument(
        "--host", default="0.0.0.0", help="Host to bind to (default='0.0.0.0')"
    )
    parser.add_argument(
        "--port", default=9100, type=int, help="Port to bind to (default=9100)"
    )
    parser.add_argument(
        "--config",
        type=str,
        required=False,
        help="Path to a JSON config file with an array of custom rewriting rules, replaces the defaults",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Don't log requests to stderr",
    )
    args = parser.parse_args()

    if args.config:
        with open(args.config) as fp:
            cfg_dicts = json.load(fp)
    else:
        cfg_dicts = DEFAULT_METRICS_CONFIG

    for cfg in cfg_dicts:
        # TODO: Validate config?
        cfg["metric_regex"] = re.compile(cfg["metric_regex"])
        cfg["label_regex"] = re.compile(cfg["label_regex"])
        if cfg["action"] == "replace":
            cfg["value_regex"] = re.compile(cfg["value_regex"])
        else:
            cfg["replacement"] = None
            cfg["value_regex"] = None

    run_proxy(
        args.node_exporter_url,
        args.host,
        args.port,
        [RewriteConfig(**cfg) for cfg in cfg_dicts],
        args.quiet,
    )