Skip to content

Instantly share code, notes, and snippets.

@olieidel
Created January 17, 2021 19:48
Show Gist options
  • Save olieidel/25c1072b2fa8a53528971c18e5a5997a to your computer and use it in GitHub Desktop.
Save olieidel/25c1072b2fa8a53528971c18e5a5997a to your computer and use it in GitHub Desktop.
Old Python code that I wrote a long time ago to analyse air quality with the BME680 and SGP30 sensors. It contains some nifty calibration logic which I couldn't find anywhere else, at least not in this combination. It was part of a Django app, which is why it imports the "models"; that is also how it persists data.
import math
import time
import threading
import adafruit_sgp30
import bme680
import board
import busio
from piaq.monitor.models import SensorReading, SensorSettings
def absolute_humidity(temperature, humidity):
    """Convert a temperature / relative-humidity pair to absolute humidity.

    Taken from a Pull Request to the Adafruit SGP30 Arduino library:
    https://github.com/adafruit/Adafruit_SGP30/pull/4

    This refers to a formula mentioned in the SGP30 sensor docs:
    https://www.sensirion.com/fileadmin/user_upload/customers/sensirion/Dokumente/9_Gas_Sensors/Sensirion_Gas_Sensors_SGP30_Driver-Integration-Guide_SW_I2C.pdf

    Args:
        temperature: Air temperature; the ``273.15 + t`` term implies
            degrees Celsius.
        humidity: Relative humidity; the ``rh / 100.0`` term implies a
            percentage (0-100).

    Returns:
        The absolute humidity as a float. The caller in this file feeds
        the result to the SGP30 as ``gramsPM3``, i.e. presumably g/m^3 --
        confirm against the Sensirion guide linked above.
    """
    t = temperature
    rh = humidity
    # Relative humidity scaled by a Magnus-style exponential in t; this is
    # the bracketed term of the formula referenced in the docstring.
    vapour_term = (rh / 100.0) * 6.112 * math.exp((17.62 * t) / (243.12 + t))
    # Same evaluation order as the reference formula: (216.7 * term) / T.
    return 216.7 * vapour_term / (273.15 + t)
class PollingThread(threading.Thread):
    """Thread that periodically reads the BME680 and SGP30 and stores the data.

    Each iteration reads the BME680, persists one ``SensorReading`` row
    containing the BME680 values together with the current SGP30 values and
    baselines, and then updates the SGP30 humidity compensation from the
    fresh temperature / humidity pair.

    The loop ends when ``join()`` is called (which sets ``stoprequest``) or
    when the first ``SensorSettings`` row is no longer flagged as started.
    """

    def __init__(self, polling_interval):
        """Set up both sensors and prepare the thread.

        Args:
            polling_interval: Pause between two polling iterations, passed
                to ``threading.Event.wait`` (seconds).
        """
        super().__init__()
        self.polling_interval = polling_interval
        # Set by join(); checked by the polling loop.
        self.stoprequest = threading.Event()
        self.bme680 = None
        self.sgp30 = None
        self._init_bme680()
        self._init_sgp30()

    def _init_bme680(self):
        """Create and configure the BME680 driver, storing it on ``self``."""
        try:
            sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
        except IOError:
            # Nothing answered on the primary address; try the secondary.
            sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)

        sensor.set_humidity_oversample(bme680.OS_2X)
        sensor.set_pressure_oversample(bme680.OS_4X)
        sensor.set_temperature_oversample(bme680.OS_8X)
        sensor.set_filter(bme680.FILTER_SIZE_3)

        # Gas measurement: heater set-point 320 and duration 150 on
        # profile 0 (units as defined by the bme680 library).
        sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
        sensor.set_gas_heater_temperature(320)
        sensor.set_gas_heater_duration(150)
        sensor.select_gas_heater_profile(0)

        self.bme680 = sensor

    def _init_sgp30(self):
        """Create the SGP30 driver, start IAQ mode and restore the baseline."""
        i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
        sgp30 = adafruit_sgp30.Adafruit_SGP30(i2c)
        sgp30.iaq_init()
        self.sgp30 = sgp30
        self._set_sgp30_baseline()

    def _set_sgp30_baseline(self):
        """Restore the SGP30 baseline from the newest usable stored reading.

        Does nothing when no such reading exists.
        """
        last_reading = SensorReading.objects.exclude(
            # Exclude readings where both values are 0, this can
            # happen when the sensor is warming up
            baseline_eCO2=0,
            baseline_TVOC=0,
        ).order_by('-time').first()

        if last_reading:
            self.sgp30.set_iaq_baseline(
                eCO2=last_reading.baseline_eCO2,
                TVOC=last_reading.baseline_TVOC,
            )

    def _set_sgp30_humidity_compensation(self, humidity, temperature):
        """Update humidity compensation on the SGP30.

        Args:
            humidity: Relative humidity, forwarded to ``absolute_humidity``.
            temperature: Temperature, forwarded to ``absolute_humidity``.
        """
        self.sgp30.set_iaq_humidity(
            gramsPM3=absolute_humidity(
                humidity=humidity,
                temperature=temperature,
            ),
        )

    def _should_stop(self):
        """Return True when the polling loop should end.

        The stop event is checked first so that an explicit stop request
        does not need a database round trip.
        """
        if self.stoprequest.is_set():
            return True

        settings = SensorSettings.objects.first()
        # A missing settings row is treated as "not started" instead of
        # raising AttributeError on None.
        return settings is None or not settings.is_started

    def _read_and_save(self):
        """Take one reading from both sensors and persist it.

        Returns without saving when the BME680 reports no new data.
        """
        got_data = self.bme680.get_sensor_data()
        if not got_data:
            return

        temperature = self.bme680.data.temperature
        humidity = self.bme680.data.humidity

        SensorReading.objects.create(
            temperature=temperature,
            humidity=humidity,
            pressure=self.bme680.data.pressure,
            gas_resistance=self.bme680.data.gas_resistance,
            eCO2=self.sgp30.eCO2,
            TVOC=self.sgp30.TVOC,
            baseline_eCO2=self.sgp30.baseline_eCO2,
            baseline_TVOC=self.sgp30.baseline_TVOC,
        )

        # Keyword arguments on purpose: the callee takes
        # (humidity, temperature), so a positional
        # (temperature, humidity) call swaps the two values.
        self._set_sgp30_humidity_compensation(
            humidity=humidity,
            temperature=temperature,
        )

    def join(self, timeout=None):
        """Request the polling loop to stop, then wait for the thread.

        Args:
            timeout: Forwarded unchanged to ``threading.Thread.join``.
        """
        self.stoprequest.set()
        super().join(timeout)

    def run(self):
        """Poll and persist readings until a stop condition is met."""
        while not self._should_stop():
            self._read_and_save()
            # Wait on the event rather than sleeping so that join() wakes
            # the loop immediately instead of after a full interval.
            self.stoprequest.wait(self.polling_interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment