Last active
December 5, 2024 00:21
-
-
Save scruss/7569e1c10b8c5c43b0a1160ca93dedea to your computer and use it in GitHub Desktop.
a very simple Cheerlights/NeoPixel MQTT client for MicroPython
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# scruss, 2024-12
# MicroPython
"""
a very simple Cheerlights MQTT client for MicroPython

Uses Peter Hinch's MicroPython Asynchronous MQTT library
 — https://github.com/peterhinch/micropython-mqtt/

Install it with:
    mpremote mip install github:peterhinch/micropython-mqtt

Edit the local and neopixel configuration sections below
"""
from mqtt_as import MQTTClient, config | |
import asyncio | |
import machine | |
from neopixel import NeoPixel | |
# from apa106 import APA106 # if you are using (G, R, B) LEDs | |
# Local configuration: replace the placeholders with your own Wi-Fi credentials
config.update(
    {
        "ssid": "Your_wifi_SSID",
        "wifi_pw": "Your_wifi_password",
    }
)
MQTTClient.DEBUG = False  # set to True to print diagnostic messages
# begin board specific neopixel configuration
#
# My board - Wemos S3 MINI PRO - has one addressable LED on Pin 8,
# but unusually has an Enable/Power signal on Pin 7.
# Check your board details.
#
# Some boards make RGB LEDs too bright and wash out the colours,
# so maybe set 'bri' to an integer between 2-4.
_ = machine.Pin(7, machine.Pin.OUT, value=1)  # neopixel power: enable pin driven high
# Only the 'machine' module is imported by this file, so Pin must be
# reached through it; a bare 'Pin' raises NameError.
neopin = machine.Pin(8, machine.Pin.OUT)  # control pin
np = NeoPixel(neopin, 1)  # we have one whole LED here
bri = 1  # LED brightness divisor (integer); every colour channel is floor-divided by it
# end board specific neopixel setup
# *** no need to edit anything below here ***
# light the pixels at start-up to show they work, scaled by the brightness divisor
np.fill(tuple(level // bri for level in (24, 48, 96)))
np.write()
# MQTT broker to connect to
config["server"] = "mqtt.cheerlights.com"
def callback(topic, msg, retained, properties=None):
    """Show the colour carried by an incoming MQTT message on the pixels.

    Parameters:
        topic: topic the message arrived on (unused).
        msg: payload bytes, expected to decode to a hex RGB string
            like '#ff00ff'.
        retained: retained-message flag (unused).
        properties: optional extra message properties (unused).

    A payload that cannot be decoded, or that is not '#' followed by
    exactly six hex digits, is reported and ignored, so one bad message
    cannot raise out of the handler or display a bogus colour.
    """
    try:
        m = msg.decode().strip()  # decode once; tolerate stray whitespace
    except ValueError:  # UnicodeError is a subclass of ValueError
        print("Ignoring undecodable colour payload:", msg)
        return
    print("Colour:", m)
    hex_digits = "0123456789abcdefABCDEF"
    well_formed = (
        len(m) == 7
        and m[0] == "#"
        and all(ch in hex_digits for ch in m[1:])
    )
    if not well_formed:
        print("Ignoring malformed colour:", m)
        return
    # split 'rrggbb' into three channels and dim each by the brightness divisor
    np.fill(tuple(int(m[i : i + 2], 16) // bri for i in (1, 3, 5)))
    np.write()
async def conn_han(client):
    """Subscribe the given client to the "hex" topic at QoS 0.

    Installed further down in this file as config["connect_coro"].
    """
    topic, qos = "hex", 0
    await client.subscribe(topic, qos)
async def main(client):
    """Connect the client, then idle forever while messages arrive.

    Never returns normally; it only ends when an exception propagates.
    """
    await client.connect()
    idle_seconds = 5
    while True:
        # nothing to do here except yield so that incoming messages get handled
        await asyncio.sleep(idle_seconds)
# register the message and connection handlers, then build the client
config.update({"subs_cb": callback, "connect_coro": conn_han})
client = MQTTClient(config)
try:
    asyncio.run(main(client))
finally:
    # main() never returns normally, so this clean-up runs on the way out
    # of an exception (e.g. an interrupt)
    client.close()  # Prevent LmacRxBlk:1 errors
    np.fill((0, 0, 0))  # turn off neopixels for tidiness
    np.write()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment