Skip to content

Instantly share code, notes, and snippets.

@ajmontag
Created May 24, 2020 01:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ajmontag/3a5fdd4bf24d209c568a7e25524dd3fc to your computer and use it in GitHub Desktop.
Save ajmontag/3a5fdd4bf24d209c568a7e25524dd3fc to your computer and use it in GitHub Desktop.
Read from SCD30 Sensor on Raspberry Pi
#!/usr/bin/env python
# coding=utf-8
#
# Based on https://github.com/UnravelTEC/Raspi-Driver-SCD30
# Copyright © 2018 UnravelTEC
# Michael Maier <michael.maier+github@unraveltec.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# If you want to relicense this code under another license, please contact info+github@unraveltec.com.
# hints from https://www.raspberrypi.org/forums/viewtopic.php?p=600515#p600515
from __future__ import print_function
# This module uses the services of the C pigpio library. pigpio must be running on the Pi(s) whose GPIO are to be manipulated.
# cmd ref: http://abyz.me.uk/rpi/pigpio/python.html#i2c_write_byte_data
import pigpio # aptitude install python-pigpio
import time
import struct
import sys
import crcmod # aptitude install python-crcmod
import optparse
import logging
from logging import debug, warn, info
class SCD30:
    """Read CO2, temperature and humidity from an SCD30 sensor via pigpio I2C.

    The instance owns one pigpio I2C handle (``self.h``), opened in
    ``__init__`` and released in ``__del__``.  Unrecoverable I/O errors
    terminate the process with exit status 1.
    """

    def __init__(self, pi, bus_num, addr=0x61):
        """Open the I2C device.

        Args:
            pi: ``pigpio.pi`` instance; must already be connected.
            bus_num: I2C bus number passed to ``i2c_open``.
            addr: I2C address of the sensor (default 0x61).

        Raises:
            SystemExit: pigpio is not connected or the device cannot be opened.
        """
        self.pi = pi
        self.bus_num = bus_num
        self.addr = addr
        # Assigned before anything can fail so __del__ is always safe.
        self.h = None

        if not self.pi.connected:
            logging.warning("no connection to pigpio daemon")
            sys.exit(1)

        # Best-effort close of handle 0 (presumably a handle leaked by an
        # earlier run); "unknown handle" just means there was nothing to close.
        try:
            self.pi.i2c_close(0)
        except Exception as err:
            if str(err) != "'unknown handle'":
                logging.warning("Unknown error: %s: %s",
                                type(err).__name__, err)

        try:
            self.h = self.pi.i2c_open(self.bus_num, self.addr)
        except Exception:
            logging.warning("i2c open failed")
            sys.exit(1)

    def __del__(self):
        """Release the I2C handle, if one was ever opened."""
        handle = getattr(self, "h", None)
        if handle is None:
            # __init__ never ran or aborted before opening the device.
            return
        self.h = None
        self.pi.i2c_close(handle)

    def read_n_bytes(self, n):
        """Read exactly ``n`` bytes from the device.

        Returns:
            The data returned by pigpio, or ``False`` on a short read.

        Raises:
            SystemExit: the underlying I2C read raised.
        """
        try:
            (count, data) = self.pi.i2c_read_device(self.h, n)
        except Exception:
            logging.warning("error: i2c_read failed")
            sys.exit(1)
        if count != n:
            logging.warning("error: i2c_read returned %s instead of %s bytes",
                            count, n)
            return False
        return data

    def i2cWrite(self, data):
        """Write ``data`` (a sequence of byte values) to the device.

        Returns:
            ``True`` on success, ``-1`` if the write raised.
        """
        try:
            self.pi.i2c_write_device(self.h, data)
        except Exception:
            logging.warning("error: i2c_write failed")
            return -1
        return True

    # read meas interval (not documented, but works)
    def read_meas_interval(self):
        """Return the measurement interval in seconds, or ``-1`` on failure.

        Setting the interval is not implemented here; the original driver
        wrote ``[0x46, 0x00, 0x00, 0x02, 0xE3]`` to select 2 s.
        """
        if self.i2cWrite([0x46, 0x00]) == -1:
            return -1
        data = self.read_n_bytes(3)
        if data is False:
            # read_n_bytes has already logged the short read.
            return -1
        if len(data) != 3:
            logging.warning("error: no array len 3 returned, instead "
                            + str(len(data)) + "type: " + str(type(data)))
            return -1
        # data[2] is the checksum byte; it is not verified here.
        return int(data[0]) * 256 + int(data[1])

    @staticmethod
    def _decode_float(data, offset):
        """Decode the big-endian float held in the 6 bytes at ``offset``.

        The value occupies bytes 0, 1, 3 and 4 of the group; bytes 2 and 5
        (presumably per-word checksums) are skipped without verification.
        """
        packed = struct.pack('>BBBB',
                             data[offset], data[offset + 1],
                             data[offset + 3], data[offset + 4])
        return struct.unpack('>f', packed)[0]

    def read(self):
        """Trigger continuous measurement and return one reading.

        Polls the ready status every 0.1 s with no timeout, so this blocks
        until the sensor reports data.

        Returns:
            dict with keys ``'CO2'``, ``'temperature_degF'`` (converted from
            the Celsius value the sensor reports) and ``'humidity'``.

        Raises:
            SystemExit: an I2C command or the measurement read failed.
        """
        # trigger cont meas
        pressure_mbar = 0  # use default pressure
        pressure = [0xFF & (pressure_mbar >> 8), 0xFF & pressure_mbar]
        f_crc8 = crcmod.mkCrcFun(0x131, 0xFF, False, 0x00)
        # crcmod needs a byte string (str on Python 2, bytes on Python 3).
        # For pressure 0 the CRC should be 0x81; [0xBE, 0xEF] gives 0x92.
        crc8 = f_crc8(bytes(bytearray(pressure)))
        # also activates cont measurement
        self.i2cWrite([0x00, 0x10, pressure[0], pressure[1], crc8])

        # read ready status
        while True:
            if self.i2cWrite([0x02, 0x02]) == -1:
                sys.exit(1)
            status = self.read_n_bytes(3)
            if status is not False and status[1] == 1:
                break
            time.sleep(0.1)

        # read measurement
        if self.i2cWrite([0x03, 0x00]) == -1:
            # Reading after a failed command would decode unrelated bytes.
            sys.exit(1)
        raw = self.read_n_bytes(18)
        if raw is False:
            sys.exit(1)

        float_co2 = self._decode_float(raw, 0)
        float_T = self._decode_float(raw, 6)
        float_rH = self._decode_float(raw, 12)

        result = {
            'CO2': float_co2,
            'temperature_degF': (float_T * 9.0 / 5.0) + 32,
            'humidity': float_rH,
        }
        logging.info(result)
        return result
def parseOptions(argv, progName, progDesc):
    """Parse the command-line options.

    Args:
        argv: argument list without the program name.
        progName: program name shown in the help text.
        progDesc: description shown in the help text.

    Returns:
        The optparse values object with ``bus`` (int, default 1) and
        ``verbose`` (int count of ``-v`` flags, default 0).

    Raises:
        SystemExit: positional arguments were supplied (status 1), or
            optparse itself rejected the arguments.
    """
    optParser = optparse.OptionParser(prog=progName, description=progDesc)
    optParser.add_option("-b", "--bus",
                         help="Bus Number",
                         type="int",
                         default=1)
    # default=0 keeps ``verbose`` an int; without it an absent flag yields
    # None, which cannot be compared with ``>=`` on Python 3.
    optParser.add_option("-v", "--verbose", action="count", default=0,
                         help="verbose")
    options, extraArgs = optParser.parse_args(argv)
    if extraArgs:
        optParser.print_help()
        sys.exit(1)
    return options
def main():
    """Parse options, configure logging, take one sensor reading and print it.

    Always ends by raising SystemExit: status 0 after a successful reading,
    non-zero if option parsing or the sensor access fails.
    """
    options = parseOptions(sys.argv[1:], sys.argv[0], "SCD30 Control")

    # ``verbose`` may be None when -v was not given; treat that as 0 so the
    # comparisons below also work on Python 3.
    verbosity = options.verbose or 0
    if verbosity >= 2:
        level = logging.DEBUG
    elif verbosity == 1:
        level = logging.INFO
    else:
        level = logging.WARN
    logging.basicConfig(level=level,
                        format='%(asctime)s %(message)s',
                        datefmt='%d %b %Y %H:%M:%S')

    # The original assigned '::1' and immediately overwrote it; only the
    # IPv4 loopback address was ever used.
    pigpio_host = '127.0.0.1'
    pi = pigpio.pi(pigpio_host)
    sensor = SCD30(pi, options.bus)
    print(sensor.read())
    sys.exit(0)
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment