Skip to content

Instantly share code, notes, and snippets.

@TobleMiner
Created April 25, 2019 02:33
Show Gist options
  • Save TobleMiner/7335b4a5b7d132afbc5f02b553fe0e1e to your computer and use it in GitHub Desktop.
Save TobleMiner/7335b4a5b7d132afbc5f02b553fe0e1e to your computer and use it in GitHub Desktop.
Raspberry PI based PMS5003 dust monitor for munin
#!/usr/bin/env python3
import sys
import socket
import colorsys
import re
from struct import unpack, calcsize
from collections import namedtuple
SOCKET = '/tmp/dust.sock'
FORMAT = '>9d'
type = re.search('(?P<type>[a-z]+)$', sys.argv[0]).group('type')
Measurement = namedtuple('Measurement', 'pm0_3 pm0_5 pm1_0 pm2_5 pm5_0 pm10 pm1_0u_atm pm2_5u_atm pm10u_atm')
class Measurement():
FIELDS = { 'abs': ('particles / Liter', ('pm0_3', 'pm0_5', 'pm1_0', 'pm2_5', 'pm5_0', 'pm10')),
'conc': ('ug / m3', ('pm1_0u_atm', 'pm2_5u_atm', 'pm10u_atm')) }
def _make(values):
measure = Measurement()
labels = []
for _,v in Measurement.FIELDS.items():
labels += v[1]
for i in range(len(values)):
measure.values[labels[i]] = values[i]
return measure
def __init__(self):
self.values = {}
def vals(self, key):
return {k: v for k, v in self.values.items() if k in self.FIELDS[key][1] }
def fetch():
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(SOCKET)
length = calcsize(FORMAT)
data = sock.recv(length)
measure = Measurement._make(unpack(FORMAT, data))
vals = measure.vals(type)
for k,v in vals.items():
print(f"{k}.value {v}")
def hexcolor(h):
r, g, b = map(lambda c: int(255 * c), colorsys.hsv_to_rgb(h, 1, 1))
return f"{r:02x}{g:02x}{b:02x}"
def configure():
print(f"""graph_title Airborne dust
graph_args --base 1000 --lower-limit 0
graph_vlabel {Measurement.FIELDS[type][0]}
graph_info Dust
graph_category environment""")
fields = Measurement.FIELDS[type][1]
for i in range(len(fields)):
print(f"{fields[i]}.label {fields[i]}")
print(f"{fields[i]}.draw LINE1")
print(f"{fields[i]}.colour {hexcolor(i / len(fields))}")
if len(sys.argv) >= 2 and sys.argv[1] == "config":
configure()
else:
fetch()
#!/usr/bin/env python3
import serial
import sys
import decimal
import socket
import os
import _thread
from struct import unpack, pack
from collections import namedtuple
DEBUG = False
SOCKET = '/tmp/dust.sock'
START_PATTERN = b'BM'
Packet = namedtuple('Packet', 'len pm1_0u_cf pm2_5u_cf pm10u_cf pm1_0u_atm pm2_5u_atm pm10u_atm pm0_3 pm0_5 pm1_0 pm2_5 pm5_0 pm10 res csum')
class Measurement():
FIELDS = ('pm1_0u_cf', 'pm2_5u_cf', 'pm10u_cf', 'pm1_0u_atm', 'pm2_5u_atm', 'pm10u_atm', 'pm0_3', 'pm0_5', 'pm1_0', 'pm2_5', 'pm5_0', 'pm10')
def _make(values):
measure = Measurement()
for i in range(len(values)):
measure.measures[measure.FIELDS[i]] = values[i]
return measure
def __init__(self):
self.measures = {}
for field in self.FIELDS:
self.measures[field] = 0
def __iadd__(self, b):
for field in self.FIELDS:
self.measures[field] += b.measures[field]
return self
def __itruediv__(self, div):
for field in self.FIELDS:
self.measures[field] /= div
return self
def abs(self, pm):
pm = str(pm).replace('.', '_')
return self.measures[f'pm{pm}'] * 10
def conc(self, pm):
pm = str(pm).replace('.', '_')
return self.measures[f'pm{pm}u_atm']
def __repr__(self):
return f"""Dust measurement:
Count PM0.3: {self.measures['pm0_3']} 1/100 ml
Count PM0.5: {self.measures['pm0_5']} 1/100 ml
Count PM1.0: {self.measures['pm1_0']} 1/100 ml
Count PM2.5: {self.measures['pm2_5']} 1/100 ml
Count PM5.0: {self.measures['pm5_0']} 1/100 ml
Count PM10.: {self.measures['pm10']} 1/100 ml
Concentration PM1.0 {self.measures['pm1_0u_atm']} ug/m3
Concentration PM2.5 {self.measures['pm2_5u_atm']} ug/m3
Concentration PM10. {self.measures['pm10u_atm']} ug/m3"""
class MeasurementWindow():
def __init__(self, size=10):
self.size = size
self.measurements = []
def addMeasurement(self, measure):
self.measurements.append(measure)
if(len(self.measurements) > self.size):
self.measurements = self.measurements[len(self.measurements) - self.size:]
def getAverage(self):
avg = Measurement()
for measure in self.measurements:
avg += measure
if len(self.measurements) > 0:
avg /= len(self.measurements)
return avg
port = sys.argv[1]
window = MeasurementWindow()
try:
os.unlink(SOCKET)
except OSError:
if os.path.exists(SOCKET):
raise
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(SOCKET)
sock.listen(1)
avg = None
def sock_thread():
try:
while True:
conn = sock.accept()[0]
if avg:
conn.sendall(pack('>9d', avg.abs(0.3), avg.abs(0.5), avg.abs(1.0), avg.abs(2.5), avg.abs(5.0), avg.abs(10), avg.conc(1.0), avg.conc(2.5), avg.conc(10)))
conn.close()
except:
sys.exit(1)
_thread.start_new_thread(sock_thread, ())
with serial.Serial(port, 9600) as serial:
while True:
serial.timeout = None
# Detect start of packet
pattern = b''
while pattern != START_PATTERN:
pattern += serial.read(1)
if len(pattern) > len(START_PATTERN):
pattern = pattern[len(pattern) - len(START_PATTERN):]
# Receive payload
serial.timeout = 5
data = serial.read(30)
if not data or len(data) != 30:
print("Failed to receive data")
continue
packet = Packet._make(unpack('>15H', data))
csum = 0
for c in list(START_PATTERN) + list(data[:-2]):
csum += c
if csum != packet.csum:
print("Checksum invalid")
continue
measure = Measurement._make(packet[1:-2])
window.addMeasurement(measure)
avg = window.getAverage()
if DEBUG:
print(packet)
print(measure)
print(avg)
[Unit]
Description=Dust sensor service
After=syslog.target network.target
[Service]
User=dust
ExecStart=/usr/local/bin/dust_server.py /dev/ttyAMA0
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment