Skip to content

Instantly share code, notes, and snippets.

@cobryan05
Created March 9, 2022 23:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cobryan05/c161e03cbc052a593dcc568ea79d7eae to your computer and use it in GitHub Desktop.
Save cobryan05/c161e03cbc052a593dcc568ea79d7eae to your computer and use it in GitHub Desktop.
AppDaemon MQTT SmartBulb controller
import hassapi as hass
import asyncio
import colorsys
import json
"""
*BETA* - EXPECT ISSUES!
App to control a Smart Bulb using MQTT
Watches an MQTT topic to control a Smart Bulb with priority-based notifications.
If the MQTT topic has multiple notification entries, the bulb will always show
the highest priority one.
Examples:
topic: myhome/lights/test_light/notifications/TestRedLight
payload: { "rgb": [255, 0, 0], "priority": 100 }
topic: myhome/lights/test_light/notifications/TestBlueLight
payload: { "rgb": [0, 0, 255], "priority": 101 }
topic: myhome/lights/test_light/notifications/TestSequence
payload: {
  "sequence": [
    { "rgb": [255,0,0], "duration": 1, "fade_in": 0.5 },
    { "rgb": [0,255,0], "duration": 2, "fade_in": 1 },
    { "rgb": [0,0,255], "duration": 1, "fade_in": 0 }
  ],
  "loops": -1,
  "priority": 102
}
Note: a negative 'loops' value will loop forever.
topic: myhome/lights/test_light/notifications/TestTemperature
payload: { "temperature": 2700, "brightness": 50, "priority": 103 }
topic: myhome/lights/test_light/notifications/TestHSV
payload: { "hue": 350, "saturation": 50, "value": 100, "priority": 104 }
If all of these topics exist, the bulb will display TestHSV. If the TestHSV topic
is removed (by setting an empty payload with 'retain' true) then TestTemperature will
be displayed. If TestTemperature is also removed, then the next highest priority,
TestSequence, will be displayed, and so on.
Note: There is an implicit "LightOff" with priority "0". If there are no notifications
to display, then the highest priority topic will be LightOff and the light will turn off.
#Example apps.yaml:
test_notification_light:
  module: notification_light_beta
  class: NotificationLight
  bulb_entity: light.test_light
  topic: myhome/lights/test_light/notifications/#
"""
# TODO: Sharing priority has issues
# TODO: Expiring notifications, one-off notifications, temporarily display lower-priority alerts
# --- Logging -----------------------------------------------------------------
# When True, SmartBulb.VerboseLog forwards its messages to the app log.
VERBOSE_LOGGING = False

# --- Defaults applied when a payload omits the corresponding key -------------
DEFAULT_PRIORITY = 5
DEFAULT_DURATION = 2
DEFAULT_NAME = "Unnamed"
DEFAULT_TRANSITION_TIME = 0
DEFAULT_FADE_IN_TIME = 0
DEFAULT_FADE_OUT_TIME = 0

# --- Timing ------------------------------------------------------------------
# Seconds to wait for the running sequence task to stop before giving up.
CANCEL_TIMEOUT = 3
SAME_PRIO_TRANS_TIME_MS = 2000
# Seconds each entry is shown while cycling through same-priority entries.
SAME_PRIO_WAIT_TIME_SEC = 2
NEW_STATE_DISPLAY_TIME_SEC = 1

# --- Payload keys --------------------------------------------------------------
KEY_PRIORITY = "priority"
KEY_DURATION = "duration"
KEY_LOOP_CNT = "loops"
KEY_FADE_IN = "fade_in"
KEY_FADE_OUT = "fade_out"
KEY_NAME = "name"
KEY_ON = "on"

# Implicit lowest-priority entry: with nothing else to show, the light is off.
DEFAULT_STATE_JSON = {
    KEY_NAME: "LightOff",
    KEY_ON: False,
    KEY_PRIORITY: 0,
}
def GetRgbJson(name, rgb, priority, **kwargs):
    """Build a notification payload dict for a fixed RGB colour.

    Any extra keyword arguments are merged in last, so they may add further
    payload keys or override the three standard ones.
    """
    entry = {KEY_NAME: name, "rgb": rgb, KEY_PRIORITY: priority}
    entry.update(kwargs)
    return entry
class NotificationLight(hass.Hass):
    """AppDaemon app that feeds notifications to a SmartBulb.

    Notifications arrive either as MQTT messages below the configured topic
    or from state changes of the entities listed under the optional
    ``presets`` app argument. Each notification is pushed to (or removed
    from) the bulb's prioritised sequence list.
    """

    def initialize(self):
        """Create the bulb and register the MQTT and preset listeners."""
        self._mqtt = None
        # Drop only the trailing wildcard/separator. A leading "/" is a
        # significant part of an MQTT topic and must be preserved, otherwise
        # the prefix filter in ad_mqtt_cb would never match.
        self.topic = self.args['topic'].rstrip("/#")

        # Create the bulb before any listener is registered so a callback
        # can never fire while self.bulb is still missing.
        self.bulb = SmartBulb(hass=self, entity_id=self.args['bulb_entity'])

        self._mqtt = self.get_plugin_api("MQTT")
        self._mqtt.listen_event(self.ad_mqtt_cb, "MQTT_MESSAGE")

        # Handle lower-latency presets
        for preset in self.args.get('presets') or []:
            entity = preset.get('entity', None)
            if not entity:
                continue
            name = preset.get('name', "")
            # The payload may be given as a JSON string or directly as a
            # mapping; a missing payload becomes an empty dict.
            payload = preset.get('payload') or {}
            if isinstance(payload, str):
                payload = json.loads(payload)
            self.listen_state(self.sensor_callback, entity,
                              immediate=False, name=name, payload=payload)

        self.log(
            f"NotificationLight {self.bulb} listening on topic on {self.topic}")

    def terminate(self):
        """Unsubscribe from the MQTT topic when the app is shut down."""
        self.log("Terminating Smart Bulb")
        if self._mqtt:
            self._mqtt.mqtt_unsubscribe(topic=self.topic)

    def sensor_callback(self, entity, attribute, old, new, kwargs):
        """Push or remove a preset's sequence when its entity changes state.

        ``kwargs`` carries the ``name`` and ``payload`` given to
        ``listen_state``. State "on" pushes the payload; any other state
        removes the sequence again. Presets without a payload are ignored.
        """
        try:
            self.log(
                f"Entity {entity} changed from {old} to {new} [{kwargs}]")
            name = kwargs.get('name', DEFAULT_NAME)
            payload = kwargs.get('payload', None)
            if not payload:
                return
            # The sequence is registered under the payload's own name when
            # it has one, so removal must use that same name.
            sequence_name = payload.setdefault(KEY_NAME, name)
            if new == "on":
                self.create_task(self.bulb.PushSequence(payload=payload))
            else:
                self.create_task(self.bulb.RemoveSequence(sequence_name))
        except Exception as e:
            self.log(f"EXCEPTION: {str(e)}")

    def ad_mqtt_cb(self, event_name, data, kwargs):
        """Handle an MQTT message: push or remove the named notification.

        The last topic level is the notification name. A JSON object payload
        pushes the notification; an empty payload removes it. Messages on
        unrelated topics and payloads that are not valid JSON objects are
        ignored.
        """
        if event_name != "MQTT_MESSAGE":
            self.log(
                f"Unknown message: {event_name} [{data}]", level="WARNING")
            return

        # Can't seem to filter by topic, so we get *everything*. Filter here.
        topic = data.get('topic', None)
        if not topic:
            return
        base = self.topic
        # Match the base topic itself or anything below it, but not sibling
        # topics that merely share the same leading characters.
        if base and topic != base and not topic.startswith(base + "/"):
            return

        name_from_topic = topic.split('/')[-1]
        raw_payload = data.get('payload', None)
        try:
            payload = json.loads(raw_payload) if raw_payload else {}
        except (TypeError, ValueError) as e:
            self.log(
                f"Ignoring invalid JSON payload on {topic}: {e}", level="WARNING")
            return

        if not payload:
            # Empty payload (or empty JSON value) clears the notification.
            self.create_task(self.bulb.RemoveSequence(name_from_topic))
            return
        if not isinstance(payload, dict):
            self.log(
                f"Ignoring non-object JSON payload on {topic}: {payload}", level="WARNING")
            return

        payload.setdefault(KEY_NAME, name_from_topic)
        self.create_task(self.bulb.PushSequence(payload=payload))
class SmartBulb:
    """Priority-based controller for a single light entity.

    Holds a set of named "sequences" (payload dicts), each with a priority.
    The highest-priority sequence is displayed; when several sequences share
    the top priority they are cycled. Displaying is done by a background
    task that is stopped and restarted whenever the set changes.
    """

    def __init__(self, hass, entity_id):
        """Store the app handle and entity id and schedule the async setup.

        hass: the app object providing ``log``, ``sleep``, ``create_task``
            and ``call_service``.
        entity_id: entity id of the light to control.
        """
        self.hass = hass
        self._entity_id = entity_id
        # (name, priority) tuples, kept sorted by descending priority.
        self._sequence_prio_list = []
        # name -> sequence info dict (payload, priority, name, fades).
        self._sequence_dict = {}
        self._task = None
        # Created lazily from inside a coroutine, see _EnsureStopEvent.
        self._stop_event = None
        self.hass.create_task(self.__init_async__())

    async def __init_async__(self):
        """Finish setup inside the event loop and push the default state."""
        self._EnsureStopEvent()
        await self.PushSequence(DEFAULT_STATE_JSON)
        # self.hass.create_task(self.TestLights())

    def _EnsureStopEvent(self):
        """Return the stop event, creating it on first use.

        Creation is deferred so it happens inside a coroutine, and so a
        sequence pushed before __init_async__ has run still finds an event.
        """
        if getattr(self, "_stop_event", None) is None:
            self._stop_event = asyncio.Event()
        return self._stop_event

    async def TestLights(self):
        """Manual smoke test: push and remove a fixed list of sequences."""
        self.VerboseLog("TEST LIGHTS")
        sequences = [
            {KEY_NAME: "LightOn",
             "temperature": 2700, "brightness": 50, KEY_FADE_IN: 1, KEY_PRIORITY: 10},
            {KEY_NAME: "NightLight",
             "rgb": (0, 0, 128), KEY_PRIORITY: 1, KEY_FADE_IN: 2, KEY_FADE_OUT: 5},
            {KEY_NAME: "LightOn"},
            {KEY_NAME: "LightOn",
             "temperature": 2700, "brightness": 50, KEY_FADE_IN: 1, KEY_PRIORITY: 10},
            {KEY_NAME: "InMeeting",
             "rgb": [255, 0, 0], KEY_PRIORITY: 15},
            {KEY_NAME: "LightOn"},
            {KEY_NAME: "NightLight"},
            {KEY_NAME: "SequenceTest",
             "sequence": [
                 {"rgb": [255, 0, 0], "duration": 1},
                 {"rgb": [0, 255, 0], "duration": 1},
                 {"rgb": [0, 0, 255], "duration": 1},
             ],
             "loops": 2
             },
            {KEY_NAME: "InMeeting"},
        ]
        for item in sequences:
            self.VerboseLog(f"+ {item}")
            # An entry holding nothing but a name means "remove that name".
            if KEY_NAME in item and len(item) == 1:
                await self.RemoveSequence(item[KEY_NAME])
            else:
                await self.PushSequence(item)
            self.VerboseLog(f"- {item}")
            await self.hass.sleep(2)

    def __str__(self):
        """Return a short description including the controlled entity id."""
        return f"SmartBulb ({self._entity_id})"

    @property
    def ActiveSequenceName(self):
        """Name of the highest-priority sequence, or None when there is none."""
        return self._sequence_prio_list[0][0] if len(self._sequence_prio_list) > 0 else None

    async def ColorTempCo(self, temperature, brightness, transition=0):
        """Turn the light on with a colour temperature and brightness percent.

        The service call's return value is not awaited here.
        """
        # self.hass.log(f"SET TEMP: {temperature} {brightness}")
        self.hass.call_service('light/turn_on', entity_id=self._entity_id,
                               kelvin=temperature, brightness_pct=brightness, transition=transition)

    async def RgbCo(self, red, green, blue, transition=0):
        """Show an RGB colour by converting it to hue/saturation/value.

        Components are taken as 0-255 integers, unless any component is a
        float, in which case all of them are used unscaled (0.0-1.0).
        """
        # self.hass.log(f"SET RGB: {red} {green} {blue}")
        rgb = (red, green, blue)
        is_float = any(isinstance(x, float) for x in rgb)
        divisor = 1 if is_float else 255
        r, g, b = (x / divisor for x in rgb)
        h, s, v = colorsys.rgb_to_hsv(r, g, b)
        # colorsys works in 0..1; the light call below takes degrees/percent.
        h = int(h * 360)
        s = int(s * 100)
        v = int(v * 100)
        await self.HsvCo(h, s, v, transition=transition)

    async def HsvCo(self, h, s, v, transition=0):
        """Turn the light on with hue/saturation and ``v`` as brightness percent.

        The service call's return value is not awaited here.
        """
        # self.hass.log(f"SET HSV: {h} {s} {v} Trans:{transition} Entity: {self._entity_id}")
        self.hass.call_service('light/turn_on', entity_id=self._entity_id,
                               hs_color=[h, s], brightness_pct=v, transition=transition)

    async def SetOnOff(self, on, transition=0):
        """Turn the light on or off without changing its colour settings.

        The service call's return value is not awaited here.
        """
        # self.hass.log(f"SET ON/OFF: {on}")
        if on:
            self.hass.call_service('light/turn_on',
                                   entity_id=self._entity_id, transition=transition)
        else:
            self.hass.call_service('light/turn_off',
                                   entity_id=self._entity_id, transition=transition)

    async def SequenceCo(self, payload_list, loopCnt, transition=0):
        """Play a list of step payloads, repeating ``loopCnt`` times.

        A negative ``loopCnt`` repeats until cancelled. Each step is shown
        for its "duration"; the fade into a step is the larger of its own
        fade-in and the previous step's fade-out. Cancellation ends the
        sequence quietly. A step that raises is logged and skipped.
        """
        self.VerboseLog("+SEQUENCECO")
        self.VerboseLog(f"Run sequence: {payload_list}")
        curLoop = 0
        prev_fade_out = transition
        while curLoop != loopCnt:
            cancelled = False
            steps_shown = 0
            for payload in payload_list:
                duration = payload.get(KEY_DURATION, DEFAULT_DURATION)
                # Determine fade time
                next_fade_in = payload.get(KEY_FADE_IN, DEFAULT_FADE_IN_TIME)
                next_transition_time = max(next_fade_in, prev_fade_out)
                prev_fade_out = payload.get(
                    KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME)
                self.VerboseLog(
                    f"Running payload: {payload} [Fade {next_transition_time}]")
                try:
                    sequence = await self.GetSequenceCo(payload, transition=next_transition_time)
                    await sequence
                    self.VerboseLog(f"SequenceCo Waiting for {duration}")
                    await self.hass.sleep(duration)
                    steps_shown += 1
                except asyncio.CancelledError:
                    self.VerboseLog("SequenceCo cancelled")
                    cancelled = True
                    break
                except Exception as e:
                    # Surface the failure even when verbose logging is off.
                    self.hass.log(f"SequenceCo exception: {str(e)}")
            # A pass that showed nothing (empty list, or every step failed)
            # never yielded to the event loop; repeating it would spin
            # forever when loopCnt is negative, so stop here.
            if cancelled or steps_shown == 0:
                break
            curLoop += 1
        self.VerboseLog("Done running sequence")
        self.VerboseLog("-SEQUENCECO")

    async def PushSequence(self, payload):
        """Add or update a named sequence and refresh the light if needed.

        payload: dict describing what to show; "name", "priority",
            "fade_in" and "fade_out" fall back to the module defaults.
        """
        self.VerboseLog(f"PushSequence: {payload}")
        priority = payload.get(KEY_PRIORITY, DEFAULT_PRIORITY)
        name = payload.get(KEY_NAME, DEFAULT_NAME)
        fade_in = payload.get(KEY_FADE_IN, DEFAULT_FADE_IN_TIME)
        fade_out = payload.get(KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME)
        new_sequence_info = {"payload": payload,
                             "priority": priority,
                             "name": name,
                             "fade_in": fade_in,
                             "fade_out": fade_out}
        prev_active = self.ActiveSequenceName
        if name in self._sequence_dict:
            self.VerboseLog("Sequence already exists, updating")
            self.RemoveNameFromLists(name)
            # Forces the comparison below to see a change, so an update to
            # an existing entry always restarts the display.
            prev_active = None
        self._sequence_dict[name] = new_sequence_info
        self._sequence_prio_list.append((name, priority))
        self._sequence_prio_list.sort(key=lambda x: x[1], reverse=True)
        new_active = self.ActiveSequenceName
        # if active changed, or new shares priority, then update light
        if new_active != prev_active or priority == self._sequence_dict[new_active]["priority"]:
            await self.StartSequenceTask(transition=fade_in)
        if VERBOSE_LOGGING:
            self.PrintSequence()

    def RemoveNameFromLists(self, name):
        """Drop ``name`` from both the lookup dict and the priority list."""
        del self._sequence_dict[name]
        self._sequence_prio_list = [
            x for x in self._sequence_prio_list if x[0] != name]

    async def RemoveSequence(self, name):
        """Remove a named sequence and restart the display task.

        Unknown names are logged and ignored. When the active sequence is
        removed, its fade-out is used as the transition to what follows.
        """
        self.VerboseLog(f"RemoveSequence {name}")
        if name not in self._sequence_dict:
            self.hass.log(f"Can't remove state {name}, not in state list")
            return
        if name == self.ActiveSequenceName:
            self.VerboseLog(f"Removing the active sequence! {name}")
            active_sequence = self._sequence_dict[name]
            transition = active_sequence.get(
                KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME)
        else:
            transition = 0
        self.RemoveNameFromLists(name)
        if VERBOSE_LOGGING:
            self.PrintSequence()
        await self.StartSequenceTask(transition=transition)

    async def StartSequenceTask(self, transition=0):
        """Stop the running display task and start one for the top priority.

        The old task is asked to stop through the stop event and given
        CANCEL_TIMEOUT seconds to finish. Afterwards every sequence sharing
        the highest priority is handed to RunSequenceList in a new task.
        """
        self.VerboseLog("+STARTSEQUENCE")
        self.VerboseLog("StartSequence called")
        stop_event = self._EnsureStopEvent()
        try:
            if self._task:
                self.VerboseLog("StartSequence - Cancelling old task")
                stop_event.set()
                self.VerboseLog(
                    f"StartSequence - Waiting for task to cancel: {self._task.cancelled()} {self._task.done()}")
                await asyncio.wait_for(self._task, CANCEL_TIMEOUT)
                self.VerboseLog(
                    f"StartSequence - Done waiting for cancel {self._task.result()}")
        except asyncio.CancelledError:
            self.VerboseLog("StartSequence - cancel exception")
        except Exception as e:
            self.VerboseLog(
                f"StartSequence - exception: {str(e)} - {type(e)} {e.args}")
        self.VerboseLog("StartSequence - Done waiting for cancel")

        # Reset the stop signal here, before the new task exists, instead of
        # from a done-callback: a callback that runs late could leave the
        # event set when the new task starts (stopping it immediately) or
        # clear a stop request meant for that new task.
        stop_event.clear()

        prio_list = self._sequence_prio_list
        if len(prio_list) > 0:
            # First check if there are multiple of the same priority
            top_priority = prio_list[0][1]
            same_prio_list = [x for x in prio_list if x[1] == top_priority]
            self.VerboseLog(f"StartSequence - Same priority: {same_prio_list}")
            self._task = await self.hass.create_task(
                self.RunSequenceList(same_prio_list, transition=transition))
        else:
            self.hass.log("No sequences to start!")
            self._task = None
        self.VerboseLog("-STARTSEQUENCE")

    async def RunSequenceList(self, sequence_list, transition=0):
        """Display the given same-priority sequences until told to stop.

        sequence_list: list of (name, priority) tuples. A single entry is
            displayed once; several entries are cycled, each one held for
            SAME_PRIO_WAIT_TIME_SEC seconds, until the stop event is set.
        """
        self.VerboseLog("+RUNSEQUENCELIST")
        self.VerboseLog(f"RunSequenceList - List: {sequence_list}")
        if not sequence_list:
            # Nothing to show; looping over an empty list would never yield.
            self.VerboseLog("-RUNSEQUENCELIST")
            return
        stop_event = self._EnsureStopEvent()
        # Don't keep looping if there isn't a sequence
        should_exit_loop = len(sequence_list) == 1
        prev_fade_out = transition
        while True:
            for name, priority in sequence_list:
                sequence = self._sequence_dict[name]
                # Determine fade time
                next_fade_in = sequence.get(KEY_FADE_IN, DEFAULT_FADE_IN_TIME)
                next_transition_time = max(next_fade_in, prev_fade_out)
                prev_fade_out = sequence.get(
                    KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME)
                self.VerboseLog(
                    f"RunSequenceList - {sequence} [Trans: {next_transition_time}]")
                coroutine = await self.GetSequenceCo(sequence["payload"], transition=next_transition_time)
                coroutine = asyncio.create_task(coroutine)
                stop_wait = asyncio.create_task(stop_event.wait())
                wait_events = [coroutine, stop_wait]
                # Defaults keep the checks below valid if the wait fails.
                done, pending = set(), set(wait_events)
                try:
                    done, pending = await asyncio.wait(wait_events, return_when=asyncio.FIRST_COMPLETED)
                except Exception as e:
                    self.hass.log(
                        f"RunSequenceList- asyncioWait excecption: {str(e)}")
                if stop_wait in done:
                    self.hass.log(
                        "RunSequenceList - Stop Event detected while waiting for coroutine")
                    coroutine.cancel()
                    should_exit_loop = True
                else:
                    # Do not leave an orphaned waiter behind for every step.
                    stop_wait.cancel()
                if coroutine in done and not coroutine.cancelled():
                    error = coroutine.exception()
                    if error is not None:
                        # Retrieve the failure so it is not silently dropped.
                        self.hass.log(
                            f"RunSequenceList - sequence '{name}' failed: {error}")
                self.VerboseLog(
                    f"RunSequenceList - Done: [{done}] Pending: [{pending}] List: [{wait_events}]")
                # await coroutine
                # Check for the cancel event while we wait to show the next color
                try:
                    if not should_exit_loop:
                        self.VerboseLog(
                            f"RunSequenceList - About to wait, stopevent: {stop_event.is_set()}")
                        await asyncio.wait_for(stop_event.wait(), timeout=SAME_PRIO_WAIT_TIME_SEC)
                        self.VerboseLog(
                            f"RunSequenceList - Done waiting, stopevent: {stop_event.is_set()}")
                except asyncio.TimeoutError:
                    pass  # No stop event, so continue to next loop
                else:
                    # Received the stop event or otherwise shouldn't loop anymore
                    should_exit_loop = True
                    break
            if should_exit_loop:
                break
        self.VerboseLog("-RUNSEQUENCELIST")

    def PrintSequence(self):
        """Log the priority list, lowest priority first, then a header line."""
        for sequence in reversed(self._sequence_prio_list):
            self.hass.log(sequence)
        self.hass.log("Sequences:")

    def VerboseLog(self, msg):
        """Log ``msg`` only when VERBOSE_LOGGING is enabled."""
        if VERBOSE_LOGGING:
            self.hass.log(msg)

    async def GetSequenceCo(self, payload, transition=0):
        """Return the (not yet awaited) coroutine that displays ``payload``.

        The payload kind is chosen by the first key present among
        'sequence', 'rgb', 'hue', 'temperature' and the on/off key.

        Raises:
            ValueError: if the payload contains none of those keys.
        """
        if 'sequence' in payload:
            loopCnt = payload.get(KEY_LOOP_CNT, 1)
            sequence = self.SequenceCo(
                payload['sequence'], loopCnt=loopCnt, transition=transition)
        elif 'rgb' in payload:
            r, g, b = payload['rgb']
            sequence = self.RgbCo(r, g, b, transition=transition)
        elif 'hue' in payload:
            h, s, v = payload['hue'], payload['saturation'], payload['value']
            sequence = self.HsvCo(h, s, v, transition=transition)
        elif 'temperature' in payload:
            temperature, brightness = payload['temperature'], payload['brightness']
            sequence = self.ColorTempCo(
                temperature, brightness, transition=transition)
        elif KEY_ON in payload:
            on = bool(payload[KEY_ON])
            sequence = self.SetOnOff(on=on, transition=transition)
        else:
            # Raising a plain string is itself a TypeError; raise a real
            # exception so the message reaches the caller.
            raise ValueError(
                f"Unable to determine light class for data in {payload}")
        return sequence

    def PrintStates(self):
        """Log every stored sequence by name."""
        self.hass.log("States:")
        for name, state in self._sequence_dict.items():
            self.hass.log(f" - {name}: {state}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment