Skip to content

Instantly share code, notes, and snippets.

@aelindeman
Last active April 20, 2022 19:14
Show Gist options
  • Save aelindeman/c1ee3ce9b138e2a5a65c78811fff560d to your computer and use it in GitHub Desktop.
Save aelindeman/c1ee3ce9b138e2a5a65c78811fff560d to your computer and use it in GitHub Desktop.
InfluxDB adapter script for NUT (Network UPS Tools) output
#!/usr/bin/env python3
import math
import subprocess
import sys
import time
# Measurement name placed at the start of every emitted line.
MEASUREMENT_NAME = "nut"

# Human-readable text for the flags looked up when decoding ``ups.status``.
UPS_STATUS = dict(
    # Power source
    OL="Online",
    OB="On Battery",
    # Battery condition
    LB="Low Battery",
    HB="High Battery",
    RB="Replace Battery",
    CHRG="Battery Charging",
    DISCHRG="Battery Discharging",
    # Operating mode
    BYPASS="Bypass Active",
    CAL="Runtime Calibration",
    OFF="Offline",
    OVER="Overloaded",
    TRIM="Trimming Voltage",
    BOOST="Boosting Voltage",
    FSD="Forced Shutdown",
)
def _escape_tag(text):
    """Backslash-escape commas, equals signs and spaces.

    These are the characters that delimit tag keys, tag values and field
    keys in InfluxDB line protocol; quoting does not protect them.
    """
    return text.translate(str.maketrans({",": r"\,", "=": r"\=", " ": r"\ "}))


def _quote_string(text):
    """Render ``text`` as a double-quoted line-protocol string field value."""
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _coerce_field(key, value):
    """Convert one raw ``upsc`` value into its line-protocol representation.

    Returns a ``float`` for finite numeric values and an already-quoted
    string for everything else.
    """
    if key == "ups.status":
        # The status may hold several space-separated flags (e.g. "OL CHRG").
        # Translate each one and keep unknown flags verbatim instead of
        # raising KeyError.
        described = ", ".join(UPS_STATUS.get(flag, flag) for flag in value.split())
        return _quote_string(described)
    try:
        number = float(value)
    except ValueError:
        return _quote_string(value)
    # float() also accepts "nan", "inf" and "Infinity"; those cannot be
    # written as numeric fields, so keep the original text instead.
    return number if math.isfinite(number) else _quote_string(value)


def _parse_variables(output):
    """Parse ``key: value`` lines from ``upsc`` into a dict of coerced fields."""
    fields = {}
    for line in output.splitlines():
        # Split on the first colon only, so values that contain ": " (or are
        # empty) no longer break the unpacking.
        key, separator, value = line.partition(":")
        key = key.strip()
        if not separator or not key:
            if line.strip():
                print("error: skipping malformed line", repr(line), file=sys.stderr)
            continue
        fields[key] = _coerce_field(key, value.strip())
    return fields


def main():
    """Print one InfluxDB line-protocol point per UPS reported by ``upsc -l``.

    Each UPS is queried with ``upsc <name>``. Its variables become fields of
    the ``MEASUREMENT_NAME`` measurement, tagged with ``name=<ups>`` and
    stamped with the current time in nanoseconds. A failure while handling
    one UPS is reported on stderr and the remaining UPSes are still processed.

    Raises:
        subprocess.CalledProcessError: if ``upsc -l`` itself exits non-zero.
    """
    upses = subprocess.run(
        ["upsc", "-l"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=True,
    )
    for listed in upses.stdout.splitlines():
        ups = listed.strip()
        if not ups:
            continue
        try:
            data = subprocess.run(
                ["upsc", ups],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
            )
            fields = _parse_variables(data.stdout)
            if not fields:
                # A point without fields is not valid line protocol.
                raise ValueError(f"no variables reported for {ups}")
            tagset = f"{MEASUREMENT_NAME},name={_escape_tag(ups)}"
            fieldset = ",".join(
                f"{_escape_tag(key)}={value}" for key, value in fields.items()
            )
            print(tagset, fieldset, time.time_ns(), file=sys.stdout)
        except (subprocess.SubprocessError, TypeError, ValueError) as err:
            print("error:", repr(err), file=sys.stderr)
# Run the exporter only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment