Skip to content

Instantly share code, notes, and snippets.

@scruss
Last active December 5, 2024 00:21
Show Gist options
  • Save scruss/7569e1c10b8c5c43b0a1160ca93dedea to your computer and use it in GitHub Desktop.
Save scruss/7569e1c10b8c5c43b0a1160ca93dedea to your computer and use it in GitHub Desktop.
a very simple Cheerlights/NeoPixel MQTT client for MicroPython
# -*- coding: utf-8 -*-
# scruss, 2024-12
# MicroPython
"""
a very simple Cheerlights MQTT client for MicroPython
Uses Peter Hinch's MicroPython Asynchronous MQTT library
— https://github.com/peterhinch/micropython-mqtt/
Install it with:
mpremote mip install github:peterhinch/micropython-mqtt
Edit local and neopixel configuration sections below
"""
from mqtt_as import MQTTClient, config
import asyncio
import machine
from neopixel import NeoPixel
# from apa106 import APA106 # if you are using (G, R, B) LEDs
# Local configuration
config["ssid"] = "Your_wifi_SSID"
config["wifi_pw"] = "Your_wifi_password"
MQTTClient.DEBUG = False  # print diagnostic messages if true

# begin board specific neopixel configuration
# My board - Wemos S3 MINI PRO - has one addressable LED
# on Pin 8, but unusually has an Enable/Power signal on Pin 7.
# Check your board details.
# Some boards make RGB LEDs too bright and wash out the colours,
# so maybe set 'bri' to an integer between 2-4
_ = machine.Pin(7, machine.Pin.OUT, value=1)  # neopixel power
# Only the `machine` module itself is imported, so Pin has to be reached
# through it; a bare `Pin` would raise NameError here.
neopin = machine.Pin(8, machine.Pin.OUT)  # control pin
np = NeoPixel(neopin, 1)  # we have one whole LED here
bri = 1  # LED brightness divisor (integer)
# end board specific neopixel setup

# *** no need to edit anything below here ***
np.fill((24 // bri, 48 // bri, 96 // bri))  # light pixels to show they work
np.write()
config["server"] = "mqtt.cheerlights.com"
def callback(topic, msg, retained, properties=None):
    """Show the colour carried by one incoming message on the LED(s).

    msg is a bytes payload expected to hold a hex RGB string such as
    b'#ff00ff'. Each channel is integer-divided by the module-level
    brightness divisor `bri` and written to the module-level NeoPixel
    object `np`. topic, retained and properties are accepted so the
    function fits the subscription-callback signature, but are not used.

    A payload that cannot be parsed as '#' plus three two-character hex
    fields is ignored (the LED keeps its current colour) instead of
    raising from inside the callback.
    """
    try:
        m = msg.decode().strip()  # decode once; like '#ff00ff'
    except ValueError:  # UnicodeError is a ValueError subclass
        return
    print("Colour:", m)
    if len(m) != 7 or m[0] != "#":
        return
    try:
        rgb = tuple(int(m[i : i + 2], 16) // bri for i in (1, 3, 5))
    except ValueError:
        return  # one of the channel fields was not a hex number
    np.fill(rgb)
    np.write()
async def conn_han(client):
    """Subscribe the given client to the "hex" topic.

    Registered below as config["connect_coro"]. The second argument
    passed to subscribe() is 0 (presumably the QoS level - confirm
    against the mqtt_as documentation).
    """
    topic, level = "hex", 0
    await client.subscribe(topic, level)
async def main(client):
    """Connect the client, then idle forever in 5 second sleeps.

    Never returns normally; the loop only exists to keep this
    coroutine alive while messages come in.
    """
    await client.connect()
    idle_seconds = 5
    while 1:
        # hang around forever waiting for messages to come in
        await asyncio.sleep(idle_seconds)
# Register the message callback and the on-connect coroutine in the
# configuration before the client is created from it.
config.update({"subs_cb": callback, "connect_coro": conn_han})
client = MQTTClient(config)
try:
    asyncio.run(main(client))
finally:
    # Cleanup runs however the main coroutine ends.
    client.close()  # Prevent LmacRxBlk:1 errors
    np.fill((0, 0, 0))  # turn off neopixels for tidiness
    np.write()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment