Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
CircuitPython MQTT demo using public.cloud.shiftr.io
# mqtt_demo_code.py -- MQTT demo using public.cloud.shiftr.io
# 9 Oct 2021 - @todbot
#
# Uses LEDs on Adafruit Funhouse but adaptable to any CircuitPython WiFi board.
#
# To see what's going on, download MQTT-Explorer from http://mqtt-explorer.com/
# Launch it and connect to the MQTT host/port/username/password as in secrets below.
#
# From MQTT-Explorer, you should see messages to "todbot/feeds/hello" every 3 seconds
# And then you can go to the right of MQTT-Explorer to Publish, fill in "todbot/feeds/light",
# click the "Raw" button and send a number from 0-4 to light up that LED purple.
#
# Here's what secrets.py could contain:
#
# secrets = {
# "ssid": "PrettyFlyForAWiFi",
# "password": "donthackme",
# "mqtt_broker": "public.cloud.shiftr.io",
# "mqtt_port": 1883,
# "mqtt_username": "public",
# "mqtt_password": "public",
# }
#
import board, time
import adafruit_dotstar
import displayio, terminalio
from adafruit_display_text import label
import ssl, socketpool, wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT
leds = adafruit_dotstar.DotStar(
board.DOTSTAR_CLOCK, board.DOTSTAR_DATA, 5, brightness=0.3
)
leds.fill(0)
my_mqtt_topic_hello = "todbot/feeds/hello" # the topic we send on
my_mqtt_topic_light = "todbot/feeds/light" # the topic we receive on (could be the same)
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
# Connect to WiFi
print("Connecting to %s" % secrets["ssid"])
wifi.radio.connect(secrets["ssid"], secrets["password"])
print("Connected to %s!" % secrets["ssid"])
# Socketpool for mqtt_client connections
pool = socketpool.SocketPool(wifi.radio)
# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
broker=secrets["mqtt_broker"],
port=secrets["mqtt_port"],
username=secrets["mqtt_username"],
password=secrets["mqtt_password"],
socket_pool=pool,
ssl_context=ssl.create_default_context(),
)
# Callback functions for mqtt_client
def connected(client, userdata, flags, rc):
# Called when the client is connected successfully to the broker
print("Connected to MQTT broker!")
client.subscribe( my_mqtt_topic_light) # say I want to listen to this topic
def disconnected(client, userdata, rc):
# Called when the client is disconnected
print("Disconnected from MQTT broker!")
def message(client, topic, message):
# Called when a topic the client is subscribed to has a new message
print("New message on topic {0}: {1}".format(topic, message))
val = 0
try:
val = int(message) # attempt to parse it as a number
except ValueError:
pass
val = min(max(val, 0),len(leds)) # map it to number of dotstar LEDs
print("setting LED ",val)
leds[ val ] = 0xff00ff
# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message
# Connect the client to the MQTT broker.
print("Connecting to MQTT broker...")
mqtt_client.connect()
last_msg_send_time = time.monotonic()
while True:
print("waiting")
mqtt_client.loop() # see if any messages to me
if time.monotonic() - last_msg_send_time > 3.0: # send a message every 3 secs
last_msg_send_time = time.monotonic()
leds.fill(0)
msg = "hi there! time is "+str(time.monotonic())
print("sending MQTT msg..", msg)
mqtt_client.publish( my_mqtt_topic_hello, msg )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment