Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
CircuitPython MQTT demo using
# CircuitPython MQTT demo using adafruit_minimqtt
# 9 Oct 2021 - @todbot
# Uses LEDs on Adafruit Funhouse but adaptable to any CircuitPython WiFi board.
# To see what's going on, download MQTT-Explorer from http://mqtt-explorer.com/
# Launch it and connect to the MQTT host/port/username/password as in secrets below.
# From MQTT-Explorer, you should see messages to "todbot/feeds/hello" every 3 seconds
# And then you can go to the right of MQTT-Explorer to Publish, fill in "todbot/feeds/light",
# click the "Raw" button and send a number from 0-4 to light up that LED purple.
# Here's what secrets.py could contain:
# secrets = {
# "ssid": "PrettyFlyForAWiFi",
# "password": "donthackme",
# "mqtt_broker": "",
# "mqtt_port": 1883,
# "mqtt_username": "public",
# "mqtt_password": "public",
# }
import board, time
import adafruit_dotstar
import displayio, terminalio
from adafruit_display_text import label
import ssl, socketpool, wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# Strip of 5 DotStar LEDs on the board's DOTSTAR pins, at 0.3 brightness.
leds = adafruit_dotstar.DotStar(
    board.DOTSTAR_CLOCK, board.DOTSTAR_DATA, 5, brightness=0.3
)

my_mqtt_topic_hello = "todbot/feeds/hello"  # the topic we send on
my_mqtt_topic_light = "todbot/feeds/light"  # the topic we receive on (could be the same)

# WiFi and MQTT credentials are read from a separate secrets.py module.
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise  # nothing below can run without the credentials

# Connect to WiFi
print("Connecting to %s" % secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!" % secrets["ssid"])

# Socketpool for mqtt_client connections
pool = socketpool.SocketPool(wifi.radio)

# Set up a MiniMQTT Client
# NOTE(review): the ssl_context should only matter for TLS connections;
# confirm it is wanted for the plain port 1883 shown in the secrets example.
mqtt_client = MQTT.MQTT(
    broker=secrets["mqtt_broker"],
    port=secrets["mqtt_port"],
    username=secrets["mqtt_username"],
    password=secrets["mqtt_password"],
    socket_pool=pool,
    ssl_context=ssl.create_default_context(),
)
# Callback functions for mqtt_client
def connected(client, userdata, flags, rc):
    """Report the broker connection and subscribe to the light topic.

    Called when the client is connected successfully to the broker.

    Args:
        client: The MQTT client that connected; used to subscribe.
        userdata: Unused.
        flags: Unused.
        rc: Unused.
    """
    print("Connected to MQTT broker!")
    client.subscribe(my_mqtt_topic_light)  # say I want to listen to this topic
def disconnected(client, userdata, rc):
    """Report that the client has been disconnected from the broker.

    Called when the client is disconnected.

    Args:
        client: Unused.
        userdata: Unused.
        rc: Unused.
    """
    print("Disconnected from MQTT broker!")
def message(client, topic, message):
    """Light the LED whose index is given by an incoming MQTT payload.

    Called when a topic the client is subscribed to has a new message.
    The payload is parsed as an integer, clamped to a valid index into
    ``leds``, and that LED is set to 0xff00ff.

    Args:
        client: Unused.
        topic: Topic the message arrived on (only printed).
        message: Payload text; expected to be an integer LED index.
    """
    print("New message on topic {0}: {1}".format(topic, message))
    try:
        val = int(message)  # attempt to parse it as a number
    except ValueError:
        val = 0  # non-numeric payloads fall back to the first LED
    # Clamp to a valid index: the last LED is len(leds) - 1, not len(leds),
    # otherwise a payload >= len(leds) raises IndexError below.
    val = min(max(val, 0), len(leds) - 1)
    print("setting LED ", val)
    leds[val] = 0xff00ff
# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print("Connecting to MQTT broker...")
mqtt_client.connect()

# Poll for incoming messages forever, publishing a hello message on
# my_mqtt_topic_hello roughly every 3 seconds.
last_msg_send_time = time.monotonic()
while True:
    mqtt_client.loop()  # see if any messages to me
    now = time.monotonic()  # sample once so the check and the message agree
    if now - last_msg_send_time > 3.0:  # send a message every 3 secs
        last_msg_send_time = now
        msg = "hi there! time is " + str(now)
        print("sending MQTT msg..", msg)
        mqtt_client.publish(my_mqtt_topic_hello, msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment