promethesus metrics endpoint for plantower aqi sensors, depends only on pyserial
#!/usr/bin/env python3 | |
import serial, struct | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
packet_format = struct.Struct('>xxxxHHHHHHHHHHHHxxH') | |
cmd_format = struct.Struct('>ccBBBH') | |
prom_template = """# TYPE success gauge | |
success 1 | |
# TYPE pm gauge | |
pm{{size="1.0"}} {} | |
pm{{size="2.5"}} {} | |
pm{{size="10.0"}} {} | |
pm{{size="1.0",atmospheric="true"}} {} | |
pm{{size="2.5",atmospheric="true"}} {} | |
pm{{size="10.0",atmospheric="true"}} {} | |
# TYPE n gauge | |
n{{size="0.3"}} {} | |
n{{size="0.5"}} {} | |
n{{size="1.0"}} {} | |
n{{size="2.5"}} {} | |
n{{size="5.0"}} {} | |
n{{size="10.0"}} {}""" | |
def cmd(cmd, arg1, arg2, resp_size): | |
ser.write(cmd_format.pack(b'B', b'M', cmd, arg1, arg2, 0x42 + 0x4d + cmd + arg1 + arg2)) | |
return ser.read(resp_size) | |
def get_data(): | |
raw = cmd(0xe2, 0, 0, 32) # poll sensor | |
check = sum(raw[0:30]) | |
data = packet_format.unpack(raw) | |
if raw[0:4] != b'BM\x00\x1c' or data[-1] != check: | |
# TODO reset sensor | |
return None | |
# strip the footer (header is ignored in packet_format) | |
return data[:-1] | |
class Handler(BaseHTTPRequestHandler): | |
def do_GET(self): | |
data = get_data() | |
self.send_response(200) | |
self.send_header('Content-Type', 'text/plain') | |
self.end_headers() | |
if data is None: | |
self.wfile.write(b"# TYPE success gauge\nsuccess 0") | |
else: | |
self.wfile.write(prom_template.format(*data).encode()) | |
if __name__ == "__main__": | |
ser = serial.Serial('/dev/ttyS0', 9600, timeout=1) | |
cmd(0xe1, 0, 0, 8) # set passive mode | |
HTTPServer(("0.0.0.0", 5000), Handler).serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment