Last active
December 1, 2016 18:35
-
-
Save balloob/ff6d3d57800fafe00c43 to your computer and use it in GitHub Desktop.
Home Assistant light/mqtt.py which turns on based on brightness being set.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Support for MQTT lights. | |
For more details about this platform, please refer to the documentation at | |
https://home-assistant.io/components/light.mqtt/ | |
""" | |
import logging | |
from functools import partial | |
import homeassistant.components.mqtt as mqtt | |
from homeassistant.components.light import ( | |
ATTR_BRIGHTNESS, ATTR_RGB_COLOR, Light) | |
from homeassistant.helpers.template import render_with_possible_json_value | |
_LOGGER = logging.getLogger(__name__) | |
DEFAULT_NAME = 'MQTT Light' | |
DEFAULT_QOS = 0 | |
DEFAULT_PAYLOAD_ON = 'ON' | |
DEFAULT_PAYLOAD_OFF = 'OFF' | |
DEFAULT_OPTIMISTIC = False | |
DEFAULT_BRIGHTNESS_SCALE = 255 | |
DEPENDENCIES = ['mqtt'] | |
def setup_platform(hass, config, add_devices_callback, discovery_info=None): | |
"""Add MQTT Light.""" | |
if (config.get('command_topic') is None and | |
config.get('brightness_command_topic') is None): | |
_LOGGER.error("Missing required variable: command_topic or " | |
"brightness_command_topic") | |
return False | |
add_devices_callback([MqttLight( | |
hass, | |
config.get('name', DEFAULT_NAME), | |
{key: config.get(key) for key in | |
(typ + topic | |
for typ in ('', 'brightness_', 'rgb_') | |
for topic in ('state_topic', 'command_topic'))}, | |
{key: config.get(key + '_value_template') | |
for key in ('state', 'brightness', 'rgb')}, | |
config.get('qos', DEFAULT_QOS), | |
{ | |
'on': config.get('payload_on', DEFAULT_PAYLOAD_ON), | |
'off': config.get('payload_off', DEFAULT_PAYLOAD_OFF) | |
}, | |
config.get('optimistic', DEFAULT_OPTIMISTIC), | |
config.get('brightness_scale', DEFAULT_BRIGHTNESS_SCALE) | |
)]) | |
class MqttLight(Light): | |
"""MQTT light.""" | |
# pylint: disable=too-many-arguments,too-many-instance-attributes | |
def __init__(self, hass, name, topic, templates, qos, payload, optimistic, | |
brightness_scale): | |
"""Initialize MQTT light.""" | |
self._hass = hass | |
self._name = name | |
self._topic = topic | |
self._qos = qos | |
self._payload = payload | |
self._optimistic = optimistic or topic["state_topic"] is None | |
self._optimistic_rgb = optimistic or topic["rgb_state_topic"] is None | |
self._optimistic_brightness = (optimistic or | |
topic["brightness_state_topic"] is None) | |
self._brightness_scale = brightness_scale | |
self._state = False | |
templates = {key: ((lambda value: value) if tpl is None else | |
partial(render_with_possible_json_value, hass, tpl)) | |
for key, tpl in templates.items()} | |
def state_received(topic, payload, qos): | |
"""A new MQTT message has been received.""" | |
payload = templates['state'](payload) | |
if payload == self._payload["on"]: | |
self._state = True | |
elif payload == self._payload["off"]: | |
self._state = False | |
self.update_ha_state() | |
if self._topic["state_topic"] is not None: | |
mqtt.subscribe(self._hass, self._topic["state_topic"], | |
state_received, self._qos) | |
def brightness_received(topic, payload, qos): | |
"""A new MQTT message for the brightness has been received.""" | |
device_value = float(templates['brightness'](payload)) | |
percent_bright = device_value / self._brightness_scale | |
brightness = int(percent_bright * 255) | |
if brightness > 0: | |
self._brightness = brightness | |
self._state = True | |
else: | |
self._state = False | |
self.update_ha_state() | |
if self._topic["brightness_state_topic"] is not None: | |
mqtt.subscribe(self._hass, self._topic["brightness_state_topic"], | |
brightness_received, self._qos) | |
self._brightness = 255 | |
elif self._topic["brightness_command_topic"] is not None: | |
self._brightness = 255 | |
else: | |
self._brightness = None | |
def rgb_received(topic, payload, qos): | |
"""A new MQTT message has been received.""" | |
self._rgb = [int(val) for val in | |
templates['rgb'](payload).split(',')] | |
self.update_ha_state() | |
if self._topic["rgb_state_topic"] is not None: | |
mqtt.subscribe(self._hass, self._topic["rgb_state_topic"], | |
rgb_received, self._qos) | |
self._rgb = [255, 255, 255] | |
if self._topic["rgb_command_topic"] is not None: | |
self._rgb = [255, 255, 255] | |
else: | |
self._rgb = None | |
@property | |
def brightness(self): | |
"""Brightness of this light between 0..255.""" | |
return self._brightness | |
@property | |
def rgb_color(self): | |
"""RGB color value.""" | |
return self._rgb | |
@property | |
def should_poll(self): | |
"""No polling needed for a MQTT light.""" | |
return False | |
@property | |
def name(self): | |
"""Name of the device if any.""" | |
return self._name | |
@property | |
def is_on(self): | |
"""True if device is on.""" | |
return self._state | |
@property | |
def assumed_state(self): | |
"""Return True if we do optimistic updates.""" | |
return self._optimistic | |
def turn_on(self, **kwargs): | |
"""Turn the device on.""" | |
should_update = False | |
if ATTR_RGB_COLOR in kwargs and \ | |
self._topic["rgb_command_topic"] is not None: | |
mqtt.publish(self._hass, self._topic["rgb_command_topic"], | |
"{},{},{}".format(*kwargs[ATTR_RGB_COLOR]), self._qos) | |
if self._optimistic_rgb: | |
self._rgb = kwargs[ATTR_RGB_COLOR] | |
should_update = True | |
if ATTR_BRIGHTNESS in kwargs and \ | |
self._topic["brightness_command_topic"] is not None: | |
percent_bright = float(kwargs[ATTR_BRIGHTNESS]) / 255 | |
device_brightness = int(percent_bright * self._brightness_scale) | |
mqtt.publish(self._hass, self._topic["brightness_command_topic"], | |
device_brightness, self._qos) | |
if self._optimistic_brightness: | |
self._brightness = kwargs[ATTR_BRIGHTNESS] | |
should_update = True | |
if self._topic["command_topic"] is not None: | |
mqtt.publish(self._hass, self._topic["command_topic"], | |
self._payload["on"], self._qos) | |
if self._optimistic: | |
# Optimistically assume that switch has changed state. | |
self._state = True | |
should_update = True | |
if should_update: | |
self.update_ha_state() | |
def turn_off(self, **kwargs): | |
"""Turn the device off.""" | |
mqtt.publish(self._hass, self._topic["command_topic"], | |
self._payload["off"], self._qos) | |
if self._optimistic: | |
# Optimistically assume that switch has changed state. | |
self._state = False | |
self.update_ha_state() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_turning_light_on_off_based_on_brightness(self): | |
self.assertTrue(light.setup(self.hass, { | |
'light': { | |
'platform': 'mqtt', | |
'name': 'test', | |
'brightness_command_topic': 'test_light_rgb/brightness/set', | |
'brightness_state_topic': 'test_light_rgb/brightness/status', | |
} | |
})) | |
state = self.hass.states.get('light.test') | |
self.assertEqual(STATE_OFF, state.state) | |
self.assertIsNone(state.attributes.get('brightness')) | |
fire_mqtt_message(self.hass, 'test_light_rgb/brightness/status', '120') | |
self.hass.pool.block_till_done() | |
state = self.hass.states.get('light.test') | |
self.assertEqual(STATE_ON, state.state) | |
self.assertEqual(120, state.attributes.get('brightness')) | |
fire_mqtt_message(self.hass, 'test_light_rgb/brightness/status', '0') | |
self.hass.pool.block_till_done() | |
state = self.hass.states.get('light.test') | |
self.assertEqual(STATE_OFF, state.state) | |
self.assertIsNone(state.attributes.get('brightness')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment