-
-
Save fantasticdonkey/2b0e6b3e045ba451d3b3b4cea5558396 to your computer and use it in GitHub Desktop.
uSGP30 run script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" main.py | |
Application to take measurements from BME280 and SGP30 sensors | |
and send them to a local broker using MQTT. Currently configured | |
to update thing's AWS IoT Shadow document. | |
Note the SGP30 initialisation time (15 seconds) and calibration time | |
(12 hours) stated in the Application Notes. | |
""" | |
import sys | |
import machine | |
from utime import localtime, sleep_ms, ticks_ms, time | |
import umqtt.simple | |
import ujson | |
import uSGP30 | |
import bme280_float | |
import wifi | |
ESP32_DEVICE_NAME = "this-iaq-thing" | |
MEASURE_INTERVAL_MS = const(1000) | |
# I2C parameters | |
I2C_SCL_GPIO = const(18) | |
I2C_SDA_GPIO = const(19) | |
I2C_FREQ = const(400000) | |
# SGP30 specific parameters | |
SGP30_BASELINE_FILE = "sgp30_iaq_baseline.txt" | |
SGP30_BASELINE_INTERVAL_MS = const(3600000) # 1 hour | |
SGP30_EARLY_OPERATION_PHASE_MS = const(43200000) # 12 hours | |
SGP30_BASELINE_VALIDITY_MS = const(604800000) # 7 days | |
SGP30_INIT_MS = const(15000) | |
# MQTT connection parameters (replace with own) | |
MQTT_CONFIG = { | |
"MQTT_BROKER": "192.168.1.124", | |
"MQTT_ESP_NAME": ESP32_DEVICE_NAME, | |
"MQTT_PORT": 8883, | |
"MQTT_TOPIC": "$aws/things/" + ESP32_DEVICE_NAME + "/shadow/update", | |
"MQTT_KEEPALIVE": 4000, | |
"MQTT_SSL": True, | |
"MQTT_SSL_PARAMS": { | |
"CERTFILE": "/certs/" + ESP32_DEVICE_NAME + "-certificate.pem.crt.txt", | |
"KEYFILE": "/certs/" + ESP32_DEVICE_NAME + "-private.pem.key", | |
"SERVER_SIDE": False | |
}, | |
} | |
MQTT_INTERVAL_MS = const(60000) | |
class TemperatureSensor(): | |
""" Bosch BME280 sensor """ | |
def __init__(self, i2c): | |
""" Initialises sensor """ | |
self.bme = bme280_float.BME280(i2c=i2c) | |
def measure(self): | |
""" Takes readings """ | |
return self.bme.read_compensated_data() | |
class IAQSensor(): | |
""" SGP30 indoor air quality sensor """ | |
def __init__(self, i2c, init_algo=True): | |
""" Initialises sensor """ | |
self.sgp30 = uSGP30.SGP30(i2c, init_algo=init_algo) | |
if init_algo: | |
try: | |
with open(SGP30_BASELINE_FILE, "r") as file: | |
current_baseline = ujson.loads(file.read()) | |
except OSError as exception: | |
print_timed(exception) | |
print_timed("No valid baseline found") | |
sleep_ms(SGP30_EARLY_OPERATION_PHASE_MS) | |
else: | |
print_timed("Baseline found: " + str(current_baseline)) | |
if time() - int(current_baseline[2]) > SGP30_BASELINE_VALIDITY_MS: | |
print_timed("Baseline has expired: " + str(current_baseline)) | |
sleep_ms(SGP30_EARLY_OPERATION_PHASE_MS) | |
else: | |
self.sgp30.set_iaq_baseline(current_baseline[0], current_baseline[1]) | |
finally: | |
sleep_ms(SGP30_INIT_MS) | |
def measure(self): | |
""" Takes readings """ | |
return self.sgp30.measure_iaq() | |
def get_baseline(self): | |
""" Gets current baseline from SGP30 """ | |
return self.sgp30.get_iaq_baseline() | |
def set_humidity(self, humidity): | |
""" Sets humidity compensation """ | |
self.sgp30.set_absolute_humidity(humidity) | |
class MQTTHandler(): | |
""" MQTT connection handler """ | |
def __init__(self, mqtt_config): | |
""" Instantiate MQTT connection handler """ | |
self.mqtt_config = mqtt_config | |
try: | |
with open(self.mqtt_config["MQTT_SSL_PARAMS"]["KEYFILE"], "r") as file_in: | |
key_file = file_in.read() | |
with open(self.mqtt_config["MQTT_SSL_PARAMS"]["CERTFILE"], "r") as file_in: | |
cert_file = file_in.read() | |
except OSError as exception: | |
print_timed(exception) | |
print_timed("Could not locate certificates") | |
sys.exit() | |
else: | |
self.mqtt_client = umqtt.simple.MQTTClient( | |
client_id=ESP32_DEVICE_NAME, | |
server=self.mqtt_config["MQTT_BROKER"], | |
port=self.mqtt_config["MQTT_PORT"], | |
keepalive=self.mqtt_config["MQTT_KEEPALIVE"], | |
ssl=self.mqtt_config["MQTT_SSL"], | |
ssl_params={ | |
"key": key_file, | |
"cert": cert_file, | |
"server_side": self.mqtt_config["MQTT_SSL_PARAMS"]["SERVER_SIDE"] | |
} | |
) | |
self.mqtt_client.connect() | |
def publish(self, data): | |
""" Publish a message """ | |
self.mqtt_client.publish(self.mqtt_config["MQTT_TOPIC"], ujson.dumps(data)) | |
print_timed("Data sent... " + str(data) + " " + self.mqtt_config["MQTT_TOPIC"]) | |
def disconnect(self): | |
""" Disconnect from broker """ | |
self.mqtt_client.disconnect() | |
class Controller(): | |
""" Takes sensor readings and publishes them using MQTT """ | |
def __init__(self): | |
""" Initialise the controller """ | |
self.mqtt_handler = MQTTHandler(MQTT_CONFIG) | |
self.i2c = machine.I2C( | |
scl=machine.Pin(I2C_SCL_GPIO, machine.Pin.OUT), | |
sda=machine.Pin(I2C_SDA_GPIO, machine.Pin.OUT), | |
freq=I2C_FREQ | |
) | |
if 118 not in self.i2c.scan(): | |
raise OSError("BME280 not detected") | |
if 88 not in self.i2c.scan(): | |
raise OSError("SGP30 not detected") | |
self.temp_sensor = TemperatureSensor(self.i2c) | |
self.iaq_sensor = IAQSensor(self.i2c) | |
def run(self, loop_ms): | |
""" Main code of the application """ | |
last_baseline_commit_ms = ticks_ms() | |
last_mqtt_publish_ms = last_baseline_commit_ms | |
while True: | |
time_start_ms = ticks_ms() | |
payload = self._get_measurements() | |
absolute_humidity = uSGP30.convert_r_to_a_humidity( | |
payload["state"]["reported"]["temp_c"], | |
payload["state"]["reported"]["humidity_perc"] | |
) | |
self.iaq_sensor.set_humidity(absolute_humidity) | |
if ticks_ms() - last_mqtt_publish_ms > MQTT_INTERVAL_MS: | |
# TODO: Currently, only the latest measurements are sent, and | |
# all other prior measurements are discarded. It may be useful | |
# to average all timed measurements since the last publish. | |
self.mqtt_handler.publish(payload) | |
last_mqtt_publish_ms = ticks_ms() | |
if ticks_ms() - last_baseline_commit_ms > SGP30_BASELINE_INTERVAL_MS: | |
# Get current baseline and store on local flash | |
current_baseline = self.iaq_sensor.get_baseline() | |
current_baseline.append(time()) | |
with open(SGP30_BASELINE_FILE, "w") as file: | |
file.write(str(current_baseline)) | |
print_timed("Baseline commited: " + str(current_baseline)) | |
last_baseline_commit_ms = ticks_ms() | |
time_remaining_ms = loop_ms - (ticks_ms() - time_start_ms) | |
sleep_ms(time_remaining_ms) | |
def _get_measurements(self): | |
""" Obtain measurements from the sensors """ | |
shadow_data = {"state": {"reported": {}}} | |
data = {} | |
temp_c, pressure_pa, humidity_perc = self.temp_sensor.measure() | |
co2eq_ppm, tvoc_ppb = self.iaq_sensor.measure() | |
data["device_id"] = ESP32_DEVICE_NAME | |
data["temp_c"] = temp_c | |
data["pressure_pa"] = pressure_pa | |
data["humidity_perc"] = humidity_perc | |
data["humidity_gm3"] = uSGP30.convert_r_to_a_humidity( | |
temp_c, humidity_perc, fixed_point=False | |
) | |
data["co2eq_ppm"] = co2eq_ppm | |
data["tvoc_ppb"] = tvoc_ppb | |
data["timestamp"] = get_timestamp() | |
shadow_data["state"]["reported"] = data | |
return shadow_data | |
def get_timestamp(): | |
""" Returns a string for current date / time in ISO 8601 standard """ | |
year, month, mday, hour, minute, second, weekday, yearday = localtime() | |
return ( | |
str(year) + "-" + "{:02d}".format(month) + "-" + "{:02d}".format(mday) + "T" + | |
"{:02d}".format(hour) + ":" + "{:02d}".format(minute) + ":" + "{:02d}".format(second) + "Z" | |
) | |
def print_timed(message): | |
""" Output to console with current timestamp """ | |
print(get_timestamp() + ": " + message) | |
def main(): | |
""" Main application """ | |
wifi.wifi_connect() | |
controller = Controller() | |
controller.run(loop_ms=MEASURE_INTERVAL_MS) | |
if __name__ == "__main__": | |
# Run the application | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment