Skip to content

Instantly share code, notes, and snippets.

@fantasticdonkey
Last active May 6, 2020 20:47
Show Gist options
  • Save fantasticdonkey/2b0e6b3e045ba451d3b3b4cea5558396 to your computer and use it in GitHub Desktop.
Save fantasticdonkey/2b0e6b3e045ba451d3b3b4cea5558396 to your computer and use it in GitHub Desktop.
uSGP30 run script
""" main.py
Application to take measurements from BME280 and SGP30 sensors
and send them to a local broker using MQTT. Currently configured
to update thing's AWS IoT Shadow document.
Note the SGP30 initialisation time (15 seconds) and calibration time
(12 hours) stated in the Application Notes.
"""
import sys

import machine
from utime import localtime, sleep_ms, ticks_diff, ticks_ms, time
import umqtt.simple
import ujson

import uSGP30
import bme280_float

import wifi
# Device name; reused below as the MQTT client name, in the shadow topic
# and in the certificate / key file names.
ESP32_DEVICE_NAME = "this-iaq-thing"
# Target duration of one pass through the measurement loop
MEASURE_INTERVAL_MS = const(1000)

# I2C bus: clock pin, data pin and bus frequency
I2C_SCL_GPIO = const(18)
I2C_SDA_GPIO = const(19)
I2C_FREQ = const(400000)

# SGP30 timing and baseline persistence
SGP30_BASELINE_FILE = "sgp30_iaq_baseline.txt"
SGP30_BASELINE_INTERVAL_MS = const(3600000)  # 1 hour between baseline commits
SGP30_EARLY_OPERATION_PHASE_MS = const(43200000)  # 12 hours
SGP30_BASELINE_VALIDITY_MS = const(604800000)  # 7 days
SGP30_INIT_MS = const(15000)  # 15 seconds

# MQTT connection settings (replace with your own broker and certificates)
MQTT_CONFIG = {
    "MQTT_BROKER": "192.168.1.124",
    "MQTT_ESP_NAME": ESP32_DEVICE_NAME,
    "MQTT_PORT": 8883,
    "MQTT_TOPIC": "$aws/things/{}/shadow/update".format(ESP32_DEVICE_NAME),
    "MQTT_KEEPALIVE": 4000,
    "MQTT_SSL": True,
    "MQTT_SSL_PARAMS": {
        "CERTFILE": "/certs/{}-certificate.pem.crt.txt".format(ESP32_DEVICE_NAME),
        "KEYFILE": "/certs/{}-private.pem.key".format(ESP32_DEVICE_NAME),
        "SERVER_SIDE": False,
    },
}
# Minimum time between two MQTT publishes
MQTT_INTERVAL_MS = const(60000)
class TemperatureSensor:
    """ Thin wrapper around the Bosch BME280 driver """

    def __init__(self, i2c):
        """ Creates the BME280 driver on the given I2C bus object """
        driver = bme280_float.BME280(i2c=i2c)
        self.bme = driver

    def measure(self):
        """ Returns the driver's compensated reading unchanged """
        reading = self.bme.read_compensated_data()
        return reading
class IAQSensor():
    """ SGP30 indoor air quality sensor """

    def __init__(self, i2c, init_algo=True):
        """ Initialises sensor

        i2c: I2C bus object handed to the uSGP30 driver.
        init_algo: forwarded to the driver; when true, a stored baseline is
        restored if one is available and still valid, otherwise the early
        operation phase is waited out. The initialisation delay is always
        applied afterwards, even if restoring the baseline raised.
        """
        self.sgp30 = uSGP30.SGP30(i2c, init_algo=init_algo)
        if not init_algo:
            return
        try:
            baseline = self._load_baseline()
            if baseline is None:
                # NOTE(review): this blocks for 12 hours without taking any
                # measurements - confirm against the SGP30 application notes
                # that the sensor calibrates without regular measure commands.
                sleep_ms(SGP30_EARLY_OPERATION_PHASE_MS)
            else:
                self.sgp30.set_iaq_baseline(baseline[0], baseline[1])
        finally:
            sleep_ms(SGP30_INIT_MS)

    def _load_baseline(self):
        """ Reads the stored baseline from flash

        Returns a (first, second) tuple holding the first two stored values,
        or None when the file is missing, malformed or older than
        SGP30_BASELINE_VALIDITY_MS.
        """
        try:
            with open(SGP30_BASELINE_FILE, "r") as file:
                stored = ujson.loads(file.read())
            first, second, saved_at = stored[0], stored[1], int(stored[2])
        except (OSError, ValueError, IndexError, KeyError, TypeError) as exception:
            # Convert explicitly: print_timed concatenates its argument to a str
            print_timed(str(exception))
            print_timed("No valid baseline found")
            return None
        print_timed("Baseline found: " + str(stored))
        # time() counts seconds while the validity constant is in milliseconds
        # NOTE(review): assumes the clock was already set when the baseline
        # was saved and is set now - confirm where the RTC gets synchronised.
        age_ms = (time() - saved_at) * 1000
        if age_ms > SGP30_BASELINE_VALIDITY_MS:
            print_timed("Baseline has expired: " + str(stored))
            return None
        return first, second

    def measure(self):
        """ Takes readings (returns the driver's measure_iaq() result) """
        return self.sgp30.measure_iaq()

    def get_baseline(self):
        """ Gets current baseline from SGP30 """
        return self.sgp30.get_iaq_baseline()

    def set_humidity(self, humidity):
        """ Sets humidity compensation from an absolute humidity value """
        self.sgp30.set_absolute_humidity(humidity)
class MQTTHandler():
    """ MQTT connection handler """

    def __init__(self, mqtt_config):
        """ Instantiate MQTT connection handler

        mqtt_config: dict with the keys used by MQTT_CONFIG (broker, client
        name, port, topic, keepalive, SSL flag and SSL parameters).
        Reads the key and certificate files, then connects to the broker.
        Exits the application via sys.exit() if either file cannot be read.
        """
        self.mqtt_config = mqtt_config
        ssl_config = self.mqtt_config["MQTT_SSL_PARAMS"]
        try:
            with open(ssl_config["KEYFILE"], "r") as file_in:
                key_file = file_in.read()
            with open(ssl_config["CERTFILE"], "r") as file_in:
                cert_file = file_in.read()
        except OSError as exception:
            # Convert explicitly: print_timed concatenates its argument to a str
            print_timed(str(exception))
            print_timed("Could not locate certificates")
            sys.exit()
        self.mqtt_client = umqtt.simple.MQTTClient(
            # Use the configured client name rather than the module global so
            # the handler honours whatever configuration it was given
            client_id=self.mqtt_config["MQTT_ESP_NAME"],
            server=self.mqtt_config["MQTT_BROKER"],
            port=self.mqtt_config["MQTT_PORT"],
            keepalive=self.mqtt_config["MQTT_KEEPALIVE"],
            ssl=self.mqtt_config["MQTT_SSL"],
            ssl_params={
                "key": key_file,
                "cert": cert_file,
                "server_side": ssl_config["SERVER_SIDE"]
            }
        )
        self.mqtt_client.connect()

    def publish(self, data):
        """ Publish a message

        data is serialised with ujson and sent to the configured topic.
        """
        topic = self.mqtt_config["MQTT_TOPIC"]
        self.mqtt_client.publish(topic, ujson.dumps(data))
        print_timed("Data sent... " + str(data) + " " + topic)

    def disconnect(self):
        """ Disconnect from broker """
        self.mqtt_client.disconnect()
class Controller():
    """ Takes sensor readings and publishes them using MQTT """

    def __init__(self):
        """ Initialise the controller

        Sets up the I2C bus, both sensors and the MQTT connection.
        Raises OSError if address 118 (0x76, BME280) or 88 (0x58, SGP30)
        does not show up in the I2C bus scan.
        """
        self.i2c = machine.I2C(
            scl=machine.Pin(I2C_SCL_GPIO, machine.Pin.OUT),
            sda=machine.Pin(I2C_SDA_GPIO, machine.Pin.OUT),
            freq=I2C_FREQ
        )
        # Scan the bus once and reuse the result for both presence checks
        detected = self.i2c.scan()
        if 118 not in detected:
            raise OSError("BME280 not detected")
        if 88 not in detected:
            raise OSError("SGP30 not detected")
        self.temp_sensor = TemperatureSensor(self.i2c)
        self.iaq_sensor = IAQSensor(self.i2c)
        # Connect last: IAQSensor blocks for at least SGP30_INIT_MS, and the
        # broker connection should not sit idle while the sensor warms up
        self.mqtt_handler = MQTTHandler(MQTT_CONFIG)

    def run(self, loop_ms):
        """ Main code of the application

        loop_ms: target duration of one loop iteration in milliseconds.
        Never returns. Each iteration measures, updates the SGP30 humidity
        compensation, publishes at most once per MQTT_INTERVAL_MS and stores
        the baseline at most once per SGP30_BASELINE_INTERVAL_MS.
        """
        last_baseline_commit_ms = ticks_ms()
        last_mqtt_publish_ms = last_baseline_commit_ms
        while True:
            time_start_ms = ticks_ms()
            payload = self._get_measurements()
            reported = payload["state"]["reported"]
            absolute_humidity = uSGP30.convert_r_to_a_humidity(
                reported["temp_c"],
                reported["humidity_perc"]
            )
            self.iaq_sensor.set_humidity(absolute_humidity)
            # ticks_ms() wraps around, so elapsed time must be computed with
            # ticks_diff(); a plain subtraction goes negative after the wrap
            if ticks_diff(ticks_ms(), last_mqtt_publish_ms) > MQTT_INTERVAL_MS:
                # TODO: Currently, only the latest measurements are sent, and
                # all other prior measurements are discarded. It may be useful
                # to average all timed measurements since the last publish.
                self.mqtt_handler.publish(payload)
                last_mqtt_publish_ms = ticks_ms()
            if ticks_diff(ticks_ms(), last_baseline_commit_ms) > SGP30_BASELINE_INTERVAL_MS:
                # Get current baseline and store on local flash, appending
                # the time it was taken so its age can be checked on restore
                current_baseline = list(self.iaq_sensor.get_baseline())
                current_baseline.append(time())
                with open(SGP30_BASELINE_FILE, "w") as file:
                    # Written as JSON because it is read back with ujson.loads
                    file.write(ujson.dumps(current_baseline))
                print_timed("Baseline commited: " + str(current_baseline))
                last_baseline_commit_ms = ticks_ms()
            time_remaining_ms = loop_ms - ticks_diff(ticks_ms(), time_start_ms)
            # An iteration that overran loop_ms leaves nothing to wait for;
            # never hand a negative delay to sleep_ms()
            if time_remaining_ms > 0:
                sleep_ms(time_remaining_ms)

    def _get_measurements(self):
        """ Obtain measurements from the sensors

        Returns a dict of the form {"state": {"reported": {...}}} holding
        the readings of both sensors, the device id and a timestamp.
        """
        temp_c, pressure_pa, humidity_perc = self.temp_sensor.measure()
        co2eq_ppm, tvoc_ppb = self.iaq_sensor.measure()
        data = {
            "device_id": ESP32_DEVICE_NAME,
            "temp_c": temp_c,
            "pressure_pa": pressure_pa,
            "humidity_perc": humidity_perc,
            "humidity_gm3": uSGP30.convert_r_to_a_humidity(
                temp_c, humidity_perc, fixed_point=False
            ),
            "co2eq_ppm": co2eq_ppm,
            "tvoc_ppb": tvoc_ppb,
            "timestamp": get_timestamp(),
        }
        return {"state": {"reported": data}}
def get_timestamp():
    """ Returns a string for current date / time in ISO 8601 standard

    The value comes from localtime() and is formatted as
    YYYY-MM-DDTHH:MM:SSZ.
    """
    # Only the first six fields are needed; slicing keeps this working
    # whether localtime() returns 8 fields or 9
    year, month, mday, hour, minute, second = localtime()[:6]
    return "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}Z".format(
        year, month, mday, hour, minute, second
    )
def print_timed(message):
    """ Output to console with current timestamp

    message may be any object; it is converted with str() first, because
    callers also pass exception instances and those cannot be concatenated
    to a string directly.
    """
    print(get_timestamp() + ": " + str(message))
def main():
    """ Calls wifi.wifi_connect(), then starts the controller loop """
    wifi.wifi_connect()
    Controller().run(loop_ms=MEASURE_INTERVAL_MS)


if __name__ == "__main__":
    # Only start the application when executed as the main script
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment