Skip to content

Instantly share code, notes, and snippets.

@jgeboski
Created December 10, 2022 23:27
Show Gist options
  • Save jgeboski/56eb625bc9072c5f11dde164cc7b9cb8 to your computer and use it in GitHub Desktop.
Save jgeboski/56eb625bc9072c5f11dde164cc7b9cb8 to your computer and use it in GitHub Desktop.
Collector for Prometheus for interface addresses with timing data. Useful with a Cron job in pfSense to export WAN addresses via the node exporter.
#!/usr/bin/env python3.8
import argparse
import logging
import os
import re
import subprocess
import time
from datetime import datetime, timezone
from ipaddress import ip_address, IPv4Address, IPv6Address
from typing import Any, Dict, NamedTuple, Optional, Set, Union
# Interface name -> set of addresses found on that interface.
IfAddrs = Dict[
    str,
    Set[Union[IPv4Address, IPv6Address]],
]

# Interface name -> {address: integer timestamp stored for that address}.
IfAddrTimes = Dict[
    str,
    Dict[Union[IPv4Address, IPv6Address], int],
]

# The only metric name this script writes and accepts when reading back.
METRIC_NAME = "node_network_address_assigned_seconds"

# One sample line: a metric name, an optional {label section}, and an
# unsigned decimal value.
PROMETHEUS_METRIC_PATTERN = re.compile(
    r"^(?P<name>\w+)\s*(?:{(?P<labels>.+)})?\s*(?P<value>\d+(?:\.\d+)?)$"
)

logger = logging.getLogger(__name__)
class PrometheusMetric(NamedTuple):
    """A single sample line of Prometheus text exposition output.

    Only the subset matched by ``PROMETHEUS_METRIC_PATTERN`` is supported: a
    metric name, an optional ``{label="value",...}`` section and an unsigned
    decimal value.
    """

    # Metric name (the text before the label braces).
    name: str
    # Sample value.
    value: float
    # Label names mapped to their unquoted values.
    labels: Dict[str, str]

    def __str__(self) -> str:
        """Render the metric as ``name{label="value",...} value``.

        Labels are emitted sorted by label name so the output is stable.
        """
        labels_str = ",".join(
            f'{label}="{value}"' for label, value in sorted(self.labels.items())
        )
        return f"{self.name}{{{labels_str}}} {self.value}"

    @staticmethod
    def from_str(line: str) -> Optional["PrometheusMetric"]:
        """Parse one line of exposition text into a metric.

        Args:
            line: A single line; surrounding whitespace is ignored.

        Returns:
            The parsed metric, or None when the line is a comment or does not
            parse. A metric without a label section yields empty ``labels``.
            Label values containing commas are not supported because the label
            section is split on ``","``.
        """
        line = line.strip()
        if line.startswith("#"):
            return None

        metric_match = PROMETHEUS_METRIC_PATTERN.match(line)
        if not metric_match:
            return None

        labels: Dict[str, str] = {}
        # The label section is optional in the pattern, so the group is None
        # for a bare "name value" line.
        labels_str = metric_match.group("labels")
        label_strs = labels_str.split(",") if labels_str else []
        for label_str in label_strs:
            label_parts = label_str.split("=", 1)
            if len(label_parts) != 2:
                return None

            label = label_parts[0].strip()
            value_str = label_parts[1].strip()
            # The value must be wrapped in a pair of double quotes; a lone
            # '"' both starts and ends with a quote, hence the length check.
            if (
                len(value_str) < 2
                or not value_str.startswith('"')
                or not value_str.endswith('"')
            ):
                return None

            labels[label] = value_str[1:-1]

        return PrometheusMetric(
            name=metric_match.group("name"),
            value=float(metric_match.group("value")),
            labels=labels,
        )
def _parse_ifconfig_output(output: str) -> IfAddrs:
    """Extract the addresses listed under each interface in ifconfig text."""
    if_addrs: IfAddrs = {}
    if_name: Optional[str] = None
    for line in output.splitlines():
        words = line.split()
        if not words:
            # Blank or whitespace-only line.
            continue

        row_type = words[0]
        if row_type.endswith(":"):
            row_type = row_type[:-1]

        if row_type in ("inet", "inet6"):
            # Explicit raises instead of assert: this validates external
            # command output and must still run under ``python -O``.
            if len(words) < 2:
                raise ValueError(f"Invalid inet line: {line}")
            if not if_name:
                raise ValueError(f"No if name for line: {line}")

            # FreeBSD appends %{if_name} to IPv6 addresses
            addr_str = words[1].split("%", 1)[0]
            if_addrs[if_name].add(ip_address(addr_str))
        elif not line[0].isspace():
            # An unindented line starts a new interface section.
            if_name = row_type
            if_addrs[if_name] = set()

    return if_addrs


def get_if_addrs() -> IfAddrs:
    """Run ``ifconfig`` and return the addresses assigned to each interface.

    Returns:
        A mapping of interface name to the set of its ``inet``/``inet6``
        addresses. Interfaces without addresses map to an empty set.

    Raises:
        subprocess.CalledProcessError: If ``ifconfig`` exits with a non-zero
            status.
        ValueError: If an address line is malformed, appears before any
            interface header, or carries an unparsable address.
    """
    proc = subprocess.run(["ifconfig"], stdout=subprocess.PIPE, check=True)
    return _parse_ifconfig_output(proc.stdout.decode())
def filter_public_if_addrs(if_addrs: IfAddrs) -> IfAddrs:
    """Keep only globally routable addresses.

    Each address is tested with its ``is_global`` attribute. Interfaces left
    with no address after filtering are omitted from the result.
    """
    globals_by_if = (
        (name, {candidate for candidate in candidates if candidate.is_global})
        for name, candidates in if_addrs.items()
    )
    return {name: kept for name, kept in globals_by_if if kept}
def update_if_addr_times(if_addr_times: IfAddrTimes, if_addrs: IfAddrs) -> IfAddrTimes:
    """Build the timestamp table for the addresses currently present.

    Addresses already listed in ``if_addr_times`` keep their stored
    timestamp; newly seen addresses are stamped with the current time.
    Addresses and interfaces absent from ``if_addrs`` are dropped, and
    interfaces with no addresses produce no entry.
    """
    stamp = int(time.time())
    refreshed: IfAddrTimes = {}
    for name, current in if_addrs.items():
        if not current:
            continue
        known = if_addr_times.get(name, {})
        refreshed[name] = {addr: known.get(addr, stamp) for addr in current}
    return refreshed
def load_if_addr_times(prom_file: str) -> IfAddrTimes:
    """Load previously recorded address timestamps from ``prom_file``.

    Args:
        prom_file: Path of the metrics file to read.

    Returns:
        A mapping of interface name to ``{address: timestamp}``. It is empty
        when the file does not exist. Lines that are not a well-formed
        ``METRIC_NAME`` sample are skipped with a warning; blank lines and
        ``#`` comment lines are skipped silently.
    """
    if not os.path.exists(prom_file):
        return {}

    # "addr" and "if_name" are read below, so they must be present;
    # "addr_type" is tolerated but not needed to rebuild the table.
    required_labels = {"addr", "if_name"}
    allowed_labels = required_labels | {"addr_type"}

    if_addr_times: IfAddrTimes = {}
    with open(prom_file) as fp:
        for line in fp:
            stripped = line.strip()
            # Blank lines and comments are not bad metrics; do not warn
            # about them.
            if not stripped or stripped.startswith("#"):
                continue

            metric = PrometheusMetric.from_str(stripped)
            if (
                metric is None
                or metric.name != METRIC_NAME
                or not required_labels <= set(metric.labels) <= allowed_labels
            ):
                logger.warning("Ignoring bad metric in %s: %s", prom_file, stripped)
                continue

            if_name = metric.labels["if_name"]
            try:
                addr = ip_address(metric.labels["addr"])
                acquire_time = int(metric.value)
            except ValueError:
                logger.warning("Ignoring bad metric in %s: %s", prom_file, stripped)
                continue

            if_addr_times.setdefault(if_name, {})[addr] = acquire_time

    return if_addr_times
def write_if_addr_times(if_addr_times: IfAddrTimes, prom_file: str) -> None:
    """Write one ``METRIC_NAME`` sample per address to ``prom_file``.

    The parent directory is created when missing. Each sample carries the
    labels ``addr``, ``if_name`` and ``addr_type`` ("IPv4" or "IPv6") and the
    stored timestamp as its value.

    Args:
        if_addr_times: Interface name -> ``{address: timestamp}`` to export.
        prom_file: Destination path; an existing file is overwritten.
    """
    parent_dir = os.path.dirname(prom_file)
    # dirname is "" for a bare file name, and os.makedirs("") raises
    # FileNotFoundError, so only create a directory when one is named.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # NOTE(review): the file is truncated and rewritten in place, so a reader
    # that opens it mid-write may see partial content.
    with open(prom_file, "w") as fp:
        for if_name, addr_times in if_addr_times.items():
            for addr, acquire_time in addr_times.items():
                metric = PrometheusMetric(
                    name=METRIC_NAME,
                    value=float(acquire_time),
                    labels={
                        "addr": f"{addr}",
                        "if_name": if_name,
                        "addr_type": f"IPv{addr.version}",
                    },
                )
                print(metric, file=fp)
def parse_args() -> Any:
    """Parse the command line and return the resulting namespace.

    Options: ``--prom-file/-f`` (required path), ``--only-public/-p`` and
    ``--verbose/-v`` (both boolean flags).
    """
    option_table = (
        (
            ("--prom-file", "-f"),
            {
                "action": "store",
                "help": "File path for the PROM timing data.",
                "required": True,
            },
        ),
        (
            ("--only-public", "-p"),
            {
                "action": "store_true",
                "help": "Only export public addresses.",
            },
        ),
        (
            ("--verbose", "-v"),
            {
                "action": "store_true",
                "help": "Show verbose logging messages.",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Export interface addresses with timing for Prometheus",
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli.parse_args()
def main() -> None:
    """Refresh the address timing file from the host's current addresses.

    Reads the current interface addresses (optionally only public ones),
    loads the previously exported timestamps, carries timestamps over for
    addresses that are still present, and rewrites the file only when the
    resulting table differs from the loaded one.
    """
    args = parse_args()
    logging.basicConfig(
        format="%(levelname)s: %(message)s",
        level=logging.DEBUG if args.verbose else logging.INFO,
    )

    if_addrs = get_if_addrs()
    if args.only_public:
        if_addrs = filter_public_if_addrs(if_addrs)

    if args.verbose:
        for if_name, addrs in if_addrs.items():
            for addr in addrs:
                logger.debug("Found address %s on %s", addr, if_name)

    # Count addresses, not interfaces: the mapping is keyed by interface.
    logger.info(
        "Found %s addresses on the host",
        sum(len(addrs) for addrs in if_addrs.values()),
    )

    if_addr_times = load_if_addr_times(args.prom_file)
    if args.verbose:
        for if_name, addr_times in if_addr_times.items():
            for addr, acquire_time in addr_times.items():
                logger.debug(
                    "Loaded previous address %s on %s from %s",
                    addr,
                    if_name,
                    # Timezone-aware replacement for the deprecated
                    # datetime.utcfromtimestamp(); "%c" renders the same.
                    datetime.fromtimestamp(acquire_time, tz=timezone.utc).strftime(
                        "%c"
                    ),
                )

    logger.info(
        "Loaded %s previous addresses",
        sum(len(addr_times) for addr_times in if_addr_times.values()),
    )

    updated_if_addr_times = update_if_addr_times(if_addr_times, if_addrs)
    if updated_if_addr_times != if_addr_times:
        old_addrs = {
            (if_name, f"{addr}")
            for if_name, addr_times in if_addr_times.items()
            for addr in addr_times
        }
        new_addrs = {
            (if_name, f"{addr}")
            for if_name, addr_times in updated_if_addr_times.items()
            for addr in addr_times
        }

        added_addrs = sorted(new_addrs - old_addrs)
        for if_name, str_addr in added_addrs:
            # Argument order must follow the message: address, then interface.
            logger.info("Added address %s on %s", str_addr, if_name)

        removed_addrs = sorted(old_addrs - new_addrs)
        for if_name, str_addr in removed_addrs:
            logger.info("Removed address %s on %s", str_addr, if_name)

        logger.info(
            "Added %s new addresses and removed %s old addresses",
            len(added_addrs),
            len(removed_addrs),
        )
        write_if_addr_times(updated_if_addr_times, args.prom_file)


if __name__ == "__main__":
    main()
@jgeboski
Copy link
Author

pfSense Cron job:

* 	* 	* 	* 	* 	nobody 	/usr/bin/nice -n20 /usr/local/bin/prometheus-address-collector -pf /var/tmp/node_exporter/network-address.prom

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment