Created
March 9, 2022 23:26
-
-
Save cobryan05/c161e03cbc052a593dcc568ea79d7eae to your computer and use it in GitHub Desktop.
AppDaemon MQTT SmartBulb controller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hassapi as hass | |
import asyncio | |
import colorsys | |
import json | |
""" | |
*BETA* - EXPECT ISSUES! | |
App to control a Smart Bulb using MQTT | |
Watches an MQTT topic to control a Smart Bulb with priority-based notifications. | |
If the MQTT topic has multiple notification entries, the bulb will always show | |
the highest priority one. | |
Examples: | |
topic: myhome/lights/test_light/notifications/TestRedLight | |
payload: { "rgb": [255, 0, 0], "priority": 100 } | |
topic: myhome/lights/test_light/notifications/TestBlueLight | |
payload: { "rgb": [0, 0, 255], "priority": 101 } | |
topic: myhome/lights/test_light/notifications/TestSequence | |
payload: { | |
"sequence": [ | |
{ "rgb": [255,0,0], "duration": 1, "fade_in": 0.5 }, | |
{ "rgb": [0,255,0], "duration": 2, "fade_in": 1 }, | |
{ "rgb": [0,0,255], "duration": 1, "fade_in": 0 } | |
], | |
"loops": -1, | |
"priority": 102 | |
} | |
Note: a negative 'loops' value will loop forever. | |
topic: myhome/lights/test_light/notifications/TestTemperature | |
payload: { "temperature": 2700, "brightness": 50, "priority": 103 } | |
topic: myhome/lights/test_light/notifications/TestHSV | |
payload: { "hue": 350, "saturation": 50, "value": 100, "priority": 104 } | |
If all of these topics exist, the bulb will display TestHSV. If the TestHSV topic | |
is removed (by setting an empty payload with 'retain' true) then TestTemperature will | |
be displayed. If TestTemperature is also removed, then the next highest priority, | |
TestSequence, will be displayed, and so on. | |
Note: There is an implicit "LightOff" with priority "0". If there are no notifications | |
to display, then the highest priority topic will be LightOff and the light will turn off. | |
#Example apps.yaml: | |
test_notification_light: | |
module: notification_light_beta | |
class: NotificationLight | |
bulb_entity: light.test_light | |
topic: myhome/lights/test_light/notifications/# | |
""" | |
# TODO: Sharing priority has issues | |
# TODO: Expiring notifications, one-off notifications, temporarily display lower-priority alerts | |
VERBOSE_LOGGING = False | |
DEFAULT_PRIORITY = 5 | |
DEFAULT_DURATION = 2 | |
DEFAULT_NAME = "Unnamed" | |
CANCEL_TIMEOUT = 3 | |
SAME_PRIO_TRANS_TIME_MS = 2000 | |
SAME_PRIO_WAIT_TIME_SEC = 2 | |
NEW_STATE_DISPLAY_TIME_SEC = 1 | |
DEFAULT_TRANSITION_TIME = 0 | |
DEFAULT_FADE_IN_TIME = 0 | |
DEFAULT_FADE_OUT_TIME = 0 | |
KEY_PRIORITY = "priority" | |
KEY_DURATION = "duration" | |
KEY_LOOP_CNT = "loops" | |
KEY_FADE_IN = "fade_in" | |
KEY_FADE_OUT = "fade_out" | |
KEY_NAME = "name" | |
KEY_ON = "on" | |
DEFAULT_STATE_JSON = json.loads( | |
f'{{ "{KEY_NAME}": "LightOff", "{KEY_ON}": false, "{KEY_PRIORITY}": 0 }}' | |
) | |
def GetRgbJson(name, rgb, priority, **kwargs): | |
return {KEY_NAME: name, "rgb": rgb, KEY_PRIORITY: priority, **kwargs} | |
class NotificationLight(hass.Hass): | |
def initialize(self): | |
self._mqtt = None | |
self.topic = self.args['topic'] | |
self.topic = self.topic.strip("/#") | |
self._mqtt = self.get_plugin_api("MQTT") | |
self._mqtt.listen_event(self.ad_mqtt_cb, "MQTT_MESSAGE") | |
# Handle lower-latency presets | |
presets = self.args.get('presets', []) | |
for preset in presets: | |
name = preset.get('name', "") | |
entity = preset.get('entity', None) | |
payload = json.loads(preset.get('payload', {})) | |
if entity: | |
self.listen_state(self.sensor_callback, entity, | |
immediate=False, name=name, payload=payload) | |
self.bulb = SmartBulb(hass=self, entity_id=self.args['bulb_entity']) | |
self.log( | |
f"NotificationLight {self.bulb} listening on topic on {self.topic}") | |
def terminate(self): | |
self.log("Terminating Smart Bulb") | |
if self._mqtt: | |
self._mqtt.mqtt_unsubscribe(topic=self.topic) | |
def sensor_callback(self, entity, attribute, old, new, kwargs): | |
try: | |
self.log( | |
f"Entity {entity} changed from {old} to {new} [{kwargs}]") | |
name = kwargs.get('name', "Unnamed") | |
payload = kwargs.get('payload', None) | |
if payload: | |
if KEY_NAME not in payload: | |
payload[KEY_NAME] = name | |
if new == "on": | |
self.create_task(self.bulb.PushSequence(payload=payload)) | |
else: | |
self.create_task(self.bulb.RemoveSequence(name)) | |
except Exception as e: | |
self.log(f"EXCEPTION: {str(e)}") | |
def ad_mqtt_cb(self, event_name, data, kwargs): | |
if event_name != "MQTT_MESSAGE": | |
self.log( | |
f"Unknown message: {event_name} [{data}]", level="WARNING") | |
return | |
# Can't seem to filter by topic, so we get *everything*. Filter here. | |
if not data['topic'].startswith(self.topic): | |
return | |
topic = data.get('topic', None) | |
name_from_topic = topic.split('/')[-1] | |
payload = data.get('payload', None) | |
if not payload: | |
payload = "{}" | |
payload = json.loads(payload) | |
if payload: | |
if KEY_NAME not in payload: | |
payload[KEY_NAME] = name_from_topic | |
self.create_task(self.bulb.PushSequence(payload=payload)) | |
else: | |
self.create_task(self.bulb.RemoveSequence(name_from_topic)) | |
name = name_from_topic | |
class SmartBulb: | |
def __init__(self, hass, entity_id): | |
self.hass = hass | |
self._entity_id = entity_id | |
self._sequence_prio_list = [] | |
self._sequence_dict = {} | |
self._task = None | |
self.hass.create_task(self.__init_async__()) | |
async def __init_async__(self): | |
self._stop_event = asyncio.Event() | |
await self.PushSequence(DEFAULT_STATE_JSON) | |
# self.hass.create_task(self.TestLights()) | |
async def TestLights(self): | |
self.VerboseLog("TEST LIGHTS") | |
sequences = [ | |
{KEY_NAME: "LightOn", | |
"temperature": 2700, "brightness": 50, KEY_FADE_IN: 1, KEY_PRIORITY: 10}, | |
{KEY_NAME: "NightLight", | |
"rgb": (0, 0, 128), KEY_PRIORITY: 1, KEY_FADE_IN: 2, KEY_FADE_OUT: 5}, | |
{KEY_NAME: "LightOn"}, | |
{KEY_NAME: "LightOn", | |
"temperature": 2700, "brightness": 50, KEY_FADE_IN: 1, KEY_PRIORITY: 10}, | |
{KEY_NAME: "InMeeting", | |
"rgb": [255, 0, 0], KEY_PRIORITY: 15}, | |
{KEY_NAME: "LightOn"}, | |
{KEY_NAME: "NightLight"}, | |
{KEY_NAME: "SequenceTest", | |
"sequence": [ | |
{"rgb": [255, 0, 0], "duration": 1}, | |
{"rgb": [0, 255, 0], "duration": 1}, | |
{"rgb": [0, 0, 255], "duration": 1}, | |
], | |
"loops": 2 | |
}, | |
{KEY_NAME: "InMeeting"}, | |
] | |
for item in sequences: | |
self.VerboseLog(f"+ {item}") | |
if KEY_NAME in item and len(item) == 1: | |
await self.RemoveSequence(item[KEY_NAME]) | |
else: | |
await self.PushSequence(item) | |
self.VerboseLog(f"- {item}") | |
await self.hass.sleep(2) | |
def __str__(self): | |
return "SmartBulb ({self._entity_id})" | |
@property | |
def ActiveSequenceName(self): | |
return self._sequence_prio_list[0][0] if len(self._sequence_prio_list) > 0 else None | |
async def ColorTempCo(self, temperature, brightness, transition=0): | |
# self.hass.log(f"SET TEMP: {temperature} {brightness}") | |
self.hass.call_service('light/turn_on', entity_id=self._entity_id, | |
kelvin=temperature, brightness_pct=brightness, transition=transition) | |
async def RgbCo(self, red, green, blue, transition=0): | |
# self.hass.log(f"SET RGB: {red} {green} {blue}") | |
rgb = (red, green, blue) | |
is_float = len([x for x in rgb if isinstance(x, float)]) > 0 | |
divisor = 255 if not is_float else 1 | |
rgb_vals = [x/divisor for x in rgb] | |
h, s, v = colorsys.rgb_to_hsv(rgb_vals[0], rgb_vals[1], rgb_vals[2]) | |
h = int(h * 360) | |
s = int(s * 100) | |
v = int(v * 100) | |
await self.HsvCo(h, s, v, transition=transition) | |
async def HsvCo(self, h, s, v, transition=0): | |
# self.hass.log(f"SET HSV: {h} {s} {v} Trans:{transition} Entity: {self._entity_id}") | |
self.hass.call_service('light/turn_on', entity_id=self._entity_id, | |
hs_color=[h, s], brightness_pct=v, transition=transition) | |
async def SetOnOff(self, on, transition=0): | |
# self.hass.log(f"SET ON/OFF: {on}") | |
if on: | |
self.hass.call_service('light/turn_on', | |
entity_id=self._entity_id, transition=transition) | |
else: | |
self.hass.call_service('light/turn_off', | |
entity_id=self._entity_id, transition=transition) | |
async def SequenceCo(self, payload_list, loopCnt, transition=0): | |
self.VerboseLog("+SEQUENCECO") | |
self.VerboseLog(f"Run sequence: {payload_list}") | |
curLoop = 0 | |
prev_fade_out = transition | |
while curLoop != loopCnt: | |
for payload in payload_list: | |
duration = payload.get(KEY_DURATION, DEFAULT_DURATION) | |
# Determine fade time | |
next_fade_in = payload.get(KEY_FADE_IN, DEFAULT_FADE_IN_TIME) | |
next_transition_time = max(next_fade_in, prev_fade_out) | |
prev_fade_out = payload.get( | |
KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME) | |
self.VerboseLog( | |
f"Running payload: {payload} [Fade {next_transition_time}]") | |
try: | |
sequence = await self.GetSequenceCo(payload, transition=next_transition_time) | |
await sequence | |
self.VerboseLog(f"SequenceCo Waiting for {duration}") | |
await self.hass.sleep(duration) | |
except asyncio.CancelledError: | |
self.VerboseLog("SequenceCo cancelled") | |
curLoop = loopCnt - 1 | |
break | |
except Exception as e: | |
self.VerboseLog(f"SequenceCo exception: {str(e)}") | |
curLoop += 1 | |
self.VerboseLog("Done running sequence") | |
self.VerboseLog("-SEQUENCECO") | |
async def PushSequence(self, payload): | |
self.VerboseLog(f"PushSequence: {payload}") | |
priority = payload.get(KEY_PRIORITY, DEFAULT_PRIORITY) | |
name = payload.get(KEY_NAME, DEFAULT_NAME) | |
fade_in = payload.get(KEY_FADE_IN, DEFAULT_FADE_IN_TIME) | |
fade_out = payload.get(KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME) | |
new_sequence_info = {"payload": payload, | |
"priority": priority, | |
"name": name, | |
"fade_in": fade_in, | |
"fade_out": fade_out} | |
prev_active = self.ActiveSequenceName | |
if name in self._sequence_dict: | |
self.VerboseLog("Sequence already exists, updating") | |
self.RemoveNameFromLists(name) | |
prev_active = None | |
self._sequence_dict[name] = new_sequence_info | |
self._sequence_prio_list.append((name, priority)) | |
self._sequence_prio_list.sort(key=lambda x: x[1], reverse=True) | |
new_active = self.ActiveSequenceName | |
# if active changed, or new shares priority, then update light | |
if new_active != prev_active or priority == self._sequence_dict[new_active]["priority"]: | |
await self.StartSequenceTask(transition=fade_in) | |
if VERBOSE_LOGGING: | |
self.PrintSequence() | |
def RemoveNameFromLists(self, name): | |
del self._sequence_dict[name] | |
self._sequence_prio_list = [ | |
x for x in self._sequence_prio_list if x[0] != name] | |
async def RemoveSequence(self, name): | |
self.VerboseLog(f"RemoveSequence {name}") | |
if name not in self._sequence_dict: | |
self.hass.log(f"Can't remove state {name}, not in state list") | |
return | |
if name == self.ActiveSequenceName: | |
self.VerboseLog(f"Removing the active sequence! {name}") | |
active_sequence = self._sequence_dict[name] | |
transition = active_sequence.get( | |
KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME) | |
else: | |
transition = 0 | |
self.RemoveNameFromLists(name) | |
if VERBOSE_LOGGING: | |
self.PrintSequence() | |
await self.StartSequenceTask(transition=transition) | |
async def StartSequenceTask(self, transition=0): | |
self.VerboseLog("+STARTSEQUENCE") | |
self.VerboseLog("StartSequence called") | |
try: | |
if self._task: | |
self.VerboseLog("StartSequence - Cancelling old task") | |
self._stop_event.set() | |
self.VerboseLog( | |
f"StartSequence - Waiting for task to cancel: {self._task.cancelled()} {self._task.done()}") | |
await asyncio.wait_for(self._task, CANCEL_TIMEOUT) | |
self.VerboseLog( | |
f"StartSequence - Done waiting for cancel {self._task.result()}") | |
except asyncio.CancelledError as e: | |
self.VerboseLog("StartSequence - cancel exception") | |
except Exception as e: | |
self.VerboseLog( | |
f"StartSequence - exception: {str(e)} - {type(e)} {e.args}") | |
finally: | |
self.VerboseLog("StartSequence - Done waiting for cancel") | |
prio_list = self._sequence_prio_list | |
if len(prio_list) > 0: | |
# First check if there are multiple of the same priority | |
same_prio_list = [x for x in prio_list if x[1] == prio_list[0][1]] | |
self.VerboseLog(f"StartSequence - Same priority: {same_prio_list}") | |
def callback_inner(f): | |
self.VerboseLog("Done Callback") | |
self._stop_event.clear() | |
self._task = await self.hass.create_task(self.RunSequenceList(same_prio_list, transition=transition), callback=callback_inner) | |
else: | |
self.hass.log("No sequences to start!") | |
self._task = None | |
self.VerboseLog("-STARTSEQUENCE") | |
async def RunSequenceList(self, sequence_list, transition=0): | |
self.VerboseLog("+RUNSEQUENCELIST") | |
self.VerboseLog(f"RunSequenceList - List: {sequence_list}") | |
# Don't keep looping if there isn't a sequence | |
should_exit_loop = len(sequence_list) == 1 | |
prev_fade_out = transition | |
while True: | |
for name, priority in sequence_list: | |
sequence = self._sequence_dict[name] | |
# Determine fade time | |
next_fade_in = sequence.get(KEY_FADE_IN, DEFAULT_FADE_IN_TIME) | |
next_transition_time = max(next_fade_in, prev_fade_out) | |
prev_fade_out = sequence.get( | |
KEY_FADE_OUT, DEFAULT_FADE_OUT_TIME) | |
self.VerboseLog( | |
f"RunSequenceList - {sequence} [Trans: {next_transition_time}]") | |
coroutine = await self.GetSequenceCo(sequence["payload"], transition=next_transition_time) | |
coroutine = asyncio.create_task(coroutine) | |
stop_wait = asyncio.create_task(self._stop_event.wait()) | |
try: | |
wait_events = [coroutine, stop_wait] | |
done, pending = await asyncio.wait(wait_events, return_when=asyncio.FIRST_COMPLETED) | |
except Exception as e: | |
self.hass.log( | |
f"RunSequenceList- asyncioWait excecption: {str(e)}") | |
if stop_wait in done: | |
self.hass.log( | |
"RunSequenceList - Stop Event detected while waiting for coroutine") | |
coroutine.cancel() | |
should_exit_loop |= True | |
self.VerboseLog( | |
f"RunSequenceList - Done: [{done}] Pending: [{pending}] List: [{wait_events}]") | |
# await coroutine | |
# Check for the cancel event while we wait to show the next color | |
try: | |
if not should_exit_loop: | |
self.VerboseLog( | |
f"RunSequenceList - About to wait, stopevent: {self._stop_event.is_set()}") | |
await asyncio.wait_for(self._stop_event.wait(), timeout=SAME_PRIO_WAIT_TIME_SEC) | |
self.VerboseLog( | |
f"RunSequenceList - Done waiting, stopevent: {self._stop_event.is_set()}") | |
except asyncio.TimeoutError: | |
pass # No stop event, so continue to next loop | |
else: | |
# Received the stop event or otherwise shouldn't loop anymore | |
should_exit_loop |= True | |
break | |
if should_exit_loop: | |
break | |
self.VerboseLog("-RUNSEQUENCELIST") | |
def PrintSequence(self): | |
for sequence in reversed(self._sequence_prio_list): | |
self.hass.log(sequence) | |
self.hass.log("Sequences:") | |
def VerboseLog(self, msg): | |
if VERBOSE_LOGGING: | |
self.hass.log(msg) | |
async def GetSequenceCo(self, payload, transition=0): | |
if 'sequence' in payload: | |
loopCnt = payload.get(KEY_LOOP_CNT, 1) | |
sequence = self.SequenceCo( | |
payload['sequence'], loopCnt=loopCnt, transition=transition) | |
elif 'rgb' in payload: | |
r, g, b = payload['rgb'] | |
sequence = self.RgbCo(r, g, b, transition=transition) | |
elif 'hue' in payload: | |
h, s, v = payload['hue'], payload['saturation'], payload['value'] | |
sequence = self.HsvCo(h, s, v, transition=transition) | |
elif 'temperature' in payload: | |
temperature, brightness = payload['temperature'], payload['brightness'] | |
sequence = self.ColorTempCo( | |
temperature, brightness, transition=transition) | |
elif KEY_ON in payload: | |
on = bool(payload[KEY_ON]) | |
sequence = self.SetOnOff(on=on, transition=transition) | |
else: | |
raise(f"Unable to determine light class for data in {payload}") | |
return sequence | |
def PrintStates(self): | |
self.hass.log("States:") | |
for name, state in self._states.items(): | |
self.hass.log(f" - {name}: {state}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment