Skip to content

Instantly share code, notes, and snippets.

@th0ma5w
Last active December 3, 2017 21:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save th0ma5w/df376f774279ab17ba553ad36863efe2 to your computer and use it in GitHub Desktop.
Save th0ma5w/df376f774279ab17ba553ad36863efe2 to your computer and use it in GitHub Desktop.
A Larson scanner pattern using MQTT, differencing of LED state, and Protobufs
import paho.mqtt.client as mqtt
import time
# Most recent reply payload (decoded text); None until a reply is recorded.
bot_reply = None


def update_reply(msg):
    """Store the UTF-8 decoded form of *msg* in the module-level ``bot_reply``.

    *msg* must offer ``decode`` (it is called with ``'utf-8'``).
    """
    global bot_reply
    decoded = msg.decode('utf-8')
    bot_reply = decoded
# Dispatch table: subscribed topic name (bytes) -> callable given the payload.
topics = {}
topics[b'xmas_lights_info'] = update_reply
def on_connect(client, userdata, flags, rc):
    """Connect callback: report the result code and subscribe to every topic.

    Each key of ``topics`` is decoded from UTF-8 and passed to
    ``client.subscribe``.  ``userdata`` and ``flags`` are not used.
    """
    print("Connected with result code " + str(rc))
    for topic_name in topics:
        client.subscribe(topic_name.decode('utf-8'))
def on_message(client, userdata, msg):
    """Message callback: hand ``msg.payload`` to the handler for ``msg.topic``.

    ``topics`` is keyed by bytes, while the topic on an incoming message may
    be text, so a failed direct lookup is retried with the UTF-8 encoded
    topic.  Unknown topics and topics mapped to ``None`` are ignored.
    ``client`` and ``userdata`` are not used.
    """
    topic = msg.topic
    # dict.get replaces dict.has_key, which does not exist on Python 3.
    handler = topics.get(topic)
    if handler is None and not isinstance(topic, bytes):
        handler = topics.get(topic.encode('utf-8'))
    if handler is not None:
        handler(msg.payload)
# Module-level MQTT client shared by the send helpers below.
c = mqtt.Client()
# Callbacks are attached before connecting.
c.on_connect = on_connect
c.on_message = on_message
# "show" is the argument given to connect() -- presumably the broker's host
# name; confirm for your network.  No network loop is started here; the only
# visible place that services the connection is c.loop() in send_and_wait.
c.connect("show")
def send_and_wait(t, m):
    """Publish payload *m* on topic *t*, then poll until a reply is recorded.

    ``bot_reply`` is reset to ``None`` before publishing.  The client's
    ``loop(timeout=1.0)`` is then called repeatedly until ``bot_reply`` is
    no longer ``None`` (it is set elsewhere, e.g. by a message handler).
    If it is still ``None`` after more than 1000 polls, ``bot_reply`` is set
    to ``False`` to mark the timeout and the wait ends.

    Returns ``None``; read the module-level ``bot_reply`` for the outcome.
    """
    global bot_reply
    bot_reply = None
    c.publish(t, m)
    attempts = 0
    while bot_reply is None:
        c.loop(timeout=1.0)
        attempts += 1
        # Only flag a timeout when the final poll did not deliver a reply.
        if attempts > 1000 and bot_reply is None:
            bot_reply = False
def send_forget(t, m):
    """Publish payload *m* on topic *t* and return without awaiting a reply."""
    c.publish(t, m)


# `send` is the configurable sender alias.  It currently points at the
# fire-and-forget variant; bind it to send_and_wait to block on a reply.
send = send_forget
from xmas_lights_pb2 import xmas_lights
from xmas_lights_pb2 import xmas_lights_sequence
from random import random
from math import floor
def rr():
    """Return a random integer in the range 0..254."""
    return int(random() * 255)


# LED indices for one back-and-forth sweep: 0..35 ascending, then 34..1
# descending.  Both end points appear once, so repeating the list gives a
# continuous bounce.  The pieces are built as real lists because adding a
# list to a range object raises TypeError on Python 3.
pingpong = list(range(36)) + list(range(34, 0, -1))

# Brightness level per LED for the frame being built and for the frame before
# it; the main loop sets a level of 100 on the sweep head.
curr_leds = [0] * 36
prev_leds = [0] * 36
# NOTE(review): this file reached review with its indentation stripped.  The
# nesting used below is an assumption to confirm against the original script:
# the head LED is lit after the fade pass, the fade-out tail runs once after
# all four sweeps, and the message is published once per outer iteration.


def _add_frame(message, head=None):
    """Append one animation frame to *message* and advance the LED state.

    Every LED level in ``curr_leds`` is dimmed by 10 (never below 0).  When
    *head* is given, that LED is then set to the full level of 100.  Only
    LEDs whose level differs from ``prev_leds`` are written into the new
    frame, as red at level * 255 / 100 with the same value as intensity.
    Finally ``prev_leds`` is updated in place to match ``curr_leds``.
    """
    frame = message.sequences.add()
    frame.delay = 1

    for index, level in enumerate(curr_leds):
        curr_leds[index] = max(level - 10, 0)
    if head is not None:
        curr_leds[head] = 100

    # Differencing: emit an entry only for LEDs that changed since last frame.
    for index, (level, previous) in enumerate(zip(curr_leds, prev_leds)):
        if level != previous:
            value = int(level * 255.0 / 100.0)
            led = frame.scene.leds.add()
            led.led, led.red, led.green, led.blue, led.intensity = (
                index, value, 0, 0, value)

    prev_leds[:] = curr_leds


for _ in range(1):
    message = xmas_lights_sequence()

    # Four back-and-forth sweeps of the lit head, each step leaving a fading
    # trail behind it.
    for _sweep in range(4):
        for head in pingpong:
            _add_frame(message, head)

    # Let the remaining trail fade out: at most 36 extra frames, stopping as
    # soon as every LED is dark.
    for _ in range(36):
        if not any(curr_leds):
            break
        _add_frame(message)

    send_and_wait('xmas_lights_seq', bytearray(message.SerializeToString()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment