Last active
December 26, 2021 20:21
-
-
Save aixnr/9cc837d0ec7abbc13419722010bc44d5 to your computer and use it in GitHub Desktop.
A script to run on CircuitPython v7.x for reading sensor data from AHT20 and sending to Adafruit IO with ESP32 breakout board.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import busio | |
import board | |
import digitalio | |
# Additional libraries for AHT20, ESP32 Airlift, requests, socket, and MQTT | |
import adafruit_ahtx0 | |
from adafruit_esp32spi import adafruit_esp32spi | |
from adafruit_esp32spi import adafruit_esp32spi_wifimanager | |
import adafruit_requests as requests | |
import adafruit_esp32spi.adafruit_esp32spi_socket as socket | |
import adafruit_minimqtt.adafruit_minimqtt as MQTT | |
from adafruit_io.adafruit_io import IO_MQTT | |
# Import custom libraries | |
from led_repeater import internal_blinker | |
# First run, blink 5 times | |
print("RP2040, booting up completed!") | |
internal_blinker(repeat=5) | |
# Assigning GPIO pins | |
esp32_cs = digitalio.DigitalInOut(board.GP13) | |
esp32_ready = digitalio.DigitalInOut(board.GP14) | |
esp32_reset = digitalio.DigitalInOut(board.GP15) | |
spi = busio.SPI(board.GP10, board.GP11, board.GP12) | |
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset) | |
# Set socket (for requests library) to use esp | |
requests.set_socket(socket, esp) | |
# Check WiFi adapter status | |
if esp.status == adafruit_esp32spi.WL_IDLE_STATUS: | |
print("ESP32 found and in idle mode") | |
print("Firmware version: ", bytearray(esp.firmware_version).decode()) | |
print("MAC address: ", [hex(i) for i in esp.MAC_address]) | |
# Import secrets from ./secrets.py | |
try: | |
from secrets import secrets | |
except ImportError: | |
print("Secrets not found, aborting...") | |
raise | |
# WiFi manager (simpler than using esp) | |
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets) | |
# Connect to internet using WiFi manager | |
wifi.connect() | |
print("WiFi Connected!") | |
# After connected to WiFi, blink 3 times | |
internal_blinker(repeat=3) | |
# [MQTT] Define callbacks after connected to broker | |
def connected(client): | |
# Connected function will be called when the client is connected to broker. | |
print("Connected to a MQTT broker! ") | |
def subscribe(client, userdata, topic, granted_qos): | |
# This method is called when the client subscribes to a new feed. | |
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos)) | |
def disconnected(client): | |
# Disconnected function will be called when the client disconnects. | |
print("Disconnected from MQTT broker :(") | |
# [MQTT] Socket setting | |
MQTT.set_socket(socket, esp) | |
# [MQTT] Client object initialization | |
mqtt_client = MQTT.MQTT( | |
broker="io.adafruit.com", | |
username=secrets["aio_username"], | |
password=secrets["aio_key"] | |
) | |
# [MQTT] Initialize IO object | |
io = IO_MQTT(mqtt_client) | |
# [MQTT] Connect the callback methods defined above | |
io.on_connect = connected | |
io.on_disconnect = disconnected | |
io.on_subscribe = subscribe | |
# [MQTT] Establish connection | |
print("Connecting to MQTT broker") | |
io.connect() | |
# Configure AHT20 temp and humidity sensor | |
i2c = busio.I2C(scl=board.GP21, sda=board.GP20) | |
sensor = adafruit_ahtx0.AHTx0(i2c) | |
# The 30 sec counter | |
prv_refresh_time = 0.0 | |
# Main program loop | |
while True: | |
# Poll for incoming message | |
try: | |
io.loop() | |
except (ValueError, RuntimeError) as e: | |
print("Failed connecting MQTT broker, retrying\n", e) | |
wifi.reset() | |
wifi.connect() | |
io.reconnect() | |
continue | |
# Send temperature reading | |
if (time.monotonic() - prv_refresh_time) > 30: | |
# Blink LED once before reading data, sleep for 2 sec | |
internal_blinker(repeat=1) | |
time.sleep(2) | |
# Read sensor data and publishing to Adafruit IO | |
_temp = sensor.temperature | |
print(f"Ambient temperature is {round(_temp, 1)} degree C") | |
print("Pushing data temperature feed") | |
io.publish("aht20-temp", _temp) | |
print("Published!") | |
# Update prv_refresh_time counter | |
prv_refresh_time = time.monotonic() | |
# Blink LED twice after a succesful push | |
internal_blinker(repeat=2) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment