Last active
November 9, 2020 17:20
-
-
Save fredrike/26830a15b8c0ea45ef6e0aa164616469 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Climate platform that offers a climate device for the TFIAC protocol.""" | |
import logging | |
import voluptuous as vol | |
from homeassistant.components.climate import PLATFORM_SCHEMA | |
from homeassistant.const import (TEMP_FAHRENHEIT, CONF_HOST, ATTR_TEMPERATURE, | |
STATE_ON, STATE_OFF) | |
from homeassistant.components.climate import ( | |
ClimateDevice, SUPPORT_FAN_MODE, SUPPORT_ON_OFF, SUPPORT_OPERATION_MODE, | |
SUPPORT_SWING_MODE, SUPPORT_TARGET_TEMPERATURE) | |
import homeassistant.helpers.config_validation as cv | |
# Example configuration.yaml entry for this platform:
# climate:
#   platform: tfiac
#   host: 192.168.10.26
# Key under which the climate entity is stored in ``hass.data``.
DOMAIN = 'tfiac'

# Third-party packages this platform needs installed.
# 'xmltodict' is imported by Tfiac.update() to parse the status reply, so it
# has to be declared here; 'requests' is kept from the original declaration.
REQUIREMENTS = ['requests', 'xmltodict']

# The only platform-specific option is the (required) host of the unit.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_HOST): cv.string,
})

_LOGGER = logging.getLogger(__name__)

# Values offered to the frontend for each selectable setting.
OPERATION_LIST = ['heat', 'selfFeel']  # todo: add real values.
FAN_LIST = ['Auto', 'Low', 'Normal', 'High']  # todo: add real values.
# These names must match the keys of the SET_SWING lookup table, which is
# indexed directly with the selected swing mode.
SWING_LIST = [
    'Off',
    'Vertical',
    'Horizontal',
    'Both',
]  # todo: add real values.
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Create the TFIAC climate entity for the configured host.

    The discovery info is only logged. When the configuration carries no
    host, nothing is added.
    """
    _LOGGER.info(discovery_info)

    host = config.get(CONF_HOST)
    if host is None:
        return

    entity = TfiacClimate(hass, host)
    add_devices([entity])
class TfiacClimate(ClimateDevice):
    """Climate entity backed by a ``Tfiac`` client.

    Every state property reads from the client's ``status`` dict, which is
    refreshed by ``update()``. Every setter forwards the request to the
    device through ``Tfiac.set_state``.
    """

    def __init__(self, hass, host):
        """Store the entity in ``hass.data`` and create the client.

        Note that constructing ``Tfiac`` immediately queries the device.
        """
        hass.data[DOMAIN] = self
        self._client = Tfiac(host)

    def update(self):
        """Update status via socket polling."""
        self._client.update()

    @property
    def supported_features(self):
        """Return the bitmask of supported features."""
        return (SUPPORT_FAN_MODE | SUPPORT_ON_OFF | SUPPORT_OPERATION_MODE
                | SUPPORT_SWING_MODE | SUPPORT_TARGET_TEMPERATURE)

    @property
    def name(self):
        """Return the name of the climate device."""
        return self._client.name

    @property
    def target_temperature(self):
        """Return the temperature we try to reach."""
        return self._client.status['target_temp']

    @property
    def temperature_unit(self):
        """Return the unit of measurement."""
        return TEMP_FAHRENHEIT

    @property
    def current_temperature(self):
        """Return the current temperature."""
        return self._client.status['current_temp']

    @property
    def current_operation(self):
        """Return current operation ie. heat, cool, idle."""
        return self._client.status['operation']

    @property
    def is_on(self):
        """Return true if on.

        The value is the boolean the client stores under ``'is_on'`` (or
        ``None`` before the first successful status update). The client has
        no ``is_on`` attribute of its own, so the status dict is the only
        source for this flag.
        """
        return self._client.status['is_on']

    @property
    def operation_list(self):
        """Return the list of available operation modes."""
        return OPERATION_LIST

    @property
    def current_fan_mode(self):
        """Return the fan setting."""
        return self._client.status['fan_mode']

    @property
    def fan_list(self):
        """Return the list of available fan modes."""
        return FAN_LIST

    @property
    def current_swing_mode(self):
        """Return the swing setting."""
        return self._client.status['swing_mode']

    @property
    def swing_list(self):
        """List of available swing modes."""
        return SWING_LIST

    def turn_on(self):
        """Turn the device on.

        Needed because SUPPORT_ON_OFF is advertised. Uses the same 'On'
        value that Tfiac.set_state sends when a mode is selected.
        NOTE(review): confirm the device accepts this spelling.
        """
        self._client.set_state('TurnOn', 'On')

    def turn_off(self):
        """Turn the device off.

        Needed because SUPPORT_ON_OFF is advertised. Uses the same 'Off'
        value that Tfiac.set_state sends when mode 'Off' is selected.
        NOTE(review): confirm the device accepts this spelling.
        """
        self._client.set_state('TurnOn', 'Off')

    def set_temperature(self, **kwargs):
        """Set new target temperature.

        Does nothing when the ``ATTR_TEMPERATURE`` keyword is absent or None.
        """
        temperature = kwargs.get(ATTR_TEMPERATURE)
        if temperature is None:
            return
        self._client.target_temperature = temperature
        self._client.set_state('SetTemp', temperature)

    def set_operation_mode(self, operation_mode):
        """Set new operation mode."""
        self._client.current_operation = operation_mode
        self._client.set_state('SetMode', operation_mode)

    def set_fan_mode(self, fan_mode):
        """Set new fan mode."""
        self._client.current_fan_mode = fan_mode
        self._client.set_state('SetFanMode', fan_mode)

    def set_swing_mode(self, swing_mode):
        """Set new swing mode."""
        self._client.current_swing_mode = swing_mode
        self._client.set_state('SetSwingMode', swing_mode)
# Destination UDP port for every message sent to the unit.
UDP_PORT = 7777

# Envelope requesting a status report; ``seq`` is filled in per request.
STATUS_MESSAGE = (
    '<msg msgid="SyncStatusReq" type="Control" seq="{seq}">\n'
    '<SyncStatusReq></SyncStatusReq></msg>'
)

# Envelope for a settings change; ``message`` is one of the fragments below.
SET_MESSAGE = (
    '<msg msgid="SetMessage" type="Control" seq="{seq}">\n'
    '<SetMessage>{message}</SetMessage></msg>'
)

# Fragments placed inside SET_MESSAGE.
TURN_ON = "<TurnOn>{status}</TurnOn>"
SET_MODE = "<BaseMode>{mode}</BaseMode>"
SET_TEMP = "<SetTemp>{temp}</SetTemp>"
SET_FAN = "<WindSpeed>{fan}</WindSpeed>"

# Swing is expressed as a horizontal and a vertical on/off flag.
_SWING_TEMPLATE = (
    "<WindDirection_H>{horizontal}</WindDirection_H>"
    "<WindDirection_V>{vertical}</WindDirection_V>"
)
SET_SWING_OFF = _SWING_TEMPLATE.format(horizontal="off", vertical="off")
SET_SWING_3D = _SWING_TEMPLATE.format(horizontal="on", vertical="on")
SET_SWING_VERTICAL = _SWING_TEMPLATE.format(horizontal="off", vertical="on")
SET_SWING_HORIZONTAL = _SWING_TEMPLATE.format(horizontal="on", vertical="off")

# Swing mode name -> fragment to send for it.
SET_SWING = {
    'Off': SET_SWING_OFF,
    'Vertical': SET_SWING_VERTICAL,
    'Horizontal': SET_SWING_HORIZONTAL,
    'Both': SET_SWING_3D,
}
class Tfiac():
    """TFIAC class to handle connections.

    Talks to the unit at ``host`` over UDP: ``update()`` requests a status
    report and caches the parsed values, ``set_state()`` sends a settings
    change. The cached values are exposed through ``status`` and ``name``.
    """

    def __init__(self, host):
        """Remember ``host`` and fetch the initial status.

        Raises whatever ``send`` raises if the device does not answer.
        """
        self._host = host
        self._status = {
            'current_temp': None,
            'target_temp': None,
            'operation': None,
            'fan_mode': None,
            'swing_mode': None,
            'is_on': None,
        }
        self._name = None
        self.update()

    @property
    def _seq(self):
        """Return the sequence number for the next message.

        Derived from the current time in whole seconds, so the value
        changes between requests instead of being a fixed constant.
        """
        import time
        return int(time.time())

    def send(self, message):
        """Send ``message`` to the device and return the raw reply bytes.

        Raises ``socket.timeout`` when no reply arrives within 5 seconds.
        """
        import socket
        # The context manager closes the socket even when recv() times out.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(5)  # 5 second timeout
            sock.sendto(message.encode(), (self._host, UDP_PORT))
            return sock.recv(4096)

    def update(self):
        """Update the state of the A/C.

        Requests a status report and copies the parsed fields into the
        cached status. Errors while parsing the reply are logged and leave
        the not-yet-assigned fields at their previous values; errors from
        ``send`` itself propagate to the caller.
        """
        import xmltodict
        response = self.send(STATUS_MESSAGE.format(seq=self._seq))
        try:
            _status = dict(xmltodict.parse(response)['msg']['statusUpdateMsg'])
            # Logged on every poll, so keep it at debug level.
            _LOGGER.debug("Current status %s", _status)
            self._name = _status['DeviceName']
            self._status['current_temp'] = float(_status['IndoorTemp'])
            self._status['target_temp'] = float(_status['SetTemp'])
            self._status['operation'] = _status['BaseMode']
            self._status['fan_mode'] = _status['WindSpeed']
            self._status['is_on'] = _status['TurnOn'] == 'on'
            # NOTE(review): 'swing_mode' is never filled in here, so it
            # keeps its initial None.
        except Exception as ex:  # pylint: disable=W0703
            _LOGGER.error(ex)

    def set_state(self, mode, value):
        """Set the new state of the ac.

        ``mode`` selects what to change: 'TurnOn', 'SetTemp', 'SetMode',
        'SetFanMode' or 'SetSwingMode'; any other mode is ignored.
        'SetMode' first switches the unit on (or, for value 'Off', only
        switches it off). 'SetSwingMode' raises ``KeyError`` for a value
        that is not a key of ``SET_SWING``.
        """
        _seq = self._seq
        if mode == 'SetMode':
            if value == 'Off':
                self.set_state('TurnOn', 'Off')
                return
            # fixme: not sure if this is needed..
            self.set_state('TurnOn', 'On')
            payload = SET_MODE.format(mode=value)
        elif mode == 'TurnOn':
            payload = TURN_ON.format(status=value)
        elif mode == 'SetTemp':
            payload = SET_TEMP.format(temp=value)
        elif mode == 'SetFanMode':
            payload = SET_FAN.format(fan=value)
        elif mode == 'SetSwingMode':
            payload = SET_SWING[value]
        else:
            return
        self.send(SET_MESSAGE.format(seq=_seq, message=payload))

    @property
    def name(self):
        """Return name of device."""
        return self._name

    @property
    def status(self):
        """Return dict of current status."""
        return self._status
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment