Skip to content

Instantly share code, notes, and snippets.

@bphermansson
Last active December 6, 2022 08:04
Show Gist options
  • Save bphermansson/82aa42fd59fc0874133b2849634a4544 to your computer and use it in GitHub Desktop.
Save bphermansson/82aa42fd59fc0874133b2849634a4544 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# Script that reads various I2C sensors.
# CCS811, BH1750, HTU21. (Air quality, light, temp & humidity).
# CCS811. "This part will measure eCO2 (equivalent calculated carbon-dioxide) concentration
# within a range of 400 to 8192 parts per million (ppm), and TVOC (Total Volatile Organic
# Compound) concentration within a range of 0 to 1187 parts per billion (ppb)."
# "The BH1750 provides 16-bit light measurements in lux"
# The script sends these values in this order by MQTT:
# TVOC, CO2, light, temperature, humidity
# Connections, pin numbers on RPI's Gpio
# SCL pin 5 Gpio3
# Gnd pin 9 Gnd
# Vcc pin 1 3V3
# SDA pin 3 Gpio2
import sys
from smbus import SMBus
from time import sleep
from datetime import datetime
from htu21 import HTU21
import time
import json
import paho.mqtt.client as paho
# MQTT settings: broker address and the topics this script publishes to.
broker = "192.168.1.190"
mqtt_sensors_topic = "magicmirror/sensors"  # sensor readings (JSON)
mqtt_error_topic = "magicmirror/errors"     # messages from errorRoutine()
mqtt_info_topic = "magicmirror/info"        # start / shutdown notices
# Shared MQTT client; the connection itself is opened in main().
client = paho.Client("MagicMirrorSensors")
# CCS811
SLAVE_ADDR = 0x5B  # Can be 0x5A
# Slave registers to read and write
DEVICE_REG_STATUS = 0x00
DEVICE_REG_MEAS_MODE = 0x01
DEVICE_REG_ALG_RESULT_DATA = 0x02
DEVICE_REG_ERROR_ID = 0xE0
DEVICE_REG_APP_START = 0xF4
# Slave register read values
DEVICE_STATE_BOOT = 0x10
DEVICE_STATE_APP = 0x90
DEVICE_STATE_APP_WITH_DATA = 0x98
# Slave register write values
DEVICE_SET_MODE_10S = [0x10]
DEVICE_SET_SW_RESET = [0x11, 0xE5, 0x72, 0x8A]

# BH1750
BH1750_DEVICE = 0x23  # Default device I2C address
# Kept for backward compatibility; aliased so the two names cannot drift apart.
DEVICE = BH1750_DEVICE
POWER_DOWN = 0x00  # No active state
POWER_ON = 0x01  # Power on
RESET = 0x07  # Reset data register value
# Start measurement at 4lx resolution. Time typically 16ms.
CONTINUOUS_LOW_RES_MODE = 0x13
# Start measurement at 1lx resolution. Time typically 120ms
CONTINUOUS_HIGH_RES_MODE_1 = 0x10
# Start measurement at 0.5lx resolution. Time typically 120ms
CONTINUOUS_HIGH_RES_MODE_2 = 0x11
# Start measurement at 1lx resolution. Time typically 120ms
# Device is automatically set to Power Down after measurement.
ONE_TIME_HIGH_RES_MODE_1 = 0x20
# Start measurement at 0.5lx resolution. Time typically 120ms
# Device is automatically set to Power Down after measurement.
ONE_TIME_HIGH_RES_MODE_2 = 0x21
# Start measurement at 4lx resolution. Time typically 16ms.
# Device is automatically set to Power Down after measurement.
ONE_TIME_LOW_RES_MODE = 0x23
# 0 = /dev/i2c-0 (port I2C0), 1 = /dev/i2c-1 (port I2C1)
try:
    i2c_bus = SMBus(1)
except Exception as exc:
    # errorRoutine() is defined further down in this file and the MQTT client
    # is not connected yet, so neither can be used at this point. Every sensor
    # read needs the bus, so report the failure directly and stop.
    print("Error: I2C " + str(exc))
    sys.exit(1)
# CCS811
def read_i2c_bus_dev(dev_reg, read_len):
    """Read a block from the device at SLAVE_ADDR.

    dev_reg is the register to read from and read_len the number of bytes
    requested. Returns whatever i2c_bus.read_i2c_block_data() returns.
    """
    block = i2c_bus.read_i2c_block_data(SLAVE_ADDR, dev_reg, read_len)
    return block
def write_i2c_bus_dev(dev_reg, write_data):
    """Write a block to the device at SLAVE_ADDR.

    dev_reg is the target register and write_data the list of byte values to
    send (an empty list is passed by the caller for the "app start" command).
    """
    i2c_bus.write_i2c_block_data(
        SLAVE_ADDR,
        dev_reg,
        write_data,
    )
def readCCS811():
    """Read the eTVOC and eCO2 values from the CCS811.

    If the status register reports the boot state, the application is
    started, the measurement mode is set and the error register is read once
    to clear it. The status register is then polled every 5 seconds, for at
    most 30 seconds, until it reports that data is available.

    Returns a list ``[eTVOC, eCO2]`` on success. On any failure the error is
    reported through errorRoutine("CCS811 reading") and None is returned.
    """
    poll_interval = 5
    try:
        if read_i2c_bus_dev(DEVICE_REG_STATUS, 1)[0] == DEVICE_STATE_BOOT:
            write_i2c_bus_dev(DEVICE_REG_APP_START, [])  # empty write
            write_i2c_bus_dev(DEVICE_REG_MEAS_MODE, DEVICE_SET_MODE_10S)
            read_i2c_bus_dev(DEVICE_REG_ERROR_ID, 1)  # clear any errors

        timeout_in_seconds = 30
        # The status is checked before the remaining time, so data that
        # becomes ready during the final wait is still accepted instead of
        # being reported as a timeout.
        while (read_i2c_bus_dev(DEVICE_REG_STATUS, 1)[0] !=
               DEVICE_STATE_APP_WITH_DATA):
            if timeout_in_seconds <= 0:
                # Explicit exception instead of a bare `raise` with no
                # active exception.
                raise RuntimeError("CCS811 data not ready")
            sleep(poll_interval)
            timeout_in_seconds -= poll_interval

        read_data = read_i2c_bus_dev(DEVICE_REG_ALG_RESULT_DATA, 8)
        # Both values are 16-bit, high byte first.
        eCO2_reading = (read_data[0] << 8) | read_data[1]
        eTVOC_reading = (read_data[2] << 8) | read_data[3]
        return [eTVOC_reading, eCO2_reading]
    except Exception:
        errorRoutine("CCS811 reading")
        return None
# BH1750
def readLight(addr=BH1750_DEVICE):
    """Read the light level from the BH1750 at I2C address addr.

    Issues ONE_TIME_HIGH_RES_MODE_1 as the block-read command and converts
    the returned bytes with convertToNumber().

    Returns the converted value, or None when the read fails (the failure is
    reported through errorRoutine("readLight")).
    """
    try:
        data = i2c_bus.read_i2c_block_data(addr, ONE_TIME_HIGH_RES_MODE_1)
        return convertToNumber(data)
    except Exception:
        # `except Exception` rather than a bare `except` so that Ctrl-C and
        # SystemExit are not swallowed here.
        errorRoutine("readLight")
        return None
def convertToNumber(data, decimals=None):
    """Convert 2 bytes of data into a decimal number.

    data[0] is used as the high byte and data[1] as the low byte; any further
    elements are ignored. The 16-bit value is divided by 1.2.

    decimals is optional: when given, the result is rounded to that number
    of decimal places; when omitted (None) the unrounded value is returned.
    """
    result = (data[1] + (256 * data[0])) / 1.2
    if decimals is not None:
        result = round(result, decimals)
    return result
def errorRoutine(src):
    """Report the exception currently being handled.

    Builds "Error: <src> <exception type>" from sys.exc_info(), prints it and
    publishes it on mqtt_error_topic. src is a short label naming the place
    where the error occurred.
    """
    exc_type = sys.exc_info()[0]
    message = " ".join(["Error:", src, str(exc_type)])
    print(message)
    client.publish(mqtt_error_topic, message)
def _format_reading(value):
    """Format a numeric reading with two decimals; pass None through."""
    if value is None:
        return None
    return format(value, '.2f')


def main():
    """Connect to the MQTT broker and publish sensor readings every 30 s.

    Each cycle reads the light sensor, the HTU21 (temperature, humidity) and
    the CCS811 (TVOC, CO2), and publishes one JSON object on
    mqtt_sensors_topic with the keys Time, TVOC, CO2, light, Temperature and
    Humidity. A sensor that could not be read is published as null so that
    one failing sensor neither stops the loop nor hides the other readings.
    Runs until interrupted.
    """
    print("Main")
    htu = HTU21()
    print("connecting to broker ",broker)
    client.connect(broker)
    client.loop_start()
    client.publish(mqtt_info_topic,"start")
    while True:
        current_time = datetime.now().strftime("%H:%M:%S")

        # readLight() returns None on failure; formatting None would raise.
        lFormat = _format_reading(readLight())

        tFormat = None
        hFormat = None
        try:
            tFormat = _format_reading(htu.read_temperature())
            hFormat = _format_reading(htu.read_humidity())
        except Exception:
            errorRoutine("HTU21 reading")

        # readCCS811() reports its own errors and returns None on failure.
        tvoc, co2 = readCCS811() or [None, None]

        try:
            payload = json.dumps({"Time": current_time, "TVOC": tvoc, "CO2": co2, "light": lFormat, "Temperature": tFormat, "Humidity": hFormat})
            print(payload)
            client.publish(mqtt_sensors_topic, payload)
        except Exception:
            errorRoutine("Mqtt publish")
        time.sleep(30)
if __name__ == "__main__":
    # Run the publishing loop until Ctrl-C, then announce the shutdown on
    # the info topic and close the MQTT connection before exiting.
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        client.publish(mqtt_info_topic, "Shutting down")
        client.disconnect()  # close the broker connection
        client.loop_stop()  # stop the loop started by client.loop_start() in main()
        raise SystemExit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment