Updated version of the library for Python 3 See an example of airpollution monitor in airmonitor.py
-
-
Save rat-h/150ffa2877efed51df03b5de6e8c97e1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A simple air pollution monitor. | |
I used ST7735 TFT display and Adafruit CircuitPython RGB library | |
as described [here](https://learn.adafruit.com/1-8-tft-display/python-wiring-and-setup) | |
It prints out time and data on the screen and eCO2 and other measurements. | |
Colors of text vary according to the measurement. | |
Note that by some reason R and B colors were switched on my display. | |
So go ahead and correct r and b according to your hardware in hand. | |
Also, my sensor pretty often shows impossible numbers for eCO2 (I would be dead already). | |
To minimize these errors, I add re-initiation every hour. I hope it may be just a bad sensor. | |
If you find a way to make this system more robust without re-initiation, let me know. | |
Video is [here] https://youtu.be/4aXMTt8Ia9Q | |
-RTH | |
""" | |
import time, logging | |
import subprocess | |
import digitalio | |
import board | |
from PIL import Image, ImageDraw, ImageFont | |
import matplotlib | |
matplotlib.use('Agg') | |
from matplotlib.pyplot import get_cmap | |
import adafruit_rgb_display.st7735 as st7735 # pylint: disable=unused-import | |
from CCS811_RPi import CCS811_RPi | |
import SDL_Pi_HDC1000 | |
logging.basicConfig(format='%(asctime)s:%(lineno)-6d%(levelname)-8s:%(message)s', level=logging.INFO,filename="airpol.log") | |
HDC1080 = True | |
configuration = 0b100000 | |
pause = 10 | |
reinit = 3600 | |
record = "rec.cvs" | |
# Configuration for CS and DC pins (these are PiTFT defaults): | |
cs_pin = digitalio.DigitalInOut(board.CE0) | |
dc_pin = digitalio.DigitalInOut(board.D25) | |
reset_pin = digitalio.DigitalInOut(board.D24) | |
# Config for display baudrate (default max is 24mhz): | |
BAUDRATE = 24000000 | |
# Setup SPI bus using hardware SPI: | |
spi = board.SPI() | |
# pylint: disable=line-too-long | |
# Create the display: | |
disp = st7735.ST7735R( | |
spi, | |
rotation=0, # 2.2", 2.4", 2.8", 3.2" ILI9341 | |
height=160, | |
width=128, | |
cs=cs_pin, | |
dc=dc_pin, | |
rst=reset_pin, | |
baudrate=BAUDRATE, | |
) | |
if disp.rotation % 180 == 90: | |
height = disp.width | |
width = disp.height | |
else: | |
width = disp.width | |
height = disp.height | |
TIME_FONTSIZE = 24 | |
MESG_FONTSIZE = 16 | |
HEAD_FONTSIZE = 10 | |
image = Image.new("RGB", (width, height)) | |
# Get drawing object to draw on image. | |
draw = ImageDraw.Draw(image) | |
# Draw a black filled box to clear the image. | |
draw.rectangle((0, 0, width, height), outline=0, fill=(0, 0, 0)) | |
disp.image(image) | |
# Load a TTF font. Make sure the .ttf font file is in the | |
# same directory as the python script! | |
# Some other nice fonts to try: http://www.dafont.com/bitmap.php | |
timefont = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", TIME_FONTSIZE) | |
mesgfont = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", MESG_FONTSIZE) | |
headfont = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", HEAD_FONTSIZE) | |
draw.rectangle((20, 20, width-20, height-20), outline=0, fill=(0, 0, 255)) | |
disp.image(image) | |
def _print_on_screen_(text,x,y,font, r,g,b): | |
draw.text((x, y), text, font=font, fill=(b,g,r)) | |
dx,dy = font.getsize(text) | |
return x+dx+2,y+dy+2 | |
def init_sensor(): | |
ccs811 = CCS811_RPi() | |
hwid = ccs811.checkHWID() | |
if(hwid == hex(129)): | |
draw.rectangle((20, 20, width-20, height-20), outline=0, fill=(0, 255, 0)) | |
disp.image(image) | |
else: | |
_print_on_screen_("HARDWARE\nERROR",0,0,timefont,255,0,0) | |
disp.image(image) | |
exit(1) | |
if(HDC1080): | |
hdc1000 = SDL_Pi_HDC1000.SDL_Pi_HDC1000() | |
hdc1000.turnHeaterOff() | |
hdc1000.setTemperatureResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT) | |
hdc1000.setHumidityResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT) | |
else: | |
hdc1000 = None | |
#print 'MEAS_MODE:',ccs811.readMeasMode() | |
ccs811.configureSensor(configuration) | |
draw.rectangle((0, 0, width, height), outline=0, fill=0) | |
_,y = _print_on_screen_(f'MEAS_MODE:',0,0,timefont,0,198,120) | |
_,y = _print_on_screen_(f' {ccs811.readMeasMode()}',0,y,timefont,0,198,120) | |
_,y = _print_on_screen_(f'STATUS :',0,y,timefont,120,198,0) | |
_,y = _print_on_screen_(f' {bin(ccs811.readStatus())}',0,y,timefont,120,198,0) | |
time.sleep(2) | |
# Use these lines if you need to pre-set and check sensor baseline value | |
# if(INITIALBASELINE > 0): | |
# ccs811.setBaseline(INITIALBASELINE) | |
# print((ccs811.readBaseline())) | |
return ccs811, hdc1000 | |
cnt = 0 | |
result = { | |
"eCO2" : 0., | |
'TVOC' : 0., | |
'TEMP' : 0., | |
"HUMD" : 0. | |
} | |
ccs811, hdc1000 = None, None | |
while True: | |
#Init sensor | |
if cnt == 0 : | |
del ccs811, hdc1000 | |
ccs811, hdc1000 = init_sensor() | |
logging.info("Reset device") | |
time.sleep(1) | |
cnt = (cnt+1)%reinit | |
draw.rectangle((0, 0, width, height), outline=0, fill=0) | |
x,y = 0, 0 | |
TIME = time.strftime("%H{}%M".format(":" if cnt%2 == 0 else " ") ) | |
x0,y0 = _print_on_screen_(TIME,x,y+3,timefont,255,255,255) | |
DATE = time.strftime("%d/%m\n%Y") | |
_,_ = _print_on_screen_(DATE,x0+9,y+3,headfont,255,255,255) | |
draw.line((x0+3, y , x0+3 , y0+4),fill=(255,255,255),width=3) | |
draw.line((0 , y0+4, width, y0+4),fill=(255,255,255),width=3) | |
y = y0+8 | |
#----CO2 | |
CO2 = f"{result['eCO2']}" | |
if result['eCO2'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['eCO2'] > 1200 else result['eCO2']/1200) | |
x0,y0 = _print_on_screen_("eCO2 :\nppm",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(CO2,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
y += 6 | |
#---TVOC | |
TVOC = f"{result['TVOC']}" | |
if result['TVOC'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['TVOC'] > 660 else result['TVOC']/660) | |
x0,y0 = _print_on_screen_("TVOC :\nppb",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(TVOC,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
y += 6 | |
#---Temp | |
TEMP = "{:0.2f}".format(result['TEMP']) | |
if result['TVOC'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['TEMP'] > 50 else result['TEMP']/50) | |
x0,y0 = _print_on_screen_("TEMP :\nC",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(TEMP,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
y += 6 | |
#---Humidity | |
HUMID = "{:0.2f}".format(result['HUMD']) | |
if result['TVOC'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['HUMD'] > 70 else result['HUMD']/70) | |
x0,y0 = _print_on_screen_("HUMID:\n%",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(HUMID,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
# Display image. | |
disp.image(image) | |
if cnt%pause != 0 : continue | |
if(HDC1080): | |
humidity = hdc1000.readHumidity() | |
temperature = hdc1000.readTemperature() | |
ccs811.setCompensation(temperature,humidity) | |
else: | |
humidity = 50.00 | |
temperature = 25.00 | |
try: | |
statusbyte = ccs811.readStatus() | |
logging.debug(f'STATUS: {bin(statusbyte)}') | |
except: | |
del ccs811 | |
ccs811 = CCS811_RPi() | |
continue | |
try: | |
error = ccs811.checkError(statusbyte) | |
if(error): | |
logging.error(f'ERROR:{ccs811.checkError(statusbyte)}') | |
except: | |
del ccs811 | |
ccs811 = CCS811_RPi() | |
continue | |
try: | |
if(not ccs811.checkDataReady(statusbyte)): | |
logging.info('No new samples are ready') | |
continue; | |
result = ccs811.readAlg(); | |
if(not result): | |
result = { | |
"eCO2" : "ERROR", | |
'TVOC' : "ERROR", | |
'TEMP' : temperature, | |
"HUMD" : humidity | |
} | |
continue; | |
result["TEMP"] = temperature | |
result["HUMD"] = humidity | |
baseline = ccs811.readBaseline() | |
except: | |
del ccs811 | |
ccs811 = CCS811_RPi() | |
continue | |
#Record data | |
if record: | |
timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
with open(record,"a") as fd: | |
fd.write(timestamp+f",{result['eCO2']},{result['TVOC']},{result['TEMP']},{result['HUMD']}"+"\n") | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A simple air pollution monitor. | |
I used ST7735 TFT display and Adafruit CircuitPython RGB library | |
as described [here](https://learn.adafruit.com/1-8-tft-display/python-wiring-and-setup) | |
It prints out time and data on the screen and eCO2 and other measurements. | |
Colors of text vary according to the measurement. | |
Note that by some reason R and B colors were switched on my display. | |
So go ahead and correct r and b according to your hardware in hand. | |
Also, my sensor pretty often shows impossible numbers for eCO2 (I would be dead already). | |
To minimize these errors, I add re-initiation every hour. I hope it may be just a bad sensor. | |
If you find a way to make this system more robust without re-initiation, let me know. | |
Video is [here] https://youtu.be/4aXMTt8Ia9Q | |
-RTH | |
""" | |
import time, logging | |
import subprocess | |
import digitalio | |
import board | |
from PIL import Image, ImageDraw, ImageFont | |
import matplotlib | |
matplotlib.use('Agg') | |
from matplotlib.pyplot import get_cmap | |
import adafruit_rgb_display.st7735 as st7735 # pylint: disable=unused-import | |
from CCS811_RPi import CCS811_RPi | |
import SDL_Pi_HDC1000 | |
logging.basicConfig(format='%(asctime)s:%(lineno)-6d%(levelname)-8s:%(message)s', level=logging.INFO,filename="airpol.log") | |
HDC1080 = True | |
configuration = 0b100000 | |
pause = 10 | |
reinit = 3600 | |
record = "rec.cvs" | |
# Configuration for CS and DC pins (these are PiTFT defaults): | |
cs_pin = digitalio.DigitalInOut(board.CE0) | |
dc_pin = digitalio.DigitalInOut(board.D25) | |
reset_pin = digitalio.DigitalInOut(board.D24) | |
# Config for display baudrate (default max is 24mhz): | |
BAUDRATE = 24000000 | |
# Setup SPI bus using hardware SPI: | |
spi = board.SPI() | |
# pylint: disable=line-too-long | |
# Create the display: | |
disp = st7735.ST7735R( | |
spi, | |
rotation=0, # 2.2", 2.4", 2.8", 3.2" ILI9341 | |
height=160, | |
width=128, | |
cs=cs_pin, | |
dc=dc_pin, | |
rst=reset_pin, | |
baudrate=BAUDRATE, | |
) | |
if disp.rotation % 180 == 90: | |
height = disp.width | |
width = disp.height | |
else: | |
width = disp.width | |
height = disp.height | |
TIME_FONTSIZE = 24 | |
MESG_FONTSIZE = 16 | |
HEAD_FONTSIZE = 10 | |
image = Image.new("RGB", (width, height)) | |
# Get drawing object to draw on image. | |
draw = ImageDraw.Draw(image) | |
# Draw a black filled box to clear the image. | |
draw.rectangle((0, 0, width, height), outline=0, fill=(0, 0, 0)) | |
disp.image(image) | |
# Load a TTF font. Make sure the .ttf font file is in the | |
# same directory as the python script! | |
# Some other nice fonts to try: http://www.dafont.com/bitmap.php | |
timefont = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", TIME_FONTSIZE) | |
mesgfont = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", MESG_FONTSIZE) | |
headfont = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", HEAD_FONTSIZE) | |
draw.rectangle((20, 20, width-20, height-20), outline=0, fill=(0, 0, 255)) | |
disp.image(image) | |
def _print_on_screen_(text,x,y,font, r,g,b): | |
draw.text((x, y), text, font=font, fill=(b,g,r)) | |
dx,dy = font.getsize(text) | |
return x+dx+2,y+dy+2 | |
def init_sensor(): | |
ccs811 = CCS811_RPi() | |
hwid = ccs811.checkHWID() | |
if(hwid == hex(129)): | |
draw.rectangle((20, 20, width-20, height-20), outline=0, fill=(0, 255, 0)) | |
disp.image(image) | |
else: | |
_print_on_screen_("HARDWARE\nERROR",0,0,timefont,255,0,0) | |
disp.image(image) | |
exit(1) | |
if(HDC1080): | |
hdc1000 = SDL_Pi_HDC1000.SDL_Pi_HDC1000() | |
hdc1000.turnHeaterOff() | |
hdc1000.setTemperatureResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT) | |
hdc1000.setHumidityResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT) | |
else: | |
hdc1000 = None | |
#print 'MEAS_MODE:',ccs811.readMeasMode() | |
ccs811.configureSensor(configuration) | |
draw.rectangle((0, 0, width, height), outline=0, fill=0) | |
_,y = _print_on_screen_(f'MEAS_MODE:',0,0,timefont,0,198,120) | |
_,y = _print_on_screen_(f' {ccs811.readMeasMode()}',0,y,timefont,0,198,120) | |
_,y = _print_on_screen_(f'STATUS :',0,y,timefont,120,198,0) | |
_,y = _print_on_screen_(f' {bin(ccs811.readStatus())}',0,y,timefont,120,198,0) | |
time.sleep(2) | |
# Use these lines if you need to pre-set and check sensor baseline value | |
# if(INITIALBASELINE > 0): | |
# ccs811.setBaseline(INITIALBASELINE) | |
# print((ccs811.readBaseline())) | |
return ccs811, hdc1000 | |
cnt = 0 | |
result = { | |
"eCO2" : 0., | |
'TVOC' : 0., | |
'TEMP' : 0., | |
"HUMD" : 0. | |
} | |
ccs811, hdc1000 = None, None | |
while True: | |
#Init sensor | |
if cnt == 0 : | |
del ccs811, hdc1000 | |
ccs811, hdc1000 = init_sensor() | |
logging.info("Reset device") | |
time.sleep(1) | |
cnt = (cnt+1)%reinit | |
draw.rectangle((0, 0, width, height), outline=0, fill=0) | |
x,y = 0, 0 | |
TIME = time.strftime("%H{}%M".format(":" if cnt%2 == 0 else " ") ) | |
x0,y0 = _print_on_screen_(TIME,x,y+3,timefont,255,255,255) | |
DATE = time.strftime("%d/%m\n%Y") | |
_,_ = _print_on_screen_(DATE,x0+9,y+3,headfont,255,255,255) | |
draw.line((x0+3, y , x0+3 , y0+4),fill=(255,255,255),width=3) | |
draw.line((0 , y0+4, width, y0+4),fill=(255,255,255),width=3) | |
y = y0+8 | |
#----CO2 | |
CO2 = f"{result['eCO2']}" | |
if result['eCO2'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['eCO2'] > 1200 else result['eCO2']/1200) | |
x0,y0 = _print_on_screen_("eCO2 :\nppm",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(CO2,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
y += 6 | |
#---TVOC | |
TVOC = f"{result['TVOC']}" | |
if result['TVOC'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['TVOC'] > 660 else result['TVOC']/660) | |
x0,y0 = _print_on_screen_("TVOC :\nppb",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(TVOC,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
y += 6 | |
#---Temp | |
TEMP = "{:0.2f}".format(result['TEMP']) | |
if result['TVOC'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['TEMP'] > 50 else result['TEMP']/50) | |
x0,y0 = _print_on_screen_("TEMP :\nC",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(TEMP,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
y += 6 | |
#---Humidity | |
HUMID = "{:0.2f}".format(result['HUMD']) | |
if result['TVOC'] == "ERROR": | |
r,g,b = 1.,0.,0. | |
else: | |
r,g,b,_ = get_cmap('rainbow')(1. if result['HUMD'] > 70 else result['HUMD']/70) | |
x0,y0 = _print_on_screen_("HUMID:\n%",x,y,headfont,int(255*r),int(255*g),int(255*b)) | |
_,y = _print_on_screen_(HUMID,x0+1,y,timefont,int(255*r),int(255*g),int(255*b)) | |
# Display image. | |
disp.image(image) | |
if cnt%pause != 0 : continue | |
if(HDC1080): | |
humidity = hdc1000.readHumidity() | |
temperature = hdc1000.readTemperature() | |
ccs811.setCompensation(temperature,humidity) | |
else: | |
humidity = 50.00 | |
temperature = 25.00 | |
try: | |
statusbyte = ccs811.readStatus() | |
logging.debug(f'STATUS: {bin(statusbyte)}') | |
except: | |
del ccs811 | |
ccs811 = CCS811_RPi() | |
continue | |
try: | |
error = ccs811.checkError(statusbyte) | |
if(error): | |
logging.error(f'ERROR:{ccs811.checkError(statusbyte)}') | |
except: | |
del ccs811 | |
ccs811 = CCS811_RPi() | |
continue | |
try: | |
if(not ccs811.checkDataReady(statusbyte)): | |
logging.info('No new samples are ready') | |
continue; | |
result = ccs811.readAlg(); | |
if(not result): | |
result = { | |
"eCO2" : "ERROR", | |
'TVOC' : "ERROR", | |
'TEMP' : temperature, | |
"HUMD" : humidity | |
} | |
continue; | |
result["TEMP"] = temperature | |
result["HUMD"] = humidity | |
baseline = ccs811.readBaseline() | |
except: | |
del ccs811 | |
ccs811 = CCS811_RPi() | |
continue | |
#Record data | |
if record: | |
timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
with open(record,"a") as fd: | |
fd.write(timestamp+f",{result['eCO2']},{result['TVOC']},{result['TEMP']},{result['HUMD']}"+"\n") | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# CCS811_RPi class usage example | |
# | |
# Petr Lukas | |
# July, 11 2017 | |
# | |
# Version 1.0 | |
import time | |
import urllib.request, urllib.error, urllib.parse # comment this line if you don't need ThinkSpeak connection | |
import SDL_Pi_HDC1000 # comment this line if you don't use HDC sensor | |
from CCS811_RPi import CCS811_RPi | |
ccs811 = CCS811_RPi() | |
# Do you want to send data to thingSpeak? If yes set WRITE API KEY, otherwise set False | |
THINGSPEAK = False # or type 'YOURAPIKEY' | |
# Do you want to preset sensor baseline? If yes set the value here, otherwise set False | |
INITIALBASELINE = False | |
# Do you want to use integrated temperature meter to compensate temp/RH (CJMCU-8118 board)? | |
# If not pre-set sensor compensation temperature is 25 C and RH is 50 % | |
# You can compensate manually by method ccs811.setCompensation(temperature,humidity) | |
HDC1080 = False | |
''' | |
MEAS MODE REGISTER AND DRIVE MODE CONFIGURATION | |
0b0 Idle (Measurements are disabled in this mode) | |
0b10000 Constant power mode, IAQ measurement every second | |
0b100000 Pulse heating mode IAQ measurement every 10 seconds | |
0b110000 Low power pulse heating mode IAQ measurement every 60 | |
0b1000000 Constant power mode, sensor measurement every 250ms | |
''' | |
# Set MEAS_MODE (measurement interval) | |
configuration = 0b100000 | |
# Set read interval for retriveving last measurement data from the sensor | |
pause = 60 | |
def thingSpeak(eCO2,TVOC,baseline,temperature,humidity): | |
print('Sending to ThingSpeak API...') | |
url = "https://api.thingspeak.com/update?api_key=" | |
url += THINGSPEAK | |
url += "&field1=" | |
url += str(eCO2) | |
url += "&field2=" | |
url += str(TVOC) | |
url += "&field3=" | |
url += str(baseline) | |
url += "&field4=" | |
url += str(temperature) | |
url += "&field5=" | |
url += str(humidity) | |
#print url | |
try: | |
content = urllib.request.urlopen(url).read() | |
except urllib.error.HTTPError: | |
print("Invalid HTTP response") | |
return False | |
print('Done') | |
#print content | |
print('Checking hardware ID...') | |
hwid = ccs811.checkHWID() | |
if(hwid == hex(129)): | |
print('Hardware ID is correct') | |
else: print('Incorrect hardware ID ',hwid, ', should be 0x81') | |
#print 'MEAS_MODE:',ccs811.readMeasMode() | |
ccs811.configureSensor(configuration) | |
print('MEAS_MODE:',ccs811.readMeasMode()) | |
print('STATUS: ',bin(ccs811.readStatus())) | |
print('---------------------------------') | |
# Use these lines if you need to pre-set and check sensor baseline value | |
if(INITIALBASELINE > 0): | |
ccs811.setBaseline(INITIALBASELINE) | |
print((ccs811.readBaseline())) | |
# Use these lines if you use CJMCU-8118 which has HDC1080 temp/RH sensor | |
if(HDC1080): | |
hdc1000 = SDL_Pi_HDC1000.SDL_Pi_HDC1000() | |
hdc1000.turnHeaterOff() | |
hdc1000.setTemperatureResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT) | |
hdc1000.setHumidityResolution(SDL_Pi_HDC1000.HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT) | |
while(1): | |
if(HDC1080): | |
humidity = hdc1000.readHumidity() | |
temperature = hdc1000.readTemperature() | |
ccs811.setCompensation(temperature,humidity) | |
else: | |
humidity = 50.00 | |
temperature = 25.00 | |
statusbyte = ccs811.readStatus() | |
print('STATUS: ', bin(statusbyte)) | |
error = ccs811.checkError(statusbyte) | |
if(error): | |
print('ERROR:',ccs811.checkError(statusbyte)) | |
if(not ccs811.checkDataReady(statusbyte)): | |
print('No new samples are ready') | |
print('---------------------------------'); | |
time.sleep(pause) | |
continue; | |
result = ccs811.readAlg(); | |
if(not result): | |
#print 'Invalid result received' | |
time.sleep(pause) | |
continue; | |
baseline = ccs811.readBaseline() | |
print('eCO2: ',result['eCO2'],' ppm') | |
print('TVOC: ',result['TVOC'], 'ppb') | |
print('Status register: ',bin(result['status'])) | |
print('Last error ID: ',result['errorid']) | |
print('RAW data: ',result['raw']) | |
print('Baseline: ',baseline) | |
print('---------------------------------'); | |
if (THINGSPEAK is not False): | |
thingSpeak(result['eCO2'],result['TVOC'],baseline,temperature,humidity) | |
time.sleep(pause) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# CCS811_RPi | |
# | |
# Petr Lukas | |
# July, 11 2017 | |
# | |
# Version 1.0 | |
import struct, array, time, io, fcntl, logging | |
# I2C Address | |
CCS811_ADDRESS = (0x5A) | |
# Registers | |
CCS811_HW_ID = (0x20) | |
CSS811_STATUS = (0x00) | |
CSS811_APP_START = (0xF4) | |
CSS811_MEAS_MODE = (0x01) | |
CSS811_ERROR_ID = (0xE0) | |
CSS811_RAW_DATA = (0x03) | |
CSS811_ALG_RESULT_DATA = (0x02) | |
CSS811_BASELINE = (0x11) | |
CSS811_ENV_DATA = (0x05) | |
# Errors ID | |
ERROR = {} | |
ERROR[0] = 'WRITE_REG_INVALID' | |
ERROR[1] = 'READ_REG_INVALID' | |
ERROR[2] = 'MEASMODE_INVALID' | |
ERROR[3] = 'MAX_RESISTANCE' | |
ERROR[4] = 'HEATER_FAULT' | |
ERROR[5] = 'HEATER_SUPPLY' | |
I2C_SLAVE=0x0703 | |
#Max number of attempts to read/write I2C | |
MAXATT = 10 | |
#CCS811_fw, CCS811_fr = 0,0 | |
class CCS811_RPi: | |
def __init__(self, twi:int=1, addr:int=CCS811_ADDRESS, maxcnt:int=MAXATT ): | |
self.addr = addr | |
self.twi = twi | |
self.cnt = 0 | |
self.maxcnt = maxcnt | |
self._init_connection_() | |
# helper functions | |
def _init_connection_(self): | |
self.CCS811_fr = io.open("/dev/i2c-"+str(self.twi), "rb", buffering=0) | |
self.CCS811_fw = io.open("/dev/i2c-"+str(self.twi), "wb", buffering=0) | |
# set device address | |
fcntl.ioctl(self.CCS811_fr, I2C_SLAVE, self.addr) | |
fcntl.ioctl(self.CCS811_fw, I2C_SLAVE, self.addr) | |
time.sleep(0.015) | |
self.cnt += 1 | |
time.sleep(0.015) | |
logging.info("Init/Reinit descriptors") | |
def _write(self,s): | |
if self.cnt >= self.maxcnt: | |
logging.error("Exceed maximum number to attempt for reconnection") | |
raise RuntimeError("CCS811_RPi:Exceed maximum number to attempt for reconnection") | |
try: | |
self.CCS811_fw.write( s ) | |
except: | |
logging.error("Fail to write to the sensor") | |
self._init_connection_() | |
self._write(s) | |
self.cnt = 0 | |
def _read(self,datasize): | |
if self.cnt >= self.maxcnt: | |
logging.error("Exceed maximum number to attempt for reconnection") | |
raise RuntimeError("CCS811_RPi:Exceed maximum number to attempt for reconnection") | |
try: | |
data = self.CCS811_fr.read(datasize) | |
except: | |
logging.error("Fail to read the sensor") | |
self._init_connection_() | |
data = self._read(datasize) | |
self.cnt = 0 | |
return data | |
# public functions | |
def checkHWID(self): | |
s = [CCS811_HW_ID] # Hardware ID | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
data = self._read(1) | |
buf = array.array('B', data) | |
return hex(buf[0]) | |
def readStatus(self): | |
time.sleep(0.015) | |
s = [CSS811_STATUS] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
data = self._read(1) | |
buf = array.array('B', data) | |
return buf[0] | |
def checkError(self,status_byte): | |
time.sleep(0.015) | |
error_bit = ((status_byte) >> 0) & 1 | |
if(not error_bit): | |
return False | |
s = [CSS811_ERROR_ID] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
data = self._read(1) | |
buf = array.array('B', data) | |
return ERROR[int(buf[0])] | |
def configureSensor(self, configuration): | |
# Switch sensor to application mode | |
s = [CSS811_APP_START] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
s = [CSS811_MEAS_MODE,configuration,0x00] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.015) | |
return | |
def readMeasMode(self): | |
time.sleep(0.015) | |
s = [CSS811_MEAS_MODE] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
data = self._read(1) | |
buf = array.array('B', data) | |
return bin(buf[0]) | |
def readRaw(self): | |
time.sleep(0.015) | |
s = [CSS811_RAW_DATA] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
data = self._read(2) | |
buf = array.array('B', data) | |
return (buf[0]*256 + buf[1]) | |
def readAlg(self): | |
time.sleep(0.015) | |
s = [CSS811_ALG_RESULT_DATA] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
data = self._read(8) | |
buf = array.array('B', data) | |
result = {} | |
# Read eCO2 value and check if it is valid | |
result['eCO2'] = buf[0]*256 + buf[1] | |
if(result['eCO2'] > 8192): | |
logging.error('Invalid eCO2 value') | |
#return False | |
result['eCO2'] = "ERROR" | |
# Read TVOC value and check if it is valid | |
result['TVOC'] = buf[2]*256 + buf[3] | |
if(result['TVOC'] > 1187): | |
logging.error('Invalid TVOC value') | |
#return False | |
result['TVOC'] = "ERROR" | |
result['status'] = buf[4] | |
# Read the last error ID and check if it is valid | |
result['errorid'] = buf[5] | |
if(result['errorid'] > 5): | |
logging.error('Invalid Error ID') | |
return False | |
result['raw'] = buf[6]*256 + buf[7] | |
return result | |
def readBaseline(self): | |
time.sleep(0.015) | |
s = [CSS811_BASELINE] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.0625) | |
data = self._read(2) | |
buf = array.array('B', data) | |
return (buf[0]*256 + buf[1]) | |
def checkDataReady(self, status_byte): | |
ready_bit = ((status_byte) >> 3) & 1 | |
if(ready_bit): | |
return True | |
else: return False | |
def setCompensation(self, temperature, humidity): | |
temperature = round(temperature,2) | |
humidity = round(humidity,2) | |
logging.debug(f'Setting compensation to {temperature}C and {humidity}%') | |
hum1 = int(humidity//0.5) | |
hum2 = int(humidity*512-hum1*256) | |
temperature = temperature+25 | |
temp1 = int(temperature//0.5) | |
temp2 = int(temperature*512-temp1*256) | |
s = [CSS811_ENV_DATA,hum1,hum2,temp1,temp2,0x00] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
return | |
def setBaseline(self, baseline): | |
loggin.info(f'Setting baseline to {baseline}') | |
buf = [0,0] | |
s = struct.pack('>H', baseline) | |
buf[0], buf[1] = struct.unpack('>BB', s) | |
loggin.info(f'{buf[0]}') | |
loggin.info(f'{buf[1]}') | |
s = [CSS811_BASELINE,buf[0],buf[1],0x00] | |
s2 = bytearray( s ) | |
self._write( s2 ) | |
time.sleep(0.015) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# SDL_Pi_HDC1000 | |
# Raspberry Pi Driver for the SwitchDoc Labs HDC1000 Breakout Board | |
# | |
# SwitchDoc Labs | |
# January 2017 | |
# | |
# Version 1.1 | |
#constants | |
# I2C Address | |
HDC1000_ADDRESS = (0x40) # 1000000 | |
# Registers | |
HDC1000_TEMPERATURE_REGISTER = (0x00) | |
HDC1000_HUMIDITY_REGISTER = (0x01) | |
HDC1000_CONFIGURATION_REGISTER = (0x02) | |
HDC1000_MANUFACTURERID_REGISTER = (0xFE) | |
HDC1000_DEVICEID_REGISTER = (0xFF) | |
HDC1000_SERIALIDHIGH_REGISTER = (0xFB) | |
HDC1000_SERIALIDMID_REGISTER = (0xFC) | |
HDC1000_SERIALIDBOTTOM_REGISTER = (0xFD) | |
#Configuration Register Bits | |
HDC1000_CONFIG_RESET_BIT = (0x8000) | |
HDC1000_CONFIG_HEATER_ENABLE = (0x2000) | |
HDC1000_CONFIG_ACQUISITION_MODE = (0x1000) | |
HDC1000_CONFIG_BATTERY_STATUS = (0x0800) | |
HDC1000_CONFIG_TEMPERATURE_RESOLUTION = (0x0400) | |
HDC1000_CONFIG_HUMIDITY_RESOLUTION_HBIT = (0x0200) | |
HDC1000_CONFIG_HUMIDITY_RESOLUTION_LBIT = (0x0100) | |
HDC1000_CONFIG_TEMPERATURE_RESOLUTION_14BIT = (0x0000) | |
HDC1000_CONFIG_TEMPERATURE_RESOLUTION_11BIT = (0x0400) | |
HDC1000_CONFIG_HUMIDITY_RESOLUTION_14BIT = (0x0000) | |
HDC1000_CONFIG_HUMIDITY_RESOLUTION_11BIT = (0x0100) | |
HDC1000_CONFIG_HUMIDITY_RESOLUTION_8BIT = (0x0200) | |
I2C_SLAVE=0x0703 | |
import struct, array, time, io, fcntl | |
HDC1000_fw= 0 | |
HDC1000_fr= 0 | |
class SDL_Pi_HDC1000: | |
def __init__(self, twi=1, addr=HDC1000_ADDRESS ): | |
global HDC1000_fr, HDC1000_fw | |
HDC1000_fr= io.open("/dev/i2c-"+str(twi), "rb", buffering=0) | |
HDC1000_fw= io.open("/dev/i2c-"+str(twi), "wb", buffering=0) | |
# set device address | |
fcntl.ioctl(HDC1000_fr, I2C_SLAVE, HDC1000_ADDRESS) | |
fcntl.ioctl(HDC1000_fw, I2C_SLAVE, HDC1000_ADDRESS) | |
time.sleep(0.015) #15ms startup time | |
config = HDC1000_CONFIG_ACQUISITION_MODE | |
s = [HDC1000_CONFIGURATION_REGISTER,config>>8,0x00] | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) #sending config register bytes | |
time.sleep(0.015) # From the data sheet | |
# 0x10(48) Temperature, Humidity enabled, Resolultion = 14-bits, Heater off | |
#config = HDC1000_CONFIG_ACQUISITION_MODE | |
#self._bus.write_byte_data(HDC1000_ADDRESS,HDC1000_CONFIGURATION_REGISTER, config>>8) | |
# public functions | |
def readTemperature(self): | |
s = [HDC1000_TEMPERATURE_REGISTER] # temp | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte temperature data | |
buf = array.array('B', data) | |
#print ( "Temp: %f 0x%X %X" % ( ((((buf[0]<<8) + (buf[1]))/65536.0)*165.0 ) - 40.0 ,buf[0],buf[1] ) ) | |
# Convert the data | |
temp = (buf[0] * 256) + buf[1] | |
cTemp = (temp / 65536.0) * 165.0 - 40 | |
return cTemp | |
def readHumidity(self): | |
# Send humidity measurement command, 0x01(01) | |
time.sleep(0.015) # From the data sheet | |
s = [HDC1000_HUMIDITY_REGISTER] # hum | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte humidity data | |
buf = array.array('B', data) | |
#print ( "Humidity: %f 0x%X %X " % ( ((((buf[0]<<8) + (buf[1]))/65536.0)*100.0 ), buf[0], buf[1] ) ) | |
humidity = (buf[0] * 256) + buf[1] | |
humidity = (humidity / 65536.0) * 100.0 | |
return humidity | |
def readConfigRegister(self): | |
# Read config register | |
s = [HDC1000_CONFIGURATION_REGISTER] # temp | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte config data | |
buf = array.array('B', data) | |
#print "register=%X %X"% (buf[0], buf[1]) | |
return buf[0]*256+buf[1] | |
def turnHeaterOn(self): | |
# Read config register | |
config = self.readConfigRegister() | |
config = config | HDC1000_CONFIG_HEATER_ENABLE | |
s = [HDC1000_CONFIGURATION_REGISTER,config>>8,0x00] | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) #sending config register bytes | |
time.sleep(0.015) # From the data sheet | |
return | |
def turnHeaterOff(self): | |
# Read config register | |
config = self.readConfigRegister() | |
config = config & ~HDC1000_CONFIG_HEATER_ENABLE | |
s = [HDC1000_CONFIGURATION_REGISTER,config>>8,0x00] | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) #sending config register bytes | |
time.sleep(0.015) # From the data sheet | |
return | |
def setHumidityResolution(self,resolution): | |
# Read config register | |
config = self.readConfigRegister() | |
config = (config & ~0x0300) | resolution | |
s = [HDC1000_CONFIGURATION_REGISTER,config>>8,0x00] | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) #sending config register bytes | |
time.sleep(0.015) # From the data sheet | |
return | |
def setTemperatureResolution(self,resolution): | |
# Read config register | |
config = self.readConfigRegister() | |
config = (config & ~0x0400) | resolution | |
s = [HDC1000_CONFIGURATION_REGISTER,config>>8,0x00] | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) #sending config register bytes | |
time.sleep(0.015) # From the data sheet | |
return | |
def readBatteryStatus(self): | |
# Read config register | |
config = self.readConfigRegister() | |
config = config & ~ HDC1000_CONFIG_HEATER_ENABLE | |
if (config == 0): | |
return True | |
else: | |
return False | |
return 0 | |
def readManufacturerID(self): | |
s = [HDC1000_MANUFACTURERID_REGISTER] # temp | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte config data | |
buf = array.array('B', data) | |
return buf[0] * 256 + buf[1] | |
def readDeviceID(self): | |
s = [HDC1000_DEVICEID_REGISTER] # temp | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte config data | |
buf = array.array('B', data) | |
return buf[0] * 256 + buf[1] | |
def readSerialNumber(self): | |
serialNumber = 0 | |
s = [HDC1000_SERIALIDHIGH_REGISTER] # temp | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte config data | |
buf = array.array('B', data) | |
serialNumber = buf[0]*256+ buf[1] | |
s = [HDC1000_SERIALIDMID_REGISTER] # temp | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte config data | |
buf = array.array('B', data) | |
serialNumber = serialNumber*256 + buf[0]*256 + buf[1] | |
s = [HDC1000_SERIALIDBOTTOM_REGISTER] # temp | |
s2 = bytearray( s ) | |
HDC1000_fw.write( s2 ) | |
time.sleep(0.0625) # From the data sheet | |
data = HDC1000_fr.read(2) #read 2 byte config data | |
buf = array.array('B', data) | |
serialNumber = serialNumber*256 + buf[0]*256 + buf[1] | |
return serialNumber |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment