Skip to content

Instantly share code, notes, and snippets.

@h3po
Created October 8, 2017 13:18
Show Gist options
  • Save h3po/d6768c2787254291b358cb4a6f221154 to your computer and use it in GitHub Desktop.
Save h3po/d6768c2787254291b358cb4a6f221154 to your computer and use it in GitHub Desktop.
Simple prometheus exporter for temperature and power sensor data from a Corsar HX750i (and possibly other HXi and RMi series) power supply. Based on OpenCorsairLink by audiohacked
#!/usr/bin/python3
"""
2017-10 by mail@h3po.de
using protocol reverse engineered by audiohacked: https://github.com/audiohacked/OpenCorsairLink
"""
import usb1, struct, threading
from http.server import HTTPServer
from client_python.prometheus_client.core import Metric, REGISTRY
from client_python.prometheus_client.exposition import MetricsHandler
#HX750i
VID = 0x1b1c
PID = 0x1c05
class UpdatingGauge(Metric):
def __init__(self, name, documentation, labelnames):
super().__init__(name, documentation, "gauge")
self._samples = {}
self.samples = []
self._labelnames = labelnames
def new_sample(self, labelvalues, value):
self._samples[labelvalues] = value
def update(self):
self.samples = ((self.name, dict(zip(self._labelnames, labelvalues)), value) for labelvalues, value in self._samples.items())
class CorsairPsuCollector(object):
def __init__(self, vid, pid, ep=1, metric_prefix="corsairpsu"):
#usb init
self.ep = ep
self.context = usb1.USBContext()
self.handle = self.context.openByVendorIDAndProductID(vid, pid, skip_on_error=True)
if self.handle is None: raise Exception("Could not aquire device handle")
self.packetsize = self.handle.getDevice().getMaxPacketSize(self.ep)
#metrics init
self.metric_prefix = metric_prefix
self.common_labels = ("name", "vendor", "product")
self.common_label_values = (
self._getValue(b"\xfe\x03", self._readString),
self._getValue(b"\x03\x99", self._readString),
self._getValue(b"\x03\x9a", self._readString))
self.gauges = {
"temperature": UpdatingGauge("%s_temperature_celsius" % self.metric_prefix,
"temperature in degrees celsius at sensor $sensor",
self.common_labels + ("sensor",)),
"voltage": UpdatingGauge("%s_voltage_volt" % self.metric_prefix,
"voltage in volts at rail $rail",
self.common_labels + ("rail",)),
"current": UpdatingGauge("%s_current_ampere" % self.metric_prefix,
"current in amperes at rail $rail",
self.common_labels + ("rail",)),
"power": UpdatingGauge("%s_power_watt" % self.metric_prefix,
"power in watts at rail $rail",
self.common_labels + ("rail",))}
@staticmethod
def _bytesToFloat(b):
assert len(b) == 2
tmp = struct.unpack("H", b)[0]
exponent = tmp >> 11
fraction = tmp & 2047
if exponent > 15: exponent = -(32-exponent)
if fraction > 1023: fraction = -(2048-fraction)
if fraction & 1: fraction += 1
return float(fraction * pow(2, exponent))
def _read(self):
return self.handle.interruptRead(self.ep, self.packetsize, timeout=3)
def _write(self, data):
assert len(data) <= self.packetsize
return self.handle.interruptWrite(self.ep, data)
def _readString(self):
return self._read()[2:].decode("ascii").split("\x00")[0]
def _readFloat(self):
return self._bytesToFloat(self._read()[2:4])
def _selectRail(self, rail):
self._write(b"\x02\x00" + bytes((rail,)))
def _getValue(self, command, readfunction):
self._write(command)
return readfunction()
def collect(self):
self.gauges["temperature"].new_sample(self.common_label_values + ("0",), self._getValue(b"\x03\x8d", self._readFloat))
self.gauges["temperature"].new_sample(self.common_label_values + ("1",), self._getValue(b"\x03\x8e", self._readFloat))
self.gauges["voltage"].new_sample(self.common_label_values + ("input",), self._getValue(b"\x03\x88", self._readFloat))
for railindex, railname in enumerate(("12V", "5V", "3V3")):
self._selectRail(railindex)
self.gauges["voltage"].new_sample(self.common_label_values + (railname,), self._getValue(b"\x03\x8b", self._readFloat))
self.gauges["current"].new_sample(self.common_label_values + (railname,), self._getValue(b"\x03\x8c", self._readFloat))
self.gauges["power"].new_sample(self.common_label_values + (railname,), self._getValue(b"\x03\x96", self._readFloat))
for gauge in self.gauges.values():
gauge.update()
yield gauge
def start_http_server(port, addr=''):
"""Starts a HTTP server for prometheus metrics as a daemon thread."""
class PrometheusMetricsServer(threading.Thread):
def run(self):
httpd = HTTPServer((addr, port), MetricsHandler)
httpd.serve_forever()
t = PrometheusMetricsServer()
t.start()
return t
if __name__ == "__main__":
REGISTRY.register(CorsairPsuCollector(VID, PID))
start_http_server(9666).join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment