Skip to content

Instantly share code, notes, and snippets.

@deepcoder
Created April 13, 2022 16:11
Show Gist options
  • Save deepcoder/73fd88b4079d58dd5f60a041836dad65 to your computer and use it in GitHub Desktop.
Save deepcoder/73fd88b4079d58dd5f60a041836dad65 to your computer and use it in GitHub Desktop.
purple aqi sensors to home assistant MQTT sensors
# home assistant sensors
# Purple AQI monitors
# One MQTT sensor per PurpleAir station. The state is the "aqi" field of the
# JSON payload published on the station's topic; the same topic is also used
# as the source of the sensor's JSON attributes.
- platform: mqtt
  name: 'Purple AQI SP01'
  state_topic: "purple-aqi/SP01"
  unit_of_measurement: "AQI"
  value_template: "{{ value_json.aqi }}"
  json_attributes_topic: "purple-aqi/SP01"
- platform: mqtt
  name: 'Purple AQI SP02'
  state_topic: "purple-aqi/SP02"
  unit_of_measurement: "AQI"
  value_template: "{{ value_json.aqi }}"
  json_attributes_topic: "purple-aqi/SP02"
- platform: mqtt
  name: 'Purple AQI SP03'
  state_topic: "purple-aqi/SP03"
  unit_of_measurement: "AQI"
  value_template: "{{ value_json.aqi }}"
  json_attributes_topic: "purple-aqi/SP03"
# purple averages using:
# https://github.com/Limych/ha-average
# Three averages over the same three station sensors: the current value,
# the last hour, and the last 24 hours.
- platform: average
  name: 'Purple AQI SP Average Current'
  precision: 1
  unique_id: __legacy__
  entities:
    - sensor.purple_aqi_sp01
    - sensor.purple_aqi_sp02
    - sensor.purple_aqi_sp03
- platform: average
  name: 'Purple AQI SP Average 1 Hr'
  precision: 1
  unique_id: __legacy__
  duration:
    hours: 1
  entities:
    - sensor.purple_aqi_sp01
    - sensor.purple_aqi_sp02
    - sensor.purple_aqi_sp03
- platform: average
  name: 'Purple AQI SP Average 24 Hr'
  precision: 1
  unique_id: __legacy__
  duration:
    hours: 24
  entities:
    - sensor.purple_aqi_sp01
    - sensor.purple_aqi_sp02
    - sensor.purple_aqi_sp03
# --------------------------------------------------------------------------------------------
# python code to read sensors and publish to mqtt
#! /usr/bin/env python3
# purple-aqi.py
# Program identity. PROGRAM_NAME is reused as the MQTT topic prefix, as part of
# the MQTT client id, as the log file stem and as the logger name.
PROGRAM_NAME = "purple-aqi"
# Appended to PROGRAM_NAME for the MQTT client id and reported in status messages.
PROGRAM_VERSION = "09"
# Directory the log file is written to (joined by plain string concatenation,
# so the trailing slash is required).
WORKING_DIRECTORY = "/home/user/purple/"
#
#
# 202011051653
# AQI from purple
#
# based on this blog post:
# http://tech.thejoestory.com/2020/09/air-quality-calculation-purple-air-api.html
#
import sys
# check version of python
# Abort early with a readable message on interpreters older than 3.5.
# Comparing against a tuple also accepts any later major version, which the
# previous "major == 3 and minor >= 5" test would have rejected.
if sys.version_info < (3, 5):
    print("This script requires Python 3.5 or higher!")
    print("You are using Python {}.{}.".format(sys.version_info.major, sys.version_info.minor))
    sys.exit(1)
#print("{} {} is using Python {}.{}.".format(PROGRAM_NAME, PROGRAM_VERSION, sys.version_info.major, sys.version_info.minor))
import json
import logging
import logging.handlers
import time
import traceback
from datetime import datetime

import paho.mqtt.client as mqtt
import requests
# Host of the MQTT broker the readings are published to (port 1883 is used in main()).
MQTT_SERVER = "192.168.1.10"
# Every topic published by this program starts with "<PROGRAM_NAME>/".
MQTT_TOPIC_BASE = PROGRAM_NAME + "/"
# how often to check the sensors in minutes
CHECK_PERIOD_MINUTES = 15
# Sensors to poll. The key is the MQTT topic suffix (and the station name used
# in the Home Assistant config); the value is the id appended to the PurpleAir
# "json?show=" request URL.
LOCATIONS = {
"SP01" : "4058",
"SP02" : "60479",
"SP03" : "6908"
}
# Default JSON document used in place of the API response when no valid
# response is received. It has the same two-row "results" layout that main()
# reads, with every PM2.5 value set to "0.0" and "A_H" set to "true", so main()
# flags the reading as downgraded and publishes "unknown" for it.
# Note the painful escaping below: each "Stats" value is itself a JSON document
# stored inside a JSON string, so its double quotes must reach the JSON parser
# as \" -- which has to be written as \\\" inside this (non-raw) Python string.
INVALID_SENSOR = """
{"mapVersion":"0.0","baseVersion":"0","mapVersionString":"","results":[
{
"ID":0,"Label":"Unknown","DEVICE_LOCATIONTYPE":"Unknown",
"THINGSPEAK_PRIMARY_ID":"0","THINGSPEAK_PRIMARY_ID_READ_KEY":"",
"THINGSPEAK_SECONDARY_ID":"0","THINGSPEAK_SECONDARY_ID_READ_KEY":"",
"Lat":0.0,"Lon":0.0,"PM2_5Value":"0.0","LastSeen":0,"Type":"","Hidden":"false",
"DEVICE_BRIGHTNESS":"0","DEVICE_HARDWAREDISCOVERED":"","DEVICE_FIRMWAREVERSION":"0.0",
"Version":"0.0","LastUpdateCheck":0,"Created":0,"Uptime":"0","RSSI":"0","Adc":"0.0",
"p_0_3_um":"0.0","p_0_5_um":"0.0","p_1_0_um":"0.0","p_2_5_um":"0.0","p_5_0_um":"0.0",
"p_10_0_um":"0.0","pm1_0_cf_1":"0.0","pm2_5_cf_1":"0.0","pm10_0_cf_1":"0.0",
"pm1_0_atm":"0.0","pm2_5_atm":"0.0","pm10_0_atm":"0.0","isOwner":0,
"humidity":"32","temp_f":"89","pressure":"0.0","AGE":0,"A_H":"true",
"Stats": "{\\\"v\\\":0.0,\\\"v1\\\":0.0,\\\"v2\\\":0.0,\\\"v3\\\":0.0,\\\"v4\\\":0.0,\\\"v5\\\":0.0,\\\"v6\\\":0.0,\\\"pm\\\":0.0,\\\"lastModified\\\":0,\\\"timeSinceModified\\\":0}"
},
{
"ID":0,"ParentID":0,"Label":"Unknown",
"THINGSPEAK_PRIMARY_ID":"0","THINGSPEAK_PRIMARY_ID_READ_KEY":"",
"THINGSPEAK_SECONDARY_ID":"0","THINGSPEAK_SECONDARY_ID_READ_KEY":"",
"Lat":0.0,"Lon":0.0,"PM2_5Value":"0.0","LastSeen":0,"Hidden":"false","Created":0,
"p_0_3_um":"0.0","p_0_5_um":"0.0","p_1_0_um":"0.0","p_2_5_um":"0.0","p_5_0_um":"0.0",
"p_10_0_um":"0.0","pm1_0_cf_1":"0.0","pm2_5_cf_1":"0.0","pm10_0_cf_1":"0.0",
"pm1_0_atm":"0.0","pm2_5_atm":"0.0","pm10_0_atm":"0.0","isOwner":0,"AGE":0,"A_H":"true",
"Stats":"{\\\"v\\\":0.0,\\\"v1\\\":0.0,\\\"v2\\\":0.0,\\\"v3\\\":0.0,\\\"v4\\\":0.0,\\\"v5\\\":0.0,\\\"v6\\\":0.0,\\\"pm\\\":0.0,\\\"lastModified\\\":0,\\\"timeSinceModified\\\":0}"
}
]
}
"""
# Threshold for flagging a reading as suspect because its channels disagree:
# main() sets "large_channel_diff" when abs((mean - std_dev) / mean) of the
# channels' PM2.5 values is below this limit.
STD_DEV_LIMIT = 0.10
# Logging setup: records are written both to a local file and to an rsyslog
# server. The handlers are attached to the root logger; the program logs
# through its own named logger.

# Minimum level per destination (logging.INFO is the numeric value of 'INFO').
logging_level_file = logging.INFO
logging_level_rsyslog = logging.INFO

# Destinations.
LOG_FILENAME = PROGRAM_NAME + '.log'
LOG_RSYSLOG = ('192.168.1.5', 514)

# Line layout shared by both destinations.
_log_line_format = '%(asctime)s %(levelname)-8s ' + PROGRAM_NAME + ' ' + '%(message)s'
_log_date_format = '%Y-%m-%d %H:%M:%S'

root_logger = logging.getLogger()

# File destination: up to five old files are kept, and the log is rolled over
# once on every application start.
handler_file = logging.handlers.RotatingFileHandler(
    WORKING_DIRECTORY + LOG_FILENAME,
    backupCount=5,
)
handler_file.setFormatter(logging.Formatter(fmt=_log_line_format, datefmt=_log_date_format))
handler_file.setLevel(logging_level_file)
root_logger.addHandler(handler_file)
handler_file.doRollover()

# rsyslog destination.
handler_rsyslog = logging.handlers.SysLogHandler(address=LOG_RSYSLOG)
handler_rsyslog.setFormatter(logging.Formatter(fmt=_log_line_format, datefmt=_log_date_format))
handler_rsyslog.setLevel(logging_level_rsyslog)
root_logger.addHandler(handler_rsyslog)

# Logger used by the rest of the program.
my_logger = logging.getLogger(PROGRAM_NAME)
my_logger.setLevel(logging_level_file)
# These AQI calculations from PM2.5 match the definition given on Wikipedia:
# https://en.wikipedia.org/wiki/Air_quality_index#Computing_the_AQI
def calcAQ (Cp, Ih, Il, BPh, BPl):
    """Linearly interpolate an index value for the concentration *Cp*.

    Cp  -- measured concentration
    Ih  -- index value at the upper breakpoint
    Il  -- index value at the lower breakpoint
    BPh -- upper concentration breakpoint
    BPl -- lower concentration breakpoint

    Returns ((Ih - Il) / (BPh - BPl)) * (Cp - BPl) + Il.
    Raises ZeroDivisionError when BPh equals BPl.
    """
    index_per_unit = (Ih - Il) / (BPh - BPl)
    return index_per_unit * (Cp - BPl) + Il
# Seconds to wait on the PurpleAir API before a request counts as failed;
# without a timeout a stalled connection would block the polling loop forever.
_REQUEST_TIMEOUT_SECONDS = 30

# PM2.5 -> AQI interpolation bands, checked from the highest downwards. A band
# applies when the concentration is strictly above its lower breakpoint.
# Each row: (Ih, Il, BPh, BPl, category, category_index, color)
_AQI_BANDS = (
    (500, 401, 500, 350.5, "Hazardous", 6, "7f0024"),                      # dark purple (127, 0, 36)
    (400, 301, 350.4, 250.5, "Hazardous", 6, "7f0024"),                    # dark purple (127, 0, 36)
    (300, 201, 250.4, 150.5, "Very Unhealthy", 5, "98004c"),               # purple (152, 0, 76)
    (200, 151, 150.4, 55.5, "Unhealthy", 4, "ff0200"),                     # red (255, 2, 0)
    (150, 101, 55.4, 35.5, "Unhealthy for Sensitive Groups", 3, "fe7f03"), # orange (254, 127, 3)
    (100, 51, 35.4, 12.1, "Moderate", 2, "fffe0a"),                        # yellow (255, 254, 10)
)

# Per-reading keys of the published message, in publication order. All of them
# are set to "unknown" when a reading is not usable.
_READING_FIELDS = (
    "label", "pm25", "aqi", "category", "category_index", "color",
    "temperature_f", "temperature_c", "humidity_pct", "pressure_hpa",
    "latitude", "longitude", "last_seen", "reading_age_sec",
    "pm25_current", "pm25_10_min_avg", "pm25_30_minu_avg", "pm25_1_hr_avg",
    "pm25_6_hr_avg", "pm25_24_hr_avg", "pm25_1_w_avg",
    "history_updated", "history_delta_ms",
)


def _epoch_seconds():
    """Return the current Unix time in whole seconds as a decimal string.

    time.time() is used rather than datetime.utcnow().timestamp(): the latter
    treats the naive UTC value as local time and is therefore off by the
    host's UTC offset.
    """
    return "{:d}".format(int(time.time()))


def _status_message(status):
    """Build the payload published on the $SYS/STATUS topic for *status*."""
    message = {"timestamp": _epoch_seconds()}
    message["program_version"] = PROGRAM_NAME + " Version : " + PROGRAM_VERSION
    message["status"] = status
    return message


def _classify_pm25(pm2):
    """Map a PM2.5 concentration to (aqi, category, category_index, color).

    Returns None when *pm2* is negative or NaN, i.e. when no band applies.
    """
    for index_high, index_low, bp_high, bp_low, category, category_index, color in _AQI_BANDS:
        if pm2 > bp_low:
            aq = calcAQ(pm2, index_high, index_low, bp_high, bp_low)
            return aq, category, category_index, color
    if pm2 >= 0:
        return calcAQ(pm2, 50, 0, 12, 0), "Good", 1, "00e500"  # green (0, 229, 0)
    return None


def _parse_stats(raw):
    """Decode the JSON document embedded in a "Stats" value; {} if unusable."""
    try:
        stats = json.loads(raw)
    except (TypeError, ValueError):
        return {}
    return stats if isinstance(stats, dict) else {}


def _build_message(data, location_id):
    """Turn one decoded API response into the dict published for a sensor.

    data        -- decoded JSON with a non-empty "results" list
    location_id -- the sensor id, published under the "loc" key

    Raises KeyError, IndexError, TypeError, ValueError or AttributeError when
    *data* does not have the expected layout.
    """
    rows = data["results"]
    k = rows[0]

    # there are usually 2 channels of data at each station: average them and
    # take the (population) standard deviation
    readings = [float(row["PM2_5Value"]) for row in rows]
    pm2 = sum(readings) / len(readings)
    pm2_variance = sum((reading - pm2) ** 2 for reading in readings) / len(readings)
    pm2_std_dev = pm2_variance ** 0.5

    # AQI for the current PM2.5 value (None if the value is out of range)
    classification = _classify_pm25(pm2)

    # temperature, humidity and air pressure
    pur_temperature_f = float(k.get("temp_f", 0.0))
    pur_temperature_c = (pur_temperature_f - 32.0) * 5.0/9.0
    pur_humidity = float(k.get("humidity", 0.0))
    pur_pressure_hpa = float(k.get("pressure", 0.0))

    # sensor location
    pur_location = k.get("Label", "Unknown")
    pur_latitude = float(k.get("Lat", 0.0))
    pur_longitude = float(k.get("Lon", 0.0))

    # sensor time
    pur_reading_lastseen = int(k.get("LastSeen", 0))
    pur_reading_age = int(k.get("AGE", 0))

    # historical data; a missing "Stats" entry yields all-zero history
    h = _parse_stats(k.get("Stats", "{}"))
    pm2_hist_current = float(h.get("v", 0.0))
    pm2_hist_10_m = float(h.get("v1", 0.0))
    pm2_hist_30_m = float(h.get("v2", 0.0))
    pm2_hist_1_h = float(h.get("v3", 0.0))
    pm2_hist_6_h = float(h.get("v4", 0.0))
    pm2_hist_24_h = float(h.get("v5", 0.0))
    pm2_hist_1_w = float(h.get("v6", 0.0))
    # "lastModified" is a millisecond timestamp
    pm2_hist_calc_time_milli = float(h.get("lastModified", 0)) / 1000.0
    pm2_hist_calc_delta = int(h.get("timeSinceModified", 0))

    # reading status: not sure exactly what is returned for these two entries,
    # so anything other than 'false' counts as set
    pur_high_reading = k.get("Flag", 'false') != 'false'
    pur_downgraded_reading = k.get("A_H", 'false') != 'false'

    # flag based on how much the channels differ; the mean is zero when the
    # station data could not be retrieved and the placeholder is in use
    try:
        pur_channel_different = abs((pm2 - pm2_std_dev) / pm2) < STD_DEV_LIMIT
    except ZeroDivisionError:
        pur_channel_different = False

    reading_usable = (
        classification is not None
        and not pur_high_reading
        and not pur_downgraded_reading
        and not pur_channel_different
    )

    message = {"timestamp": _epoch_seconds()}
    if reading_usable:
        # The computed category/color are published as-is; they must not be
        # reset to "Unknown" on this path.
        aq, cat, cat_index, color = classification
        message["label"] = pur_location
        message["pm25"] = "{:.1f}".format(pm2)
        message["aqi"] = "{:.1f}".format(aq)
        message["category"] = cat
        message["category_index"] = "{:d}".format(cat_index)
        message["color"] = "{0}".format(color)
        message["temperature_f"] = "{:.0f}".format(pur_temperature_f)
        message["temperature_c"] = "{:.0f}".format(pur_temperature_c)
        message["humidity_pct"] = "{:.0f}".format(pur_humidity)
        message["pressure_hpa"] = "{:.1f}".format(pur_pressure_hpa)
        message["latitude"] = "{:.7f}".format(pur_latitude)
        message["longitude"] = "{:.7f}".format(pur_longitude)
        message["last_seen"] = "{:d}".format(pur_reading_lastseen)
        message["reading_age_sec"] = "{:d}".format(pur_reading_age)
        message["pm25_current"] = "{:.1f}".format(pm2_hist_current)
        message["pm25_10_min_avg"] = "{:.1f}".format(pm2_hist_10_m)
        message["pm25_30_minu_avg"] = "{:.1f}".format(pm2_hist_30_m)
        message["pm25_1_hr_avg"] = "{:.1f}".format(pm2_hist_1_h)
        message["pm25_6_hr_avg"] = "{:.1f}".format(pm2_hist_6_h)
        message["pm25_24_hr_avg"] = "{:.1f}".format(pm2_hist_24_h)
        message["pm25_1_w_avg"] = "{:.1f}".format(pm2_hist_1_w)
        message["history_updated"] = "{:f}".format(pm2_hist_calc_time_milli)
        message["history_delta_ms"] = "{:d}".format(pm2_hist_calc_delta)
    else:
        # publish "unknown" for the JSON attributes if this is not a valid
        # reading; this results in 'Unknown' values in Home Assistant
        for field in _READING_FIELDS:
            message[field] = "unknown"

    message["high_reading"] = str(pur_high_reading)
    message["downgraded_reading"] = str(pur_downgraded_reading)
    message["large_channel_diff"] = str(pur_channel_different)
    message["loc"] = location_id
    return message


def _sensor_message(url, location_id):
    """Fetch *url* and build the sensor message, falling back to INVALID_SENSOR.

    Any request failure or malformed response is logged and replaced by the
    placeholder document, so a stale response from a previous sensor is never
    reused.
    """
    try:
        response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
        return _build_message(json.loads(response.text), location_id)
    except (requests.RequestException, ValueError, KeyError, IndexError,
            TypeError, AttributeError) as err:
        my_logger.warning("No usable data from %s : %r", url, err)
        return _build_message(json.loads(INVALID_SENSOR), location_id)


def main():
    """Poll every sensor in LOCATIONS forever and publish the readings to MQTT.

    Publishes a START status message on startup and a STOP message on
    Ctrl-C (exit code 0). Any other exception is logged with its traceback
    and ends the program with exit code 1.
    """
    mqttc = None
    try:
        # connect to MQTT server
        mqttc = mqtt.Client(PROGRAM_NAME + PROGRAM_VERSION)  # client id
        mqttc.connect(MQTT_SERVER, 1883)
        mqttc.publish(MQTT_TOPIC_BASE + "$SYS/STATUS", json.dumps(_status_message("START")))
        my_logger.info("Program start : " + PROGRAM_NAME + " Version : " + PROGRAM_VERSION)
        # Start networking daemon
        mqttc.loop_start()

        while True:
            for loc in LOCATIONS:
                # pause between each request to be nice to the API
                time.sleep(3)
                url = 'http://www.purpleair.com/json?show=' + LOCATIONS[loc]
                message = _sensor_message(url, LOCATIONS[loc])
                mqttc.publish(MQTT_TOPIC_BASE + loc, json.dumps(message))
            # check the sensors every CHECK_PERIOD_MINUTES
            time.sleep(CHECK_PERIOD_MINUTES * 60)
    except KeyboardInterrupt:
        # The interrupt may arrive before the client exists.
        if mqttc is not None:
            mqttc.publish(MQTT_TOPIC_BASE + "$SYS/STATUS", json.dumps(_status_message("STOP")))
            mqttc.disconnect()
            mqttc.loop_stop()
        my_logger.info("Keyboard interrupt.")
        sys.exit(0)
    except Exception:
        my_logger.critical("Unhandled error : " + traceback.format_exc())
        sys.exit(1)
# Run the polling loop only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment