Skip to content

Instantly share code, notes, and snippets.

@pingud98
Created April 29, 2021 05:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pingud98/0f16f733e2c5a1a2ee1afd3879acd546 to your computer and use it in GitHub Desktop.
Save pingud98/0f16f733e2c5a1a2ee1afd3879acd546 to your computer and use it in GitHub Desktop.
Indoor Air Quality Monitor
#
# based on three things:
# 1 - CCS811_RPi class usage example
# by Petr Lukas V1
# 2 - I2C_LCD_driver
# Compiled, mashed and generally mutilated 2014-2015 by Denis Pleic
# 3 - BME680
# Pimoroni Library, installed via pip https://shop.pimoroni.com/products/bme680
from datetime import datetime
import requests
import time
import uuid
import bme680
from CCS811_RPi import CCS811_RPi
import I2C_LCD_driver
import paho.mqtt.client as paho
broker="cosmicpidata.mooo.com"
port=1883
mqtt_ok=0
url = "http://www.google.com"
timeout = 600
def on_publish(client,userdata,result):
print("Data published to MQTT server")
pass
def on_connect(client,userdata,flags,rc):
if rc==0:
print("connected ok")
else:
print("mqtt connection failed")
def on_disconnect(client,userdata,rc):
print("disconnecting reason " + str(rc))
mqttidentstring = str(uuid.getnode())
client1= paho.Client(mqttidentstring)
client1.on_publish = on_publish
client1.username_pw_set(username="cosmicpi",password="MuonsFROMSp8ce")
client1.on_connect=on_connect #binding call
try:
request = requests.get(url,timeout=timeout)
print("internet ok")
mqtt_ok=1
except (requests.ConnectionError, requests.Timeout) as exception:
print("internet fail")
mqtt_ok=0
mylcd = I2C_LCD_driver.lcd()
mylcd.lcd_display_string("Warming up...", 1)
#import urllib2 # comment this line if you don't need ThinkSpeak connection
#import SDL_Pi_HDC1000 # comment this line if you don't use HDC sensor
try:
sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
except IOError:
sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)
now = datetime.now() # current date and time
date_time = now.strftime("%m/%d/%Y")
date1=(date_time)
date_time = now.strftime("%H:%M")
date2=(date_time)
mylcd.lcd_display_string("Date: "+date1,1)
mylcd.lcd_display_string("Time: "+date2,2)
time.sleep(5)
ccs811 = CCS811_RPi()
# Do you want to preset sensor baseline? If yes set the value here, otherwise set False
INITIALBASELINE = False
# Do you want to use integrated temperature meter to compensate temp/RH (CJMCU-8118 board)?
# If not pre-set sensor compensation temperature is 25 C and RH is 50 %
# You can compensate manually by method ccs811.setCompensation(temperature,humidity)
'''
MEAS MODE REGISTER AND DRIVE MODE CONFIGURATION
0b0 Idle (Measurements are disabled in this mode)
0b10000 Constant power mode, IAQ measurement every second
0b100000 Pulse heating mode IAQ measurement every 10 seconds
0b110000 Low power pulse heating mode IAQ measurement every 60
0b1000000 Constant power mode, sensor measurement every 250ms
'''
# Set MEAS_MODE (measurement interval)
configuration = 0b100000
# Set read interval for retriveving last measurement data from the sensor
pause = 60
print('Checking hardware ID...')
hwid = ccs811.checkHWID()
if(hwid == hex(129)):
print('Hardware ID is correct')
else: print('Incorrect hardware ID ',hwid, ', should be 0x81')
#bme680 setup
sensor.set_humidity_oversample(bme680.OS_2X)
sensor.set_pressure_oversample(bme680.OS_4X)
sensor.set_temperature_oversample(bme680.OS_8X)
sensor.set_filter(bme680.FILTER_SIZE_3)
sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
sensor.set_gas_heater_temperature(320)
sensor.set_gas_heater_duration(150)
sensor.select_gas_heater_profile(0)
#run burn-in
start_time = time.time()
curr_time = time.time()
burn_in_time = 300
burn_in_data = []
# Collect gas resistance burn-in values, then use the average
# of the last 50 values to set the upper limit for calculating
# gas_baseline.
print('Collecting gas resistance burn-in data for 5 mins\n')
while curr_time - start_time < burn_in_time:
curr_time = time.time()
if sensor.get_sensor_data() and sensor.data.heat_stable:
gas = sensor.data.gas_resistance
burn_in_data.append(gas)
print('Gas: {0} Ohms'.format(gas))
outputwarm ='{0:.2f} Ohms'.format(gas)
mylcd.lcd_display_string(outputwarm, 2)
time.sleep(1)
gas_baseline = sum(burn_in_data[-50:]) / 50.0
# Set the humidity baseline to 40%, an optimal indoor humidity.
hum_baseline = 40.0
# This sets the balance between humidity and gas reading in the
# calculation of air_quality_score (25:75, humidity:gas)
hum_weighting = 0.25
print('Gas baseline: {0} Ohms, humidity baseline: {1:.2f} %RH\n'.format(
gas_baseline,
hum_baseline))
time.sleep(1)
if sensor.get_sensor_data():
sensor.set_gas_heater_temperature(300)
sensor.set_gas_heater_duration(100)
sensor.select_gas_heater_profile(0)
hum = sensor.data.humidity
temp = sensor.data.temperature
press = sensor.data.pressure
#print 'MEAS_MODE:',ccs811.readMeasMode()
ccs811.configureSensor(configuration)
print('MEAS_MODE:',ccs811.readMeasMode())
print('STATUS: ',bin(ccs811.readStatus()))
print('---------------------------------')
# Use these lines if you need to pre-set and check sensor baseline value
if(INITIALBASELINE > 0):
ccs811.setBaseline(INITIALBASELINE)
print(ccs811.readBaseline())
print("MQTT connection")
if mqtt_ok==1:
client1.connect(broker,port)
client1.loop_start()
#return 0
while(1):
humidity = hum
temperature = temp
ccs811.setCompensation(temperature,humidity)
statusbyte = ccs811.readStatus()
print('STATUS: ', bin(statusbyte))
error = ccs811.checkError(statusbyte)
if(error):
print('ERROR:',ccs811.checkError(statusbyte))
if(not ccs811.checkDataReady(statusbyte)):
#print('No new samples are ready')
#print('---------------------------------')
time.sleep(pause)
continue;
result = ccs811.readAlg();
if(not result):
#print 'Invalid result received'
time.sleep(pause)
continue;
baseline = ccs811.readBaseline()
outputCO2 = ('eCO2: {}'.format(result['eCO2']) + ' ppm')
outputVOC = ('TVOC: {}'.format(result['TVOC']) + ' ppb')
#print ('TVOC: ',result['TVOC'], 'ppb')
#print ('Status register: ',bin(result['status']))
#print ('Last error ID: ',result['errorid'])
#print ('RAW data: ',result['raw'])
#print ('Baseline: ',baseline)
#print ('---------------------------------')
#if sensor.get_sensor_data():
#if sensor.data.heat_stable:
if sensor.get_sensor_data() and sensor.data.heat_stable:
gas = sensor.data.gas_resistance
gas_offset = gas_baseline - gas
hum = sensor.data.humidity
hum_offset = hum - hum_baseline
# Calculate hum_score as the distance from the hum_baseline.
if hum_offset > 0:
hum_score = (100 - hum_baseline - hum_offset)
hum_score /= (100 - hum_baseline)
hum_score *= (hum_weighting * 100)
else:
hum_score = (hum_baseline + hum_offset)
hum_score /= hum_baseline
hum_score *= (hum_weighting * 100)
# Calculate gas_score as the distance from the gas_baseline.
if gas_offset > 0:
gas_score = (gas / gas_baseline)
gas_score *= (100 - (hum_weighting * 100))
else:
gas_score = 100 - (hum_weighting * 100)
# Calculate air_quality_score.
air_quality_score = hum_score + gas_score
outputT = '{0:.2f}C'.format(sensor.data.temperature)
outputP = '{0:.2f}hPa'.format(sensor.data.pressure)
outputH = '{0:.1f}%RH'.format(sensor.data.humidity)
outputG = '{0:.0f} Ohms'.format(sensor.data.gas_resistance)
outputIAQ = 'IAQ:{0:.1f}'.format(air_quality_score)
#print("IAQ {0:.2f} %".format(air_quality_score))
data = []
data.append("{measurement},id={DeviceID} Temp={temp},Press={press},Hum={hum},Gas={gas},IAQ={iaq},eCO2={eco2},TVOC={tvoc} {timestamp}"
.format(measurement='IAQ0.1',
DeviceID=uuid.getnode(),
temp=sensor.data.temperature,
press=sensor.data.pressure,
hum=sensor.data.humidity,
gas=sensor.data.gas_resistance,
iaq=air_quality_score,
eco2=result['eCO2'],
tvoc=result['TVOC'],
timestamp=int(time.time())))
print(data)
if mqtt_ok==1:
ret = client1.publish("iaq/0.1",str(data))
#outputs to lcd
#date and time
mylcd.lcd_clear()
now = datetime.now() # current date and time
date_time = now.strftime("%m/%d/%Y")
date1=(date_time)
date_time = now.strftime("%H:%M")
date2=(date_time)
mylcd.lcd_display_string(date1+" "+ date2,1)
mylcd.lcd_display_string(outputG,2)
time.sleep(6)
mylcd.lcd_clear()
#TPH and IAQ
mylcd.lcd_display_string(outputT,1)
mylcd.lcd_display_string(outputIAQ,2)
time.sleep(6)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputIAQ,1)
mylcd.lcd_display_string(outputCO2,2)
time.sleep(6)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputCO2,1)
mylcd.lcd_display_string(outputVOC,2)
time.sleep(6)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputVOC,1)
mylcd.lcd_display_string(outputP,2)
time.sleep(5)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputP,1)
mylcd.lcd_display_string(outputH,2)
time.sleep(5)
mylcd.lcd_clear()
now = datetime.now() # current date and time
date_time = now.strftime("%m/%d/%Y")
date1=(date_time)
date_time = now.strftime("%H:%M")
date2=(date_time)
mylcd.lcd_display_string(date1+" "+ date2,1)
mylcd.lcd_display_string(outputG,2)
time.sleep(4)
mylcd.lcd_clear()
#TPH and IAQ
mylcd.lcd_display_string(outputT,1)
mylcd.lcd_display_string(outputIAQ,2)
time.sleep(4)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputIAQ,1)
mylcd.lcd_display_string(outputCO2,2)
time.sleep(4)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputCO2,1)
mylcd.lcd_display_string(outputVOC,2)
time.sleep(4)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputVOC,1)
mylcd.lcd_display_string(outputP,2)
time.sleep(4)
mylcd.lcd_clear()
mylcd.lcd_display_string(outputP,1)
mylcd.lcd_display_string(outputH,2)
time.sleep(4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment