Skip to content

Instantly share code, notes, and snippets.

@rat-h
Forked from xxlukas42/CCS811_RPi.py
Last active December 23, 2020 03:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rat-h/150ffa2877efed51df03b5de6e8c97e1 to your computer and use it in GitHub Desktop.
Save rat-h/150ffa2877efed51df03b5de6e8c97e1 to your computer and use it in GitHub Desktop.

Updated version of the library for Python 3. See an example of an air pollution monitor in airmonitor.py.

"""
A simple air pollution monitor.
I used ST7735 TFT display and Adafruit CircuitPython RGB library
as described [here](https://learn.adafruit.com/1-8-tft-display/python-wiring-and-setup)
It prints the time and date on the screen, along with eCO2 and other measurements.
The text color varies according to the measured value.
Note that for some reason the R and B colors were switched on my display.
So go ahead and correct r and b according to your hardware in hand.
Also, my sensor pretty often shows impossible numbers for eCO2 (I would be dead already).
To minimize these errors, I add re-initiation every hour. I hope it may be just a bad sensor.
If you find a way to make this system more robust without re-initiation, let me know.
Video is [here] https://youtu.be/4aXMTt8Ia9Q
-RTH
"""
import time, logging
import subprocess
import digitalio
import board
from PIL import Image, ImageDraw, ImageFont
import matplotlib
matplotlib.use('Agg')
from matplotlib.pyplot import get_cmap
import adafruit_rgb_display.st7735 as st7735 # pylint: disable=unused-import
from CCS811_RPi import CCS811_RPi
import SDL_Pi_HDC1000
logging.basicConfig(
    format='%(asctime)s:%(lineno)-6d%(levelname)-8s:%(message)s',
    level=logging.INFO,
    filename="airpol.log",
)

# --- Monitor settings -------------------------------------------------------
HDC1080 = True            # enables the HDC1080 branch in init_sensor() and the main loop
configuration = 0b100000  # value handed to CCS811_RPi.configureSensor()
pause = 10                # sensors are polled on every `pause`-th loop iteration
reinit = 3600             # sensors are re-initialised every `reinit` loop iterations
record = "rec.cvs"        # readings are appended to this file; a falsy value disables it

# --- Display wiring ---------------------------------------------------------
# Chip-select, data/command and reset lines of the TFT.
cs_pin = digitalio.DigitalInOut(board.CE0)
dc_pin = digitalio.DigitalInOut(board.D25)
reset_pin = digitalio.DigitalInOut(board.D24)
# SPI clock rate passed to the display driver.
BAUDRATE = 24000000
# Hardware SPI bus.
spi = board.SPI()
# pylint: disable=line-too-long
disp = st7735.ST7735R(
    spi,
    rotation=0,
    height=160,
    width=128,
    cs=cs_pin,
    dc=dc_pin,
    rst=reset_pin,
    baudrate=BAUDRATE,
)

# Size of the drawing surface: the two axes trade places when the panel is
# rotated by 90 or 270 degrees.
if disp.rotation % 180 != 90:
    width, height = disp.width, disp.height
else:
    width, height = disp.height, disp.width

TIME_FONTSIZE = 24
MESG_FONTSIZE = 16
HEAD_FONTSIZE = 10

# Off-screen image that everything is drawn on; disp.image() pushes it to the
# panel.
image = Image.new("RGB", (width, height))
draw = ImageDraw.Draw(image)

# Start from an all-black screen.
draw.rectangle((0, 0, width, height), outline=0, fill=(0, 0, 0))
disp.image(image)

# One DejaVu Sans face per text size (loaded from the system font directory).
timefont, mesgfont, headfont = (
    ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", size)
    for size in (TIME_FONTSIZE, MESG_FONTSIZE, HEAD_FONTSIZE)
)

# Splash rectangle shown until init_sensor() replaces it.
draw.rectangle((20, 20, width - 20, height - 20), outline=0, fill=(0, 0, 255))
disp.image(image)
def _print_on_screen_(text, x, y, font, r, g, b):
    """Draw *text* on the shared image and return where the next item can go.

    Args:
        text: String to render.
        x, y: Top-left pixel position of the text.
        font: PIL font object used for rendering and measuring.
        r, g, b: Colour components (0-255) as the caller intends them.

    Returns:
        Tuple ``(x + text_width + 2, y + text_height + 2)``.
    """
    # The module docstring notes that red and blue are swapped on the author's
    # panel, hence the deliberate (b, g, r) order.
    draw.text((x, y), text, font=font, fill=(b, g, r))
    if hasattr(font, "getsize"):
        # Older Pillow: keep the exact measurement the layout was tuned with.
        dx, dy = font.getsize(text)
    else:
        # Pillow 10 removed getsize(). The legacy value corresponds to the
        # bounding-box width and its bottom edge (height plus top offset).
        left, _top, right, bottom = font.getbbox(text)
        dx, dy = right - left, bottom
    return x + dx + 2, y + dy + 2
def init_sensor():
    """Create and configure the sensors, reporting progress on the display.

    Returns:
        Tuple ``(ccs811, hdc1000)``; ``hdc1000`` is ``None`` when ``HDC1080``
        is false.

    Raises:
        SystemExit: with code 1 when the CCS811 hardware ID is not 0x81.
    """
    ccs811 = CCS811_RPi()
    hwid = ccs811.checkHWID()
    if hwid != hex(129):
        _print_on_screen_("HARDWARE\nERROR", 0, 0, timefont, 255, 0, 0)
        disp.image(image)
        # Same effect as exit(1), without relying on the `site` module helper.
        raise SystemExit(1)

    # Hardware ID matches: show a green rectangle.
    draw.rectangle((20, 20, width - 20, height - 20), outline=0, fill=(0, 255, 0))
    disp.image(image)

    if HDC1080:
        hdc1000 = SDL_Pi_HDC1000.SDL_Pi_HDC1000()
        hdc1000.turnHeaterOff()
        hdc1000.setTemperatureResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT)
        hdc1000.setHumidityResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT)
    else:
        hdc1000 = None

    ccs811.configureSensor(configuration)

    # Status screen: measurement mode and status register as read back.
    draw.rectangle((0, 0, width, height), outline=0, fill=0)
    _, y = _print_on_screen_('MEAS_MODE:', 0, 0, timefont, 0, 198, 120)
    _, y = _print_on_screen_(f' {ccs811.readMeasMode()}', 0, y, timefont, 0, 198, 120)
    _, y = _print_on_screen_('STATUS :', 0, y, timefont, 120, 198, 0)
    _, y = _print_on_screen_(f' {bin(ccs811.readStatus())}', 0, y, timefont, 120, 198, 0)
    # Push the status screen to the panel; without this the text drawn above
    # is never shown, because the main loop clears the image right after.
    disp.image(image)
    time.sleep(2)

    # To pre-set and check the sensor baseline, call ccs811.setBaseline(value)
    # here and read it back with ccs811.readBaseline().
    return ccs811, hdc1000
# Loop counter; wraps at `reinit`, and 0 triggers sensor (re)initialisation.
cnt = 0
# Last readings shown on screen; eCO2/TVOC may hold the string "ERROR".
result = {
    "eCO2": 0.,
    'TVOC': 0.,
    'TEMP': 0.,
    "HUMD": 0.,
}
ccs811, hdc1000 = None, None
# Colour map lookup hoisted out of the loop; it does not change per iteration.
_rainbow = get_cmap('rainbow')


def _draw_reading(label, text, value, full_scale, y):
    """Draw one labelled reading starting at row *y*; return the row below it.

    The colour is red when *value* is the string "ERROR"; otherwise it is
    taken from the rainbow colour map at ``value / full_scale``, saturating
    at 1.0 for values above *full_scale*.
    """
    if value == "ERROR":
        r, g, b = 1., 0., 0.
    else:
        r, g, b, _ = _rainbow(1. if value > full_scale else value / full_scale)
    rgb = (int(255 * r), int(255 * g), int(255 * b))
    x0, _ = _print_on_screen_(label, 0, y, headfont, *rgb)
    _, y = _print_on_screen_(text, x0 + 1, y, timefont, *rgb)
    return y


while True:
    # (Re)initialise the sensors whenever the counter wraps to zero.
    if cnt == 0:
        del ccs811, hdc1000
        ccs811, hdc1000 = init_sensor()
        logging.info("Reset device")
    time.sleep(1)
    cnt = (cnt + 1) % reinit

    # ---- Header: clock (separator alternates with cnt parity) and date
    draw.rectangle((0, 0, width, height), outline=0, fill=0)
    x, y = 0, 0
    TIME = time.strftime("%H{}%M".format(":" if cnt % 2 == 0 else " "))
    x0, y0 = _print_on_screen_(TIME, x, y + 3, timefont, 255, 255, 255)
    DATE = time.strftime("%d/%m\n%Y")
    _print_on_screen_(DATE, x0 + 9, y + 3, headfont, 255, 255, 255)
    draw.line((x0 + 3, y, x0 + 3, y0 + 4), fill=(255, 255, 255), width=3)
    draw.line((0, y0 + 4, width, y0 + 4), fill=(255, 255, 255), width=3)
    y = y0 + 8

    # ---- Readings. Each value is coloured by its own state; TEMP and HUMID
    # used to turn red whenever TVOC was "ERROR" (copy-paste slip).
    y = _draw_reading("eCO2 :\nppm", f"{result['eCO2']}", result['eCO2'], 1200, y) + 6
    y = _draw_reading("TVOC :\nppb", f"{result['TVOC']}", result['TVOC'], 660, y) + 6
    y = _draw_reading("TEMP :\nC", "{:0.2f}".format(result['TEMP']), result['TEMP'], 50, y) + 6
    y = _draw_reading("HUMID:\n%", "{:0.2f}".format(result['HUMD']), result['HUMD'], 70, y)
    # Display image.
    disp.image(image)

    # Only poll the sensors on every `pause`-th iteration.
    if cnt % pause != 0:
        continue

    if HDC1080:
        humidity = hdc1000.readHumidity()
        temperature = hdc1000.readTemperature()
        ccs811.setCompensation(temperature, humidity)
    else:
        humidity = 50.00
        temperature = 25.00

    try:
        statusbyte = ccs811.readStatus()
        logging.debug(f'STATUS: {bin(statusbyte)}')
        error = ccs811.checkError(statusbyte)
        if error:
            # Log the value already decoded instead of querying the sensor again.
            logging.error(f'ERROR:{error}')
        if not ccs811.checkDataReady(statusbyte):
            logging.info('No new samples are ready')
            continue
        reading = ccs811.readAlg()
        if not reading:
            result = {
                "eCO2": "ERROR",
                'TVOC': "ERROR",
                'TEMP': temperature,
                "HUMD": humidity,
            }
            continue
        reading["TEMP"] = temperature
        reading["HUMD"] = humidity
        result = reading
        baseline = ccs811.readBaseline()
        logging.debug(f'BASELINE: {baseline}')
    except Exception:
        # Any sensor failure: log it, start over with a fresh driver object
        # and retry on a later iteration. KeyboardInterrupt and SystemExit
        # are deliberately not caught so the script can still be stopped.
        logging.exception("CCS811 access failed, re-creating the driver")
        del ccs811
        ccs811 = CCS811_RPi()
        continue

    # Record data
    if record:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        with open(record, "a") as fd:
            fd.write(timestamp + f",{result['eCO2']},{result['TVOC']},{result['TEMP']},{result['HUMD']}" + "\n")
"""
A simple air pollution monitor.
I used ST7735 TFT display and Adafruit CircuitPython RGB library
as described [here](https://learn.adafruit.com/1-8-tft-display/python-wiring-and-setup)
It prints the time and date on the screen, along with eCO2 and other measurements.
The text color varies according to the measured value.
Note that for some reason the R and B colors were switched on my display.
So go ahead and correct r and b according to your hardware in hand.
Also, my sensor pretty often shows impossible numbers for eCO2 (I would be dead already).
To minimize these errors, I add re-initiation every hour. I hope it may be just a bad sensor.
If you find a way to make this system more robust without re-initiation, let me know.
Video is [here] https://youtu.be/4aXMTt8Ia9Q
-RTH
"""
import time, logging
import subprocess
import digitalio
import board
from PIL import Image, ImageDraw, ImageFont
import matplotlib
matplotlib.use('Agg')
from matplotlib.pyplot import get_cmap
import adafruit_rgb_display.st7735 as st7735 # pylint: disable=unused-import
from CCS811_RPi import CCS811_RPi
import SDL_Pi_HDC1000
logging.basicConfig(
    format='%(asctime)s:%(lineno)-6d%(levelname)-8s:%(message)s',
    level=logging.INFO,
    filename="airpol.log",
)

# --- Monitor settings -------------------------------------------------------
HDC1080 = True            # enables the HDC1080 branch in init_sensor() and the main loop
configuration = 0b100000  # value handed to CCS811_RPi.configureSensor()
pause = 10                # sensors are polled on every `pause`-th loop iteration
reinit = 3600             # sensors are re-initialised every `reinit` loop iterations
record = "rec.cvs"        # readings are appended to this file; a falsy value disables it

# --- Display wiring ---------------------------------------------------------
# Chip-select, data/command and reset lines of the TFT.
cs_pin = digitalio.DigitalInOut(board.CE0)
dc_pin = digitalio.DigitalInOut(board.D25)
reset_pin = digitalio.DigitalInOut(board.D24)
# SPI clock rate passed to the display driver.
BAUDRATE = 24000000
# Hardware SPI bus.
spi = board.SPI()
# pylint: disable=line-too-long
disp = st7735.ST7735R(
    spi,
    rotation=0,
    height=160,
    width=128,
    cs=cs_pin,
    dc=dc_pin,
    rst=reset_pin,
    baudrate=BAUDRATE,
)

# Size of the drawing surface: the two axes trade places when the panel is
# rotated by 90 or 270 degrees.
if disp.rotation % 180 != 90:
    width, height = disp.width, disp.height
else:
    width, height = disp.height, disp.width

TIME_FONTSIZE = 24
MESG_FONTSIZE = 16
HEAD_FONTSIZE = 10

# Off-screen image that everything is drawn on; disp.image() pushes it to the
# panel.
image = Image.new("RGB", (width, height))
draw = ImageDraw.Draw(image)

# Start from an all-black screen.
draw.rectangle((0, 0, width, height), outline=0, fill=(0, 0, 0))
disp.image(image)

# One DejaVu Sans face per text size (loaded from the system font directory).
timefont, mesgfont, headfont = (
    ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", size)
    for size in (TIME_FONTSIZE, MESG_FONTSIZE, HEAD_FONTSIZE)
)

# Splash rectangle shown until init_sensor() replaces it.
draw.rectangle((20, 20, width - 20, height - 20), outline=0, fill=(0, 0, 255))
disp.image(image)
def _print_on_screen_(text, x, y, font, r, g, b):
    """Draw *text* on the shared image and return where the next item can go.

    Args:
        text: String to render.
        x, y: Top-left pixel position of the text.
        font: PIL font object used for rendering and measuring.
        r, g, b: Colour components (0-255) as the caller intends them.

    Returns:
        Tuple ``(x + text_width + 2, y + text_height + 2)``.
    """
    # The module docstring notes that red and blue are swapped on the author's
    # panel, hence the deliberate (b, g, r) order.
    draw.text((x, y), text, font=font, fill=(b, g, r))
    if hasattr(font, "getsize"):
        # Older Pillow: keep the exact measurement the layout was tuned with.
        dx, dy = font.getsize(text)
    else:
        # Pillow 10 removed getsize(). The legacy value corresponds to the
        # bounding-box width and its bottom edge (height plus top offset).
        left, _top, right, bottom = font.getbbox(text)
        dx, dy = right - left, bottom
    return x + dx + 2, y + dy + 2
def init_sensor():
    """Create and configure the sensors, reporting progress on the display.

    Returns:
        Tuple ``(ccs811, hdc1000)``; ``hdc1000`` is ``None`` when ``HDC1080``
        is false.

    Raises:
        SystemExit: with code 1 when the CCS811 hardware ID is not 0x81.
    """
    ccs811 = CCS811_RPi()
    hwid = ccs811.checkHWID()
    if hwid != hex(129):
        _print_on_screen_("HARDWARE\nERROR", 0, 0, timefont, 255, 0, 0)
        disp.image(image)
        # Same effect as exit(1), without relying on the `site` module helper.
        raise SystemExit(1)

    # Hardware ID matches: show a green rectangle.
    draw.rectangle((20, 20, width - 20, height - 20), outline=0, fill=(0, 255, 0))
    disp.image(image)

    if HDC1080:
        hdc1000 = SDL_Pi_HDC1000.SDL_Pi_HDC1000()
        hdc1000.turnHeaterOff()
        hdc1000.setTemperatureResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT)
        hdc1000.setHumidityResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT)
    else:
        hdc1000 = None

    ccs811.configureSensor(configuration)

    # Status screen: measurement mode and status register as read back.
    draw.rectangle((0, 0, width, height), outline=0, fill=0)
    _, y = _print_on_screen_('MEAS_MODE:', 0, 0, timefont, 0, 198, 120)
    _, y = _print_on_screen_(f' {ccs811.readMeasMode()}', 0, y, timefont, 0, 198, 120)
    _, y = _print_on_screen_('STATUS :', 0, y, timefont, 120, 198, 0)
    _, y = _print_on_screen_(f' {bin(ccs811.readStatus())}', 0, y, timefont, 120, 198, 0)
    # Push the status screen to the panel; without this the text drawn above
    # is never shown, because the main loop clears the image right after.
    disp.image(image)
    time.sleep(2)

    # To pre-set and check the sensor baseline, call ccs811.setBaseline(value)
    # here and read it back with ccs811.readBaseline().
    return ccs811, hdc1000
# Loop counter; wraps at `reinit`, and 0 triggers sensor (re)initialisation.
cnt = 0
# Last readings shown on screen; eCO2/TVOC may hold the string "ERROR".
result = {
    "eCO2": 0.,
    'TVOC': 0.,
    'TEMP': 0.,
    "HUMD": 0.,
}
ccs811, hdc1000 = None, None
# Colour map lookup hoisted out of the loop; it does not change per iteration.
_rainbow = get_cmap('rainbow')


def _draw_reading(label, text, value, full_scale, y):
    """Draw one labelled reading starting at row *y*; return the row below it.

    The colour is red when *value* is the string "ERROR"; otherwise it is
    taken from the rainbow colour map at ``value / full_scale``, saturating
    at 1.0 for values above *full_scale*.
    """
    if value == "ERROR":
        r, g, b = 1., 0., 0.
    else:
        r, g, b, _ = _rainbow(1. if value > full_scale else value / full_scale)
    rgb = (int(255 * r), int(255 * g), int(255 * b))
    x0, _ = _print_on_screen_(label, 0, y, headfont, *rgb)
    _, y = _print_on_screen_(text, x0 + 1, y, timefont, *rgb)
    return y


while True:
    # (Re)initialise the sensors whenever the counter wraps to zero.
    if cnt == 0:
        del ccs811, hdc1000
        ccs811, hdc1000 = init_sensor()
        logging.info("Reset device")
    time.sleep(1)
    cnt = (cnt + 1) % reinit

    # ---- Header: clock (separator alternates with cnt parity) and date
    draw.rectangle((0, 0, width, height), outline=0, fill=0)
    x, y = 0, 0
    TIME = time.strftime("%H{}%M".format(":" if cnt % 2 == 0 else " "))
    x0, y0 = _print_on_screen_(TIME, x, y + 3, timefont, 255, 255, 255)
    DATE = time.strftime("%d/%m\n%Y")
    _print_on_screen_(DATE, x0 + 9, y + 3, headfont, 255, 255, 255)
    draw.line((x0 + 3, y, x0 + 3, y0 + 4), fill=(255, 255, 255), width=3)
    draw.line((0, y0 + 4, width, y0 + 4), fill=(255, 255, 255), width=3)
    y = y0 + 8

    # ---- Readings. Each value is coloured by its own state; TEMP and HUMID
    # used to turn red whenever TVOC was "ERROR" (copy-paste slip).
    y = _draw_reading("eCO2 :\nppm", f"{result['eCO2']}", result['eCO2'], 1200, y) + 6
    y = _draw_reading("TVOC :\nppb", f"{result['TVOC']}", result['TVOC'], 660, y) + 6
    y = _draw_reading("TEMP :\nC", "{:0.2f}".format(result['TEMP']), result['TEMP'], 50, y) + 6
    y = _draw_reading("HUMID:\n%", "{:0.2f}".format(result['HUMD']), result['HUMD'], 70, y)
    # Display image.
    disp.image(image)

    # Only poll the sensors on every `pause`-th iteration.
    if cnt % pause != 0:
        continue

    if HDC1080:
        humidity = hdc1000.readHumidity()
        temperature = hdc1000.readTemperature()
        ccs811.setCompensation(temperature, humidity)
    else:
        humidity = 50.00
        temperature = 25.00

    try:
        statusbyte = ccs811.readStatus()
        logging.debug(f'STATUS: {bin(statusbyte)}')
        error = ccs811.checkError(statusbyte)
        if error:
            # Log the value already decoded instead of querying the sensor again.
            logging.error(f'ERROR:{error}')
        if not ccs811.checkDataReady(statusbyte):
            logging.info('No new samples are ready')
            continue
        reading = ccs811.readAlg()
        if not reading:
            result = {
                "eCO2": "ERROR",
                'TVOC': "ERROR",
                'TEMP': temperature,
                "HUMD": humidity,
            }
            continue
        reading["TEMP"] = temperature
        reading["HUMD"] = humidity
        result = reading
        baseline = ccs811.readBaseline()
        logging.debug(f'BASELINE: {baseline}')
    except Exception:
        # Any sensor failure: log it, start over with a fresh driver object
        # and retry on a later iteration. KeyboardInterrupt and SystemExit
        # are deliberately not caught so the script can still be stopped.
        logging.exception("CCS811 access failed, re-creating the driver")
        del ccs811
        ccs811 = CCS811_RPi()
        continue

    # Record data
    if record:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        with open(record, "a") as fd:
            fd.write(timestamp + f",{result['eCO2']},{result['TVOC']},{result['TEMP']},{result['HUMD']}" + "\n")
#
# CCS811_RPi class usage example
#
# Petr Lukas
# July, 11 2017
#
# Version 1.0
import time
import urllib.request, urllib.error, urllib.parse # comment this line if you don't need ThinkSpeak connection
import SDL_Pi_HDC1000 # comment this line if you don't use HDC sensor
from CCS811_RPi import CCS811_RPi
# Driver object used by the rest of this script.
ccs811 = CCS811_RPi()

# ThingSpeak upload: set this to the channel's WRITE API key (a string) to
# enable uploads; leave it False to disable them.
THINGSPEAK = False # or type 'YOURAPIKEY'

# Baseline preset: set a value to write it to the sensor at start-up; leave
# it False to skip that step.
INITIALBASELINE = False

# Set to True to read temperature/RH from the on-board HDC1080 (CJMCU-8118
# board) and feed them to the CCS811 as compensation. When False the script
# uses the fixed values 25 C and 50 % RH; manual compensation is available
# through ccs811.setCompensation(temperature, humidity).
HDC1080 = False

'''
MEAS MODE REGISTER AND DRIVE MODE CONFIGURATION
0b0 Idle (Measurements are disabled in this mode)
0b10000 Constant power mode, IAQ measurement every second
0b100000 Pulse heating mode IAQ measurement every 10 seconds
0b110000 Low power pulse heating mode IAQ measurement every 60
0b1000000 Constant power mode, sensor measurement every 250ms
'''
# MEAS_MODE value (measurement interval), chosen from the table above.
configuration = 0b100000

# Seconds to wait between reads of the latest measurement from the sensor.
pause = 60
def thingSpeak(eCO2, TVOC, baseline, temperature, humidity):
    """Send one set of readings to the ThingSpeak update endpoint.

    The module-level ``THINGSPEAK`` value is used as the write API key, and
    the five arguments are sent as ``field1`` .. ``field5`` in that order.

    Returns:
        ``False`` when the request fails; ``None`` otherwise.
    """
    print('Sending to ThingSpeak API...')
    fields = (eCO2, TVOC, baseline, temperature, humidity)
    url = "https://api.thingspeak.com/update?api_key=" + THINGSPEAK
    url += "".join(
        "&field{}={}".format(index, value)
        for index, value in enumerate(fields, start=1)
    )
    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(url) as response:
            response.read()
    except urllib.error.HTTPError:
        print("Invalid HTTP response")
        return False
    except urllib.error.URLError as err:
        # Covers DNS and connection failures, which previously propagated
        # and stopped the polling loop.
        print("ThingSpeak request failed:", err.reason)
        return False
    print('Done')
# ---- Start-up checks -------------------------------------------------------
print('Checking hardware ID...')
hwid = ccs811.checkHWID()
if hwid == hex(129):
    print('Hardware ID is correct')
else:
    print('Incorrect hardware ID ', hwid, ', should be 0x81')

ccs811.configureSensor(configuration)
print('MEAS_MODE:', ccs811.readMeasMode())
print('STATUS: ', bin(ccs811.readStatus()))
print('---------------------------------')

# Pre-set the sensor baseline and read it back (skipped while INITIALBASELINE
# is False, since False > 0 is false).
if INITIALBASELINE > 0:
    ccs811.setBaseline(INITIALBASELINE)
    print((ccs811.readBaseline()))

# CJMCU-8118 boards carry an HDC1080 temperature/RH sensor.
if HDC1080:
    hdc1000 = SDL_Pi_HDC1000.SDL_Pi_HDC1000()
    hdc1000.turnHeaterOff()
    hdc1000.setTemperatureResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT)
    hdc1000.setHumidityResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT)

# ---- Polling loop: one pass every `pause` seconds --------------------------
while True:
    if HDC1080:
        humidity = hdc1000.readHumidity()
        temperature = hdc1000.readTemperature()
        ccs811.setCompensation(temperature, humidity)
    else:
        humidity = 50.00
        temperature = 25.00

    statusbyte = ccs811.readStatus()
    print('STATUS: ', bin(statusbyte))

    error = ccs811.checkError(statusbyte)
    if error:
        # Print the value already decoded; calling checkError() a second
        # time would issue another I2C transaction.
        print('ERROR:', error)

    if not ccs811.checkDataReady(statusbyte):
        print('No new samples are ready')
        print('---------------------------------')
        time.sleep(pause)
        continue

    result = ccs811.readAlg()
    if not result:
        # Invalid result received: try again after the usual pause.
        time.sleep(pause)
        continue

    baseline = ccs811.readBaseline()
    print('eCO2: ', result['eCO2'], ' ppm')
    print('TVOC: ', result['TVOC'], 'ppb')
    print('Status register: ', bin(result['status']))
    print('Last error ID: ', result['errorid'])
    print('RAW data: ', result['raw'])
    print('Baseline: ', baseline)
    print('---------------------------------')

    if THINGSPEAK is not False:
        thingSpeak(result['eCO2'], result['TVOC'], baseline, temperature, humidity)
    time.sleep(pause)
#
# CCS811_RPi
#
# Petr Lukas
# July, 11 2017
#
# Version 1.0
import struct, array, time, io, fcntl, logging
# I2C Address
CCS811_ADDRESS = (0x5A)

# Registers
CCS811_HW_ID = (0x20)
CSS811_STATUS = (0x00)
CSS811_APP_START = (0xF4)
CSS811_MEAS_MODE = (0x01)
CSS811_ERROR_ID = (0xE0)
CSS811_RAW_DATA = (0x03)
CSS811_ALG_RESULT_DATA = (0x02)
CSS811_BASELINE = (0x11)
CSS811_ENV_DATA = (0x05)

# Error names keyed by bit position in the ERROR_ID register.
ERROR = {
    0: 'WRITE_REG_INVALID',
    1: 'READ_REG_INVALID',
    2: 'MEASMODE_INVALID',
    3: 'MAX_RESISTANCE',
    4: 'HEATER_FAULT',
    5: 'HEATER_SUPPLY',
}

# ioctl request used to select the I2C slave address on the device file.
I2C_SLAVE = 0x0703

# Max number of attempts to read/write I2C
MAXATT = 10


class CCS811_RPi:
    """CCS811 gas sensor driver that talks to ``/dev/i2c-<twi>`` directly.

    Failed reads and writes re-open the device files and retry; after
    ``maxcnt`` consecutive reconnections a ``RuntimeError`` is raised.
    """

    def __init__(self, twi: int = 1, addr: int = CCS811_ADDRESS, maxcnt: int = MAXATT):
        """Open the bus.

        Args:
            twi: Number of the I2C bus (``/dev/i2c-<twi>``).
            addr: I2C address of the sensor.
            maxcnt: Reconnection attempts allowed before giving up.
        """
        self.addr = addr
        self.twi = twi
        self.cnt = 0
        self.maxcnt = maxcnt
        self._init_connection_()

    # helper functions
    def _init_connection_(self):
        """(Re)open the read and write descriptors and select the sensor."""
        # Close descriptors left from an earlier connection explicitly rather
        # than leaving that to garbage collection.
        for attr in ("CCS811_fr", "CCS811_fw"):
            stale = getattr(self, attr, None)
            if stale is not None:
                try:
                    stale.close()
                except OSError:
                    logging.debug("Could not close stale I2C descriptor")
        path = "/dev/i2c-" + str(self.twi)
        self.CCS811_fr = io.open(path, "rb", buffering=0)
        self.CCS811_fw = io.open(path, "wb", buffering=0)
        # set device address
        fcntl.ioctl(self.CCS811_fr, I2C_SLAVE, self.addr)
        fcntl.ioctl(self.CCS811_fw, I2C_SLAVE, self.addr)
        time.sleep(0.015)
        self.cnt += 1
        time.sleep(0.015)
        logging.info("Init/Reinit descriptors")

    def _check_attempts(self):
        """Raise RuntimeError once the reconnection budget is used up."""
        if self.cnt >= self.maxcnt:
            logging.error("Exceed maximum number to attempt for reconnection")
            raise RuntimeError("CCS811_RPi:Exceed maximum number to attempt for reconnection")

    def _write(self, s):
        """Write the bytes *s* to the sensor, reconnecting and retrying on failure.

        Raises:
            RuntimeError: when ``maxcnt`` reconnections did not help.
        """
        while True:
            self._check_attempts()
            try:
                self.CCS811_fw.write(s)
            except Exception:
                # Not a bare except: Ctrl-C must still interrupt the retries.
                logging.error("Fail to write to the sensor")
                self._init_connection_()
            else:
                self.cnt = 0
                return

    def _read(self, datasize):
        """Read *datasize* bytes from the sensor, reconnecting and retrying on failure.

        Raises:
            RuntimeError: when ``maxcnt`` reconnections did not help.
        """
        while True:
            self._check_attempts()
            try:
                data = self.CCS811_fr.read(datasize)
            except Exception:
                # Not a bare except: Ctrl-C must still interrupt the retries.
                logging.error("Fail to read the sensor")
                self._init_connection_()
            else:
                self.cnt = 0
                return data

    def _query(self, register, size):
        """Select *register*, wait 62.5 ms and return *size* bytes read back."""
        self._write(bytearray([register]))
        time.sleep(0.0625)
        return self._read(size)

    # public functions
    def checkHWID(self):
        """Return the hardware ID register as a hex string (e.g. ``'0x81'``)."""
        return hex(self._query(CCS811_HW_ID, 1)[0])

    def readStatus(self):
        """Return the status register as an int."""
        time.sleep(0.015)
        return self._query(CSS811_STATUS, 1)[0]

    def checkError(self, status_byte):
        """Decode the sensor error, if *status_byte* reports one.

        Returns:
            ``False`` when bit 0 of *status_byte* is clear. Otherwise the
            names of all bits set in the ERROR_ID register joined with
            ``'|'``, or ``'UNKNOWN(0x..)'`` when no known bit is set.
        """
        time.sleep(0.015)
        if not (status_byte & 1):
            return False
        error_id = self._query(CSS811_ERROR_ID, 1)[0]
        # ERROR_ID is a bit field: test each bit instead of using the raw
        # value as a dictionary key (which mislabelled errors and raised
        # KeyError for any value above 5).
        names = [name for bit, name in sorted(ERROR.items()) if (error_id >> bit) & 1]
        if not names:
            return f'UNKNOWN(0x{error_id:02X})'
        return '|'.join(names)

    def configureSensor(self, configuration):
        """Start the sensor application and write *configuration* to MEAS_MODE."""
        # Switch sensor to application mode
        self._write(bytearray([CSS811_APP_START]))
        time.sleep(0.0625)
        self._write(bytearray([CSS811_MEAS_MODE, configuration, 0x00]))
        time.sleep(0.015)

    def readMeasMode(self):
        """Return the MEAS_MODE register as a binary string (``bin()`` format)."""
        time.sleep(0.015)
        return bin(self._query(CSS811_MEAS_MODE, 1)[0])

    def readRaw(self):
        """Return the two RAW_DATA bytes combined big-endian into an int."""
        time.sleep(0.015)
        buf = self._query(CSS811_RAW_DATA, 2)
        return buf[0] * 256 + buf[1]

    def readAlg(self):
        """Read and unpack the 8-byte ALG_RESULT_DATA register.

        Returns:
            Dict with keys ``eCO2``, ``TVOC``, ``status``, ``errorid`` and
            ``raw``. ``eCO2`` above 8192 and ``TVOC`` above 1187 are replaced
            by the string ``"ERROR"``. ``False`` is returned when the error
            ID byte is above 5.
        """
        time.sleep(0.015)
        buf = self._query(CSS811_ALG_RESULT_DATA, 8)
        result = {}
        # Read eCO2 value and check if it is valid
        result['eCO2'] = buf[0] * 256 + buf[1]
        if result['eCO2'] > 8192:
            logging.error('Invalid eCO2 value')
            result['eCO2'] = "ERROR"
        # Read TVOC value and check if it is valid
        result['TVOC'] = buf[2] * 256 + buf[3]
        if result['TVOC'] > 1187:
            logging.error('Invalid TVOC value')
            result['TVOC'] = "ERROR"
        result['status'] = buf[4]
        # Read the last error ID and check if it is valid
        # NOTE(review): this treats the byte as an index 0..5, while
        # checkError() decodes it as a bit field - confirm which is intended.
        result['errorid'] = buf[5]
        if result['errorid'] > 5:
            logging.error('Invalid Error ID')
            return False
        result['raw'] = buf[6] * 256 + buf[7]
        return result

    def readBaseline(self):
        """Return the two BASELINE bytes combined big-endian into an int."""
        time.sleep(0.015)
        buf = self._query(CSS811_BASELINE, 2)
        return buf[0] * 256 + buf[1]

    def checkDataReady(self, status_byte):
        """Return True when bit 3 of *status_byte* is set."""
        return bool((status_byte >> 3) & 1)

    @staticmethod
    def _env_bytes(value):
        """Split *value* into the (high, low) bytes of its 1/512-unit encoding.

        The value is clamped to what 16 bits can represent, so out-of-range
        input no longer makes ``bytearray()`` raise ValueError.
        """
        value = min(max(value, 0), 0xFFFF / 512)
        high = int(value // 0.5)
        low = int(value * 512 - high * 256)
        return high, low

    def setCompensation(self, temperature, humidity):
        """Write temperature (degrees C) and relative humidity (%) to ENV_DATA.

        Both values are rounded to two decimals first; the temperature is
        sent with an offset of +25.
        """
        temperature = round(temperature, 2)
        humidity = round(humidity, 2)
        logging.debug(f'Setting compensation to {temperature}C and {humidity}%')
        hum1, hum2 = self._env_bytes(humidity)
        temp1, temp2 = self._env_bytes(temperature + 25)
        self._write(bytearray([CSS811_ENV_DATA, hum1, hum2, temp1, temp2, 0x00]))

    def setBaseline(self, baseline):
        """Write the 16-bit *baseline* value to the BASELINE register."""
        # `logging` was misspelled as `loggin` here, so every call raised
        # NameError before anything was written.
        logging.info(f'Setting baseline to {baseline}')
        high, low = struct.pack('>H', baseline)
        logging.info(f'{high}')
        logging.info(f'{low}')
        self._write(bytearray([CSS811_BASELINE, high, low, 0x00]))
        time.sleep(0.015)
#
# SDL_Pi_HDC1000
# Raspberry Pi Driver for the SwitchDoc Labs HDC1000 Breakout Board
#
# SwitchDoc Labs
# January 2017
#
# Version 1.1
#constants
# I2C Address
HDC1000_ADDRESS = (0x40) # 1000000

# Registers
HDC1000_TEMPERATURE_REGISTER = (0x00)
HDC1000_HUMIDITY_REGISTER = (0x01)
HDC1000_CONFIGURATION_REGISTER = (0x02)
HDC1000_MANUFACTURERID_REGISTER = (0xFE)
HDC1000_DEVICEID_REGISTER = (0xFF)
HDC1000_SERIALIDHIGH_REGISTER = (0xFB)
HDC1000_SERIALIDMID_REGISTER = (0xFC)
HDC1000_SERIALIDBOTTOM_REGISTER = (0xFD)

# Configuration Register Bits
HDC1000_CONFIG_RESET_BIT = (0x8000)
HDC1000_CONFIG_HEATER_ENABLE = (0x2000)
HDC1000_CONFIG_ACQUISITION_MODE = (0x1000)
HDC1000_CONFIG_BATTERY_STATUS = (0x0800)
HDC1000_CONFIG_TEMPERATURE_RESOLUTION = (0x0400)
HDC1000_CONFIG_HUMIDITY_RESOLUTION_HBIT = (0x0200)
HDC1000_CONFIG_HUMIDITY_RESOLUTION_LBIT = (0x0100)
HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT = (0x0000)
HDC1000_CONFIG_TEMPERATURE_RESOLUTION_11BIT = (0x0400)
HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT = (0x0000)
HDC1000_CONFIG_HUMIDITY_RESOLUTION_11BIT = (0x0100)
HDC1000_CONFIG_HUMIDITY_RESOLUTION_8BIT = (0x0200)

# ioctl request used to select the I2C slave address on the device file.
I2C_SLAVE = 0x0703

import struct, array, time, io, fcntl

# Module-wide read/write handles for the I2C device; (re)assigned each time
# an SDL_Pi_HDC1000 object is created.
HDC1000_fw = 0
HDC1000_fr = 0


class SDL_Pi_HDC1000:
    """HDC1000/HDC1080 temperature and humidity driver over ``/dev/i2c-<twi>``."""

    def __init__(self, twi=1, addr=HDC1000_ADDRESS):
        """Open the bus, select the device and enable acquisition mode.

        Args:
            twi: Number of the I2C bus (``/dev/i2c-<twi>``).
            addr: I2C address of the sensor.
        """
        global HDC1000_fr, HDC1000_fw
        HDC1000_fr = io.open("/dev/i2c-" + str(twi), "rb", buffering=0)
        HDC1000_fw = io.open("/dev/i2c-" + str(twi), "wb", buffering=0)
        # set device address (the `addr` argument used to be ignored in
        # favour of the module constant)
        fcntl.ioctl(HDC1000_fr, I2C_SLAVE, addr)
        fcntl.ioctl(HDC1000_fw, I2C_SLAVE, addr)
        time.sleep(0.015) # 15ms startup time
        self._write_config(HDC1000_CONFIG_ACQUISITION_MODE)

    # private helpers
    def _read_register16(self, register):
        """Select *register*, wait 62.5 ms and return its two bytes as an int."""
        HDC1000_fw.write(bytearray([register]))
        time.sleep(0.0625) # From the data sheet
        buf = array.array('B', HDC1000_fr.read(2))
        return buf[0] * 256 + buf[1]

    def _write_config(self, config):
        """Write the high byte of *config* (low byte sent as zero) to the config register."""
        HDC1000_fw.write(bytearray([HDC1000_CONFIGURATION_REGISTER, config >> 8, 0x00]))
        time.sleep(0.015) # From the data sheet

    # public functions
    def readTemperature(self):
        """Return the temperature in degrees Celsius."""
        raw = self._read_register16(HDC1000_TEMPERATURE_REGISTER)
        return (raw / 65536.0) * 165.0 - 40

    def readHumidity(self):
        """Return the relative humidity in percent."""
        time.sleep(0.015) # From the data sheet
        raw = self._read_register16(HDC1000_HUMIDITY_REGISTER)
        return (raw / 65536.0) * 100.0

    def readConfigRegister(self):
        """Return the 16-bit configuration register."""
        return self._read_register16(HDC1000_CONFIGURATION_REGISTER)

    def turnHeaterOn(self):
        """Set the heater-enable bit, keeping the other config bits."""
        self._write_config(self.readConfigRegister() | HDC1000_CONFIG_HEATER_ENABLE)

    def turnHeaterOff(self):
        """Clear the heater-enable bit, keeping the other config bits."""
        self._write_config(self.readConfigRegister() & ~HDC1000_CONFIG_HEATER_ENABLE)

    def setHumidityResolution(self, resolution):
        """Replace the humidity-resolution bits (mask 0x0300) with *resolution*."""
        self._write_config((self.readConfigRegister() & ~0x0300) | resolution)

    def setTemperatureResolution(self, resolution):
        """Replace the temperature-resolution bit (mask 0x0400) with *resolution*."""
        self._write_config((self.readConfigRegister() & ~0x0400) | resolution)

    def readBatteryStatus(self):
        """Return True when the battery-status bit of the config register is clear.

        The previous code masked out the heater bit and compared the whole
        remaining register with zero, so any other set bit (for example the
        acquisition-mode bit written by ``__init__``) made it return False.
        """
        return (self.readConfigRegister() & HDC1000_CONFIG_BATTERY_STATUS) == 0

    def readManufacturerID(self):
        """Return the 16-bit manufacturer ID register."""
        return self._read_register16(HDC1000_MANUFACTURERID_REGISTER)

    def readDeviceID(self):
        """Return the 16-bit device ID register."""
        return self._read_register16(HDC1000_DEVICEID_REGISTER)

    def readSerialNumber(self):
        """Return the three 16-bit serial ID registers joined into one int.

        The words are concatenated high, mid, bottom. The previous code
        shifted by only 8 bits between 16-bit words, so they overlapped.
        """
        serialNumber = 0
        for register in (
            HDC1000_SERIALIDHIGH_REGISTER,
            HDC1000_SERIALIDMID_REGISTER,
            HDC1000_SERIALIDBOTTOM_REGISTER,
        ):
            serialNumber = (serialNumber << 16) | self._read_register16(register)
        return serialNumber
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment