Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@aixnr
Last active December 26, 2021 20:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aixnr/9cc837d0ec7abbc13419722010bc44d5 to your computer and use it in GitHub Desktop.
Save aixnr/9cc837d0ec7abbc13419722010bc44d5 to your computer and use it in GitHub Desktop.
A script for CircuitPython v7.x that reads sensor data from an AHT20 and sends it to Adafruit IO through an ESP32 breakout board.
import time
import busio
import board
import digitalio
# Additional libraries for AHT20, ESP32 Airlift, requests, socket, and MQTT
import adafruit_ahtx0
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_requests as requests
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT
# Import custom libraries
from led_repeater import internal_blinker
# First run: announce that boot-up finished and blink 5 times
print("RP2040, booting up completed!")
internal_blinker(repeat=5)

# Assigning GPIO pins used to talk to the ESP32 over SPI
esp32_cs = digitalio.DigitalInOut(board.GP13)
esp32_ready = digitalio.DigitalInOut(board.GP14)
esp32_reset = digitalio.DigitalInOut(board.GP15)
spi = busio.SPI(board.GP10, board.GP11, board.GP12)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Set socket (for requests library) to use esp
requests.set_socket(socket, esp)

# Check WiFi adapter status
# NOTE(review): the original indentation was lost; only the idle-mode message
# is assumed to be conditional, the adapter details are always printed - confirm.
if esp.status == adafruit_esp32spi.WL_IDLE_STATUS:
    print("ESP32 found and in idle mode")
print("Firmware version: ", bytearray(esp.firmware_version).decode())
print("MAC address: ", [hex(i) for i in esp.MAC_address])

# Import secrets from ./secrets.py; without them there is nothing to connect
# with, so report the problem and let the ImportError stop the script
try:
    from secrets import secrets
except ImportError:
    print("Secrets not found, aborting...")
    raise

# WiFi manager (simpler than using esp)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets)

# Connect to internet using WiFi manager
wifi.connect()
print("WiFi Connected!")

# After connected to WiFi, blink 3 times
internal_blinker(repeat=3)
# [MQTT] Define callbacks invoked on broker connect, subscribe, and disconnect events
def connected(client):
    """Report that the client has connected to the MQTT broker.

    Registered as the ``on_connect`` callback of the Adafruit IO MQTT object.

    Args:
        client: The object handed to the callback on connect; unused here.
    """
    print("Connected to a MQTT broker! ")
def subscribe(client, userdata, topic, granted_qos):
    """Report that the client has subscribed to a new feed.

    Registered as the ``on_subscribe`` callback of the Adafruit IO MQTT object.

    Args:
        client: The object handed to the callback; unused here.
        userdata: Extra data handed to the callback; unused here.
        topic: Name of the feed/topic that was subscribed to.
        granted_qos: QOS level reported for the subscription.
    """
    print(f"Subscribed to {topic} with QOS level {granted_qos}")
def disconnected(client):
    """Report that the client has disconnected from the MQTT broker.

    Registered as the ``on_disconnect`` callback of the Adafruit IO MQTT object.

    Args:
        client: The object handed to the callback on disconnect; unused here.
    """
    print("Disconnected from MQTT broker :(")
# [MQTT] Point MiniMQTT at the ESP32 socket implementation
MQTT.set_socket(socket, esp)

# [MQTT] Build the client for io.adafruit.com using the credentials in secrets
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    username=secrets["aio_username"],
    password=secrets["aio_key"],
)

# [MQTT] Wrap the client in the Adafruit IO helper object
io = IO_MQTT(mqtt_client)

# [MQTT] Hook up the callback functions defined earlier in this script
io.on_connect, io.on_disconnect, io.on_subscribe = (
    connected,
    disconnected,
    subscribe,
)

# [MQTT] Open the connection to the broker
print("Connecting to MQTT broker")
io.connect()

# Set up the AHT20 temperature/humidity sensor on its I2C bus
i2c = busio.I2C(scl=board.GP21, sda=board.GP20)
sensor = adafruit_ahtx0.AHTx0(i2c)

# time.monotonic() value of the last publish; drives the 30 sec interval
prv_refresh_time = 0.0
def _recover_connection(error):
    """Report *error*, then reset WiFi and re-establish the MQTT session."""
    print("Failed connecting MQTT broker, retrying\n", error)
    wifi.reset()
    wifi.connect()
    io.reconnect()


# Main program loop: service MQTT traffic and publish a reading every 30 sec
while True:
    # Poll for incoming message; on a connection error rebuild the link and
    # start the iteration over
    try:
        io.loop()
    except (ValueError, RuntimeError) as err:
        _recover_connection(err)
        continue

    # Nothing to send until more than 30 sec have passed since the last publish
    if (time.monotonic() - prv_refresh_time) <= 30:
        continue

    # Blink LED once before reading data, sleep for 2 sec
    internal_blinker(repeat=1)
    time.sleep(2)

    # Read sensor data and publish it to Adafruit IO
    temperature_c = sensor.temperature
    print(f"Ambient temperature is {round(temperature_c, 1)} degree C")
    print("Pushing data temperature feed")
    try:
        io.publish("aht20-temp", temperature_c)
    except (ValueError, RuntimeError) as err:
        # A failed publish gets the same recovery as a failed poll instead of
        # ending the script. prv_refresh_time is left untouched so the reading
        # is retried on the next pass.
        # NOTE(review): presumably publish raises the same error types as
        # io.loop() on a dropped link - confirm against the library.
        _recover_connection(err)
        continue
    print("Published!")

    # Update prv_refresh_time counter
    prv_refresh_time = time.monotonic()

    # Blink LED twice after a successful push
    internal_blinker(repeat=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment