Skip to content

Instantly share code, notes, and snippets.

@ajeddeloh

ajeddeloh/aqi.py

Last active Jan 23, 2021
Embed
What would you like to do?
promethesus metrics endpoint for plantower aqi sensors, depends only on pyserial
#!/usr/bin/env python3
import serial, struct
from http.server import BaseHTTPRequestHandler, HTTPServer
packet_format = struct.Struct('>xxxxHHHHHHHHHHHHxxH')
cmd_format = struct.Struct('>ccBBBH')
prom_template = """# TYPE success gauge
success 1
# TYPE pm gauge
pm{{size="1.0"}} {}
pm{{size="2.5"}} {}
pm{{size="10.0"}} {}
pm{{size="1.0",atmospheric="true"}} {}
pm{{size="2.5",atmospheric="true"}} {}
pm{{size="10.0",atmospheric="true"}} {}
# TYPE n gauge
n{{size="0.3"}} {}
n{{size="0.5"}} {}
n{{size="1.0"}} {}
n{{size="2.5"}} {}
n{{size="5.0"}} {}
n{{size="10.0"}} {}"""
def cmd(cmd, arg1, arg2, resp_size):
ser.write(cmd_format.pack(b'B', b'M', cmd, arg1, arg2, 0x42 + 0x4d + cmd + arg1 + arg2))
return ser.read(resp_size)
def get_data():
raw = cmd(0xe2, 0, 0, 32) # poll sensor
check = sum(raw[0:30])
data = packet_format.unpack(raw)
if raw[0:4] != b'BM\x00\x1c' or data[-1] != check:
# TODO reset sensor
return None
# strip the footer (header is ignored in packet_format)
return data[:-1]
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
data = get_data()
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
if data is None:
self.wfile.write(b"# TYPE success gauge\nsuccess 0")
else:
self.wfile.write(prom_template.format(*data).encode())
if __name__ == "__main__":
ser = serial.Serial('/dev/ttyS0', 9600, timeout=1)
cmd(0xe1, 0, 0, 8) # set passive mode
HTTPServer(("0.0.0.0", 5000), Handler).serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment