Created
March 9, 2022 15:33
-
-
Save grodzik/ef717cb672bcf69cf09852b16187faa1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Map from manufacturer to standard clusters for electric heating thermostats.""" | |
import logging | |
from zigpy.profiles import zha | |
import zigpy.types as t | |
from zigpy.zcl.clusters.general import Basic, Groups, Ota, Scenes, Time | |
from zhaquirks.const import ( | |
DEVICE_TYPE, | |
ENDPOINTS, | |
INPUT_CLUSTERS, | |
MODELS_INFO, | |
OUTPUT_CLUSTERS, | |
PROFILE_ID, | |
) | |
import dataclasses | |
import datetime | |
from typing import Any, Callable, Dict, List, Optional, Tuple, Union | |
from zigpy.quirks import CustomCluster, CustomDevice | |
from zigpy.zcl import foundation | |
from zigpy.zcl.clusters.closures import WindowCovering | |
from zigpy.zcl.clusters.general import LevelControl, OnOff, PowerConfiguration | |
from zigpy.zcl.clusters.hvac import Thermostat, UserInterface | |
from zhaquirks import Bus, EventableCluster, LocalDataCluster | |
from zhaquirks.const import DOUBLE_PRESS, LONG_PRESS, SHORT_PRESS, ZHA_SEND_EVENT | |
# --------------------------------------------------------- | |
# Tuya Custom Cluster ID | |
# --------------------------------------------------------- | |
TUYA_CLUSTER_ID = 0xEF00 | |
# --------------------------------------------------------- | |
# Tuya Cluster Commands | |
# --------------------------------------------------------- | |
TUYA_SET_DATA = 0x00 | |
TUYA_GET_DATA = 0x01 | |
TUYA_SET_DATA_RESPONSE = 0x02 | |
TUYA_SEND_DATA = 0x04 | |
TUYA_ACTIVE_STATUS_RPT = 0x06 | |
TUYA_SET_TIME = 0x24 | |
TUYA_LEVEL_COMMAND = 514 | |
COVER_EVENT = "cover_event" | |
LEVEL_EVENT = "level_event" | |
# --------------------------------------------------------- | |
# Value for dp_type | |
# --------------------------------------------------------- | |
# ID Name Description | |
# --------------------------------------------------------- | |
# 0x00 DP_TYPE_RAW ? | |
# 0x01 DP_TYPE_BOOL ? | |
# 0x02 DP_TYPE_VALUE 4 byte unsigned integer | |
# 0x03 DP_TYPE_STRING variable length string | |
# 0x04 DP_TYPE_ENUM 1 byte enum | |
# 0x05 DP_TYPE_FAULT 1 byte bitmap (didn't test yet) | |
TUYA_DP_TYPE_RAW = 0x0000 | |
TUYA_DP_TYPE_BOOL = 0x0100 | |
TUYA_DP_TYPE_VALUE = 0x0200 | |
TUYA_DP_TYPE_STRING = 0x0300 | |
TUYA_DP_TYPE_ENUM = 0x0400 | |
TUYA_DP_TYPE_FAULT = 0x0500 | |
# --------------------------------------------------------- | |
# Value for dp_identifier (These are device specific) | |
# --------------------------------------------------------- | |
# ID Name Type Description | |
# --------------------------------------------------------- | |
# 0x01 control enum open, stop, close, continue | |
# 0x02 percent_control value 0-100% control | |
# 0x03 percent_state value Report from motor about current percentage | |
# 0x04 control_back enum Configures motor direction (untested) | |
# 0x05 work_state enum Motor Direction Setting | |
# 0x06 situation_set enum Configures if 100% equals to fully closed or fully open (untested) | |
# 0x07 fault bitmap Anything but 0 means something went wrong (untested) | |
TUYA_DP_ID_CONTROL = 0x01 | |
TUYA_DP_ID_PERCENT_CONTROL = 0x02 | |
TUYA_DP_ID_PERCENT_STATE = 0x03 | |
TUYA_DP_ID_DIRECTION_CHANGE = 0x05 | |
TUYA_DP_ID_COVER_INVERTED = 0x06 | |
# --------------------------------------------------------- | |
# Window Cover Server Commands | |
# --------------------------------------------------------- | |
WINDOW_COVER_COMMAND_UPOPEN = 0x0000 | |
WINDOW_COVER_COMMAND_DOWNCLOSE = 0x0001 | |
WINDOW_COVER_COMMAND_STOP = 0x0002 | |
WINDOW_COVER_COMMAND_LIFTPERCENT = 0x0005 | |
WINDOW_COVER_COMMAND_CUSTOM = 0x0006 | |
# --------------------------------------------------------- | |
# TUYA Cover Custom Values | |
# --------------------------------------------------------- | |
COVER_EVENT = "cover_event" | |
ATTR_COVER_POSITION = 0x0008 | |
ATTR_COVER_DIRECTION = 0x8001 | |
ATTR_COVER_INVERTED = 0x8002 | |
# For most tuya devices 0 = Up/Open, 1 = Stop, 2 = Down/Close | |
TUYA_COVER_COMMAND = { | |
"_TZE200_zah67ekd": {0x0000: 0x0000, 0x0001: 0x0002, 0x0002: 0x0001}, | |
"_TZE200_fzo2pocs": {0x0000: 0x0000, 0x0001: 0x0002, 0x0002: 0x0001}, | |
"_TZE200_xuzcvlku": {0x0000: 0x0000, 0x0001: 0x0002, 0x0002: 0x0001}, | |
"_TZE200_rddyvrci": {0x0000: 0x0002, 0x0001: 0x0001, 0x0002: 0x0000}, | |
"_TZE200_3i3exuay": {0x0000: 0x0000, 0x0001: 0x0002, 0x0002: 0x0001}, | |
"_TZE200_nueqqe6k": {0x0000: 0x0000, 0x0001: 0x0002, 0x0002: 0x0001}, | |
"_TZE200_gubdgai2": {0x0000: 0x0000, 0x0001: 0x0002, 0x0002: 0x0001}, | |
"_TZE200_zpzndjez": {0x0000: 0x0000, 0x0001: 0x0002, 0x0002: 0x0001}, | |
"_TZE200_cowvfni3": {0x0000: 0x0002, 0x0001: 0x0000, 0x0002: 0x0001}, | |
} | |
# --------------------------------------------------------- | |
# TUYA Switch Custom Values | |
# --------------------------------------------------------- | |
SWITCH_EVENT = "switch_event" | |
ATTR_ON_OFF = 0x0000 | |
ATTR_COVER_POSITION = 0x0008 | |
TUYA_CMD_BASE = 0x0100 | |
# --------------------------------------------------------- | |
# DP Value meanings in Status Report | |
# --------------------------------------------------------- | |
# Type ID IntDP Description | |
# --------------------------------------------------------- | |
# 0x04 0x01 1025 Confirm opening/closing/stopping (triggered from Zigbee) | |
# 0x02 0x02 514 Started moving to position (triggered from Zigbee) | |
# 0x04 0x07 1031 Started moving (triggered by transmitter order pulling on curtain) | |
# 0x02 0x03 515 Arrived at position | |
# 0x01 0x05 261 Returned by configuration set; ignore | |
# 0x02 0x69 617 Not sure what this is | |
# 0x04 0x05 1029 Changed the Motor Direction | |
# 0x04 0x65 1125 Change of tilt/lift mode 1 = lift 0=tilt | |
# --------------------------------------------------------- | |
_LOGGER = logging.getLogger(__name__) | |
class BigEndianInt16(int): | |
"""Helper class to represent big endian 16 bit value.""" | |
def serialize(self) -> bytes: | |
"""Value serialisation.""" | |
try: | |
return self.to_bytes(2, "big", signed=False) | |
except OverflowError as e: | |
# OverflowError is not a subclass of ValueError, making it annoying to catch | |
raise ValueError(str(e)) from e | |
@classmethod | |
def deserialize(cls, data: bytes) -> Tuple["BigEndianInt16", bytes]: | |
"""Value deserialisation.""" | |
if len(data) < 2: | |
raise ValueError(f"Data is too short to contain {cls._size} bytes") | |
r = cls.from_bytes(data[:2], "big", signed=False) | |
data = data[2:] | |
return r, data | |
class TuyaTimePayload(t.LVList, item_type=t.uint8_t, length_type=BigEndianInt16): | |
"""Tuya set time payload definition.""" | |
pass | |
class TuyaDPType(t.enum8): | |
"""DataPoint Type.""" | |
RAW = 0x00 | |
BOOL = 0x01 | |
VALUE = 0x02 | |
STRING = 0x03 | |
ENUM = 0x04 | |
BITMAP = 0x05 | |
class TuyaData(t.Struct): | |
"""Tuya Data type.""" | |
dp_type: TuyaDPType | |
function: t.uint8_t | |
raw: t.LVBytes | |
@classmethod | |
def deserialize(cls, data: bytes) -> Tuple["TuyaData", bytes]: | |
"""Deserialize data.""" | |
res = cls() | |
res.dp_type, data = TuyaDPType.deserialize(data) | |
res.function, data = t.uint8_t.deserialize(data) | |
res.raw, data = t.LVBytes.deserialize(data) | |
if res.dp_type not in (TuyaDPType.BITMAP, TuyaDPType.STRING, TuyaDPType.ENUM): | |
res.raw = res.raw[::-1] | |
return res, data | |
@property | |
def payload(self) -> Union[t.Bool, t.CharacterString, t.uint32_t, t.data32]: | |
"""Payload accordingly to data point type.""" | |
if self.dp_type == TuyaDPType.VALUE: | |
return t.uint32_t.deserialize(self.raw)[0] | |
elif self.dp_type == TuyaDPType.BOOL: | |
return t.Bool.deserialize(self.raw)[0] | |
elif self.dp_type == TuyaDPType.STRING: | |
return self.raw.decode("utf8") | |
elif self.dp_type == TuyaDPType.ENUM: | |
return t.enum8.deserialize(self.raw)[0] | |
elif self.dp_type == TuyaDPType.BITMAP: | |
bitmaps = {1: t.bitmap8, 2: t.bitmap16, 4: t.bitmap32} | |
try: | |
return bitmaps[len(self.raw)].deserialize(self.raw)[0] | |
except KeyError as exc: | |
raise ValueError(f"Wrong bitmap length: {len(self.raw)}") from exc | |
raise ValueError(f"Unknown {self.dp_type} datapoint type") | |
class Data(t.List, item_type=t.uint8_t): | |
"""list of uint8_t.""" | |
@classmethod | |
def from_value(cls, value): | |
"""Convert from a zigpy typed value to a tuya data payload.""" | |
# serialized in little-endian by zigpy | |
data = cls(value.serialize()) | |
# we want big-endian, with length prepended | |
data.append(len(data)) | |
data.reverse() | |
return data | |
def to_value(self, ztype): | |
"""Convert from a tuya data payload to a zigpy typed value.""" | |
# first uint8_t is the length of the remaining data | |
# tuya data is in big endian whereas ztypes use little endian | |
value, _ = ztype.deserialize(bytes(reversed(self[1:]))) | |
return value | |
class TuyaCommand(t.Struct): | |
"""Tuya manufacturer cluster command.""" | |
status: t.uint8_t | |
tsn: t.uint8_t | |
dp: t.uint8_t | |
data: TuyaData | |
class TuyaManufCluster(CustomCluster): | |
"""Tuya manufacturer specific cluster.""" | |
name = "Tuya Manufacturer Specicific" | |
cluster_id = TUYA_CLUSTER_ID | |
ep_attribute = "tuya_manufacturer" | |
set_time_offset = 0 | |
set_time_local_offset = None | |
class Command(t.Struct): | |
"""Tuya manufacturer cluster command.""" | |
status: t.uint8_t | |
tsn: t.uint8_t | |
command_id: t.uint16_t | |
function: t.uint8_t | |
data: Data | |
""" Time sync command (It's transparent between MCU and server) | |
Time request device -> server | |
payloadSize = 0 | |
Set time, server -> device | |
payloadSize, should be always 8 | |
payload[0-3] - UTC timestamp (big endian) | |
payload[4-7] - Local timestamp (big endian) | |
Zigbee payload is very similar to the UART payload which is described here: https://developer.tuya.com/en/docs/iot/device-development/access-mode-mcu/zigbee-general-solution/tuya-zigbee-module-uart-communication-protocol/tuya-zigbee-module-uart-communication-protocol?id=K9ear5khsqoty#title-10-Time%20synchronization | |
Some devices need the timestamp in seconds from 1/1/1970 and others in seconds from 1/1/2000. | |
NOTE: You need to wait for time request before setting it. You can't set time without request.""" | |
manufacturer_server_commands = { | |
0x0000: ("set_data", (Command,), False), | |
0x0024: ("set_time", (TuyaTimePayload,), False), | |
} | |
manufacturer_client_commands = { | |
0x0001: ("get_data", (Command,), True), | |
0x0002: ("set_data_response", (Command,), True), | |
0x0024: ("set_time_request", (t.data16,), True), | |
} | |
def handle_cluster_request( | |
self, | |
hdr: foundation.ZCLHeader, | |
args: Tuple, | |
*, | |
dst_addressing: Optional[ | |
Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] | |
] = None, | |
) -> None: | |
"""Handle time request.""" | |
_LOGGER.error( | |
"[0x%04x:%s:0x%04x] TuyaManufCluster handle_cluster_request (command 0x%04x) set_time_offset %d set_time_local_offset %d", | |
self.endpoint.device.nwk, | |
self.endpoint.endpoint_id, | |
self.cluster_id, | |
hdr.command_id, | |
self.set_time_offset, | |
self.set_time_local_offset | |
) | |
if hdr.command_id != 0x0024 or self.set_time_offset == 0: | |
return super().handle_cluster_request( | |
hdr, args, dst_addressing=dst_addressing | |
) | |
# Send default response because the MCU expects it | |
if not hdr.frame_control.disable_default_response: | |
self.send_default_rsp(hdr, status=foundation.Status.SUCCESS) | |
_LOGGER.error( | |
"[0x%04x:%s:0x%04x] Got set time request (command 0x%04x)", | |
self.endpoint.device.nwk, | |
self.endpoint.endpoint_id, | |
self.cluster_id, | |
hdr.command_id, | |
) | |
payload = TuyaTimePayload() | |
utc_timestamp = int( | |
( | |
datetime.datetime.utcnow() | |
- datetime.datetime(self.set_time_offset, 1, 1) | |
).total_seconds() | |
) | |
local_timestamp = int( | |
( | |
datetime.datetime.now() | |
- datetime.datetime( | |
self.set_time_local_offset or self.set_time_offset, 1, 1 | |
) | |
).total_seconds() | |
) | |
payload.extend(utc_timestamp.to_bytes(4, "big", signed=False)) | |
payload.extend(local_timestamp.to_bytes(4, "big", signed=False)) | |
self.create_catching_task( | |
super().command(TUYA_SET_TIME, payload, expect_reply=False) | |
) | |
class TuyaManufClusterAttributes(TuyaManufCluster): | |
"""Manufacturer specific cluster for Tuya converting attributes <-> commands.""" | |
def handle_cluster_request( | |
self, | |
hdr: foundation.ZCLHeader, | |
args: Tuple, | |
*, | |
dst_addressing: Optional[ | |
Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] | |
] = None, | |
) -> None: | |
"""Handle cluster request.""" | |
if hdr.command_id not in (0x0001, 0x0002): | |
return super().handle_cluster_request( | |
hdr, args, dst_addressing=dst_addressing | |
) | |
# Send default response because the MCU expects it | |
if not hdr.frame_control.disable_default_response: | |
self.send_default_rsp(hdr, status=foundation.Status.SUCCESS) | |
tuya_cmd = args[0].command_id | |
tuya_data = args[0].data | |
_LOGGER.debug( | |
"[0x%04x:%s:0x%04x] Received value %s " | |
"for attribute 0x%04x (command 0x%04x)", | |
self.endpoint.device.nwk, | |
self.endpoint.endpoint_id, | |
self.cluster_id, | |
repr(tuya_data[1:]), | |
tuya_cmd, | |
hdr.command_id, | |
) | |
if tuya_cmd not in self.attributes: | |
return | |
ztype = self.attributes[tuya_cmd][1] | |
zvalue = tuya_data.to_value(ztype) | |
self._update_attribute(tuya_cmd, zvalue) | |
def read_attributes( | |
self, attributes, allow_cache=False, only_cache=False, manufacturer=None | |
): | |
"""Ignore remote reads as the "get_data" command doesn't seem to do anything.""" | |
return super().read_attributes( | |
attributes, allow_cache=True, only_cache=True, manufacturer=manufacturer | |
) | |
async def write_attributes(self, attributes, manufacturer=None): | |
"""Defer attributes writing to the set_data tuya command.""" | |
records = self._write_attr_records(attributes) | |
for record in records: | |
cmd_payload = TuyaManufCluster.Command() | |
cmd_payload.status = 0 | |
cmd_payload.tsn = self.endpoint.device.application.get_sequence() | |
cmd_payload.command_id = record.attrid | |
cmd_payload.function = 0 | |
cmd_payload.data = Data.from_value(record.value.value) | |
await super().command( | |
TUYA_SET_DATA, | |
cmd_payload, | |
manufacturer=manufacturer, | |
expect_reply=False, | |
tsn=cmd_payload.tsn, | |
) | |
return [[foundation.WriteAttributesStatusRecord(foundation.Status.SUCCESS)]] | |
class TuyaOnOff(CustomCluster, OnOff): | |
"""Tuya On/Off cluster for On/Off device.""" | |
def __init__(self, *args, **kwargs): | |
"""Init.""" | |
super().__init__(*args, **kwargs) | |
self.endpoint.device.switch_bus.add_listener(self) | |
def switch_event(self, channel, state): | |
"""Switch event.""" | |
_LOGGER.debug( | |
"%s - Received switch event message, channel: %d, state: %d", | |
self.endpoint.device.ieee, | |
channel, | |
state, | |
) | |
self._update_attribute(ATTR_ON_OFF, state) | |
def command( | |
self, | |
command_id: Union[foundation.Command, int, t.uint8_t], | |
*args, | |
manufacturer: Optional[Union[int, t.uint16_t]] = None, | |
expect_reply: bool = True, | |
tsn: Optional[Union[int, t.uint8_t]] = None, | |
): | |
"""Override the default Cluster command.""" | |
if command_id in (0x0000, 0x0001): | |
cmd_payload = TuyaManufCluster.Command() | |
cmd_payload.status = 0 | |
cmd_payload.tsn = 0 | |
cmd_payload.command_id = TUYA_CMD_BASE + self.endpoint.endpoint_id | |
cmd_payload.function = 0 | |
cmd_payload.data = [1, command_id] | |
return self.endpoint.tuya_manufacturer.command( | |
TUYA_SET_DATA, cmd_payload, expect_reply=True | |
) | |
return foundation.Status.UNSUP_CLUSTER_COMMAND | |
class TuyaManufacturerClusterOnOff(TuyaManufCluster): | |
"""Manufacturer Specific Cluster of On/Off device.""" | |
def handle_cluster_request( | |
self, | |
hdr: foundation.ZCLHeader, | |
args: Tuple[TuyaManufCluster.Command], | |
*, | |
dst_addressing: Optional[ | |
Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] | |
] = None, | |
) -> None: | |
"""Handle cluster request.""" | |
tuya_payload = args[0] | |
if hdr.command_id in (0x0002, 0x0001): | |
self.endpoint.device.switch_bus.listener_event( | |
SWITCH_EVENT, | |
tuya_payload.command_id - TUYA_CMD_BASE, | |
tuya_payload.data[1], | |
) | |
class TuyaSwitch(CustomDevice): | |
"""Tuya switch device.""" | |
def __init__(self, *args, **kwargs): | |
"""Init device.""" | |
self.switch_bus = Bus() | |
super().__init__(*args, **kwargs) | |
class TuyaDimmerSwitch(TuyaSwitch): | |
"""Tuya dimmer switch device.""" | |
def __init__(self, *args, **kwargs): | |
"""Init device.""" | |
self.dimmer_bus = Bus() | |
super().__init__(*args, **kwargs) | |
class TuyaThermostatCluster(LocalDataCluster, Thermostat): | |
"""Thermostat cluster for Tuya thermostats.""" | |
_CONSTANT_ATTRIBUTES = {0x001B: Thermostat.ControlSequenceOfOperation.Heating_Only} | |
def __init__(self, *args, **kwargs): | |
"""Init.""" | |
super().__init__(*args, **kwargs) | |
self.endpoint.device.thermostat_bus.add_listener(self) | |
def temperature_change(self, attr, value): | |
"""Local or target temperature change from device.""" | |
self._update_attribute(self.attridx[attr], value) | |
def state_change(self, value): | |
"""State update from device.""" | |
if value == 0: | |
mode = self.RunningMode.Off | |
state = self.RunningState.Idle | |
else: | |
mode = self.RunningMode.Heat | |
state = self.RunningState.Heat_State_On | |
self._update_attribute(self.attridx["running_mode"], mode) | |
self._update_attribute(self.attridx["running_state"], state) | |
# pylint: disable=R0201 | |
def map_attribute(self, attribute, value): | |
"""Map standardized attribute value to dict of manufacturer values.""" | |
return {} | |
async def write_attributes(self, attributes, manufacturer=None): | |
"""Implement writeable attributes.""" | |
records = self._write_attr_records(attributes) | |
if not records: | |
return [[foundation.WriteAttributesStatusRecord(foundation.Status.SUCCESS)]] | |
manufacturer_attrs = {} | |
for record in records: | |
attr_name = self.attributes[record.attrid][0] | |
new_attrs = self.map_attribute(attr_name, record.value.value) | |
_LOGGER.debug( | |
"[0x%04x:%s:0x%04x] Mapping standard %s (0x%04x) " | |
"with value %s to custom %s", | |
self.endpoint.device.nwk, | |
self.endpoint.endpoint_id, | |
self.cluster_id, | |
attr_name, | |
record.attrid, | |
repr(record.value.value), | |
repr(new_attrs), | |
) | |
manufacturer_attrs.update(new_attrs) | |
if not manufacturer_attrs: | |
return [ | |
[ | |
foundation.WriteAttributesStatusRecord( | |
foundation.Status.FAILURE, r.attrid | |
) | |
for r in records | |
] | |
] | |
await self.endpoint.tuya_manufacturer.write_attributes( | |
manufacturer_attrs, manufacturer=manufacturer | |
) | |
return [[foundation.WriteAttributesStatusRecord(foundation.Status.SUCCESS)]] | |
# pylint: disable=W0236 | |
async def command( | |
self, | |
command_id: Union[foundation.Command, int, t.uint8_t], | |
*args, | |
manufacturer: Optional[Union[int, t.uint16_t]] = None, | |
expect_reply: bool = True, | |
tsn: Optional[Union[int, t.uint8_t]] = None, | |
): | |
"""Implement thermostat commands.""" | |
if command_id != 0x0000: | |
return [command_id, foundation.Status.UNSUP_CLUSTER_COMMAND] | |
mode, offset = args | |
if mode not in (self.SetpointMode.Heat, self.SetpointMode.Both): | |
return [command_id, foundation.Status.INVALID_VALUE] | |
attrid = self.attridx["occupied_heating_setpoint"] | |
success, _ = await self.read_attributes((attrid,), manufacturer=manufacturer) | |
try: | |
current = success[attrid] | |
except KeyError: | |
return foundation.Status.FAILURE | |
# offset is given in decidegrees, see Zigbee cluster specification | |
(res,) = await self.write_attributes( | |
{"occupied_heating_setpoint": current + offset * 10}, | |
manufacturer=manufacturer, | |
) | |
return [command_id, res[0].status] | |
class TuyaUserInterfaceCluster(LocalDataCluster, UserInterface): | |
"""HVAC User interface cluster for tuya thermostats.""" | |
def __init__(self, *args, **kwargs): | |
"""Init.""" | |
super().__init__(*args, **kwargs) | |
self.endpoint.device.ui_bus.add_listener(self) | |
def child_lock_change(self, mode): | |
"""Change of child lock setting.""" | |
if mode == 0: | |
lockout = self.KeypadLockout.No_lockout | |
else: | |
lockout = self.KeypadLockout.Level_1_lockout | |
self._update_attribute(self.attridx["keypad_lockout"], lockout) | |
def map_attribute(self, attribute, value): | |
"""Map standardized attribute value to dict of manufacturer values.""" | |
return {} | |
async def write_attributes(self, attributes, manufacturer=None): | |
"""Defer the keypad_lockout attribute to child_lock.""" | |
records = self._write_attr_records(attributes) | |
manufacturer_attrs = {} | |
for record in records: | |
if record.attrid == self.attridx["keypad_lockout"]: | |
lock = 0 if record.value.value == self.KeypadLockout.No_lockout else 1 | |
new_attrs = {self._CHILD_LOCK_ATTR: lock} | |
else: | |
attr_name = self.attributes[record.attrid][0] | |
new_attrs = self.map_attribute(attr_name, record.value.value) | |
_LOGGER.debug( | |
"[0x%04x:%s:0x%04x] Mapping standard %s (0x%04x) " | |
"with value %s to custom %s", | |
self.endpoint.device.nwk, | |
self.endpoint.endpoint_id, | |
self.cluster_id, | |
attr_name, | |
record.attrid, | |
repr(record.value.value), | |
repr(new_attrs), | |
) | |
manufacturer_attrs.update(new_attrs) | |
if not manufacturer_attrs: | |
return [ | |
[ | |
foundation.WriteAttributesStatusRecord( | |
foundation.Status.FAILURE, r.attrid | |
) | |
for r in records | |
] | |
] | |
await self.endpoint.tuya_manufacturer.write_attributes( | |
manufacturer_attrs, manufacturer=manufacturer | |
) | |
return [[foundation.WriteAttributesStatusRecord(foundation.Status.SUCCESS)]] | |
class TuyaPowerConfigurationCluster(LocalDataCluster, PowerConfiguration): | |
"""PowerConfiguration cluster for battery-operated thermostats.""" | |
def __init__(self, *args, **kwargs): | |
"""Init.""" | |
super().__init__(*args, **kwargs) | |
self.endpoint.device.battery_bus.add_listener(self) | |
def battery_change(self, value): | |
"""Change of reported battery percentage remaining.""" | |
self._update_attribute(self.attridx["battery_percentage_remaining"], value * 2) | |
class TuyaThermostat(CustomDevice): | |
"""Generic Tuya thermostat device.""" | |
def __init__(self, *args, **kwargs): | |
"""Init device.""" | |
self.thermostat_bus = Bus() | |
self.ui_bus = Bus() | |
self.battery_bus = Bus() | |
super().__init__(*args, **kwargs) | |
class TuyaSmartRemoteOnOffCluster(OnOff, EventableCluster): | |
"""TuyaSmartRemoteOnOffCluster: fire events corresponding to press type.""" | |
press_type = { | |
0x00: SHORT_PRESS, | |
0x01: DOUBLE_PRESS, | |
0x02: LONG_PRESS, | |
} | |
name = "TS004X_cluster" | |
ep_attribute = "TS004X_cluster" | |
def __init__(self, *args, **kwargs): | |
"""Init.""" | |
self.last_tsn = -1 | |
super().__init__(*args, **kwargs) | |
manufacturer_server_commands = { | |
0xFD: ("press_type", (t.uint8_t,), False), | |
} | |
def handle_cluster_request( | |
self, | |
hdr: foundation.ZCLHeader, | |
args: List[Any], | |
*, | |
dst_addressing: Optional[ | |
Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] | |
] = None, | |
): | |
"""Handle press_types command.""" | |
# normally if default response sent, TS004x wouldn't send such repeated zclframe (with same sequence number), | |
# but for stability reasons (e. g. the case the response doesn't arrive the device), we can simply ignore it | |
if hdr.tsn == self.last_tsn: | |
_LOGGER.debug("TS004X: ignoring duplicate frame") | |
return | |
# save last sequence number | |
self.last_tsn = hdr.tsn | |
# send default response (as soon as possible), so avoid repeated zclframe from device | |
if not hdr.frame_control.disable_default_response: | |
self.debug("TS004X: send default response") | |
self.send_default_rsp(hdr, status=foundation.Status.SUCCESS) | |
# handle command | |
if hdr.command_id == 0xFD: | |
press_type = args[0] | |
self.listener_event( | |
ZHA_SEND_EVENT, self.press_type.get(press_type, "unknown"), [] | |
) | |
# Tuya Window Cover Implementation | |
class TuyaManufacturerWindowCover(TuyaManufCluster): | |
"""Manufacturer Specific Cluster for cover device.""" | |
def handle_cluster_request( | |
self, | |
hdr: foundation.ZCLHeader, | |
args: Tuple[TuyaManufCluster.Command], | |
*, | |
dst_addressing: Optional[ | |
Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] | |
] = None, | |
) -> None: | |
"""Handle cluster request.""" | |
"""Tuya Specific Cluster Commands""" | |
if hdr.command_id == TUYA_SET_DATA_RESPONSE: | |
tuya_payload = args[0] | |
_LOGGER.debug( | |
"%s Received Attribute Report. Command is 0x%04x, Tuya Paylod values" | |
"[Status : %s, TSN: %s, Command: 0x%04x, Function: 0x%02x, Data: %s]", | |
self.endpoint.device.ieee, | |
hdr.command_id, | |
tuya_payload.status, | |
tuya_payload.tsn, | |
tuya_payload.command_id, | |
tuya_payload.function, | |
tuya_payload.data, | |
) | |
if tuya_payload.command_id == TUYA_DP_TYPE_VALUE + TUYA_DP_ID_PERCENT_STATE: | |
self.endpoint.device.cover_bus.listener_event( | |
COVER_EVENT, | |
ATTR_COVER_POSITION, | |
tuya_payload.data[4], | |
) | |
elif ( | |
tuya_payload.command_id | |
== TUYA_DP_TYPE_ENUM + TUYA_DP_ID_DIRECTION_CHANGE | |
): | |
self.endpoint.device.cover_bus.listener_event( | |
COVER_EVENT, | |
ATTR_COVER_DIRECTION, | |
tuya_payload.data[1], | |
) | |
elif ( | |
tuya_payload.command_id == TUYA_DP_TYPE_ENUM + TUYA_DP_ID_COVER_INVERTED | |
): | |
self.endpoint.device.cover_bus.listener_event( | |
COVER_EVENT, | |
ATTR_COVER_INVERTED, | |
tuya_payload.data[1], # Check this | |
) | |
elif hdr.command_id == 0x0011: | |
"""Assuming this is the pairing event""" | |
_LOGGER.debug( | |
"%s Pairing New Tuya Roller Blind. Self [%s], Header [%s], Tuya Paylod [%s]", | |
self.endpoint.device.ieee, | |
self, | |
hdr, | |
args, | |
) | |
"""set initial attributes""" | |
self.endpoint.device.cover_bus.listener_event( | |
COVER_EVENT, | |
ATTR_COVER_POSITION, | |
0, | |
) | |
self.endpoint.device.cover_bus.listener_event( | |
COVER_EVENT, | |
ATTR_COVER_DIRECTION, | |
0, | |
) | |
self.endpoint.device.cover_bus.listener_event( | |
COVER_EVENT, | |
ATTR_COVER_INVERTED, | |
0, | |
) | |
elif hdr.command_id == TUYA_SET_TIME: | |
"""Time event call super""" | |
super().handle_cluster_request(self, hdr, args, dst_addressing) | |
else: | |
_LOGGER.debug( | |
"%s Received Attribute Report - Unknown Command. Self [%s], Header [%s], Tuya Paylod [%s]", | |
self.endpoint.device.ieee, | |
self, | |
hdr, | |
args, | |
) | |
class TuyaWindowCoverControl(LocalDataCluster, WindowCovering): | |
"""Manufacturer Specific Cluster of Device cover.""" | |
"""Add additional attributes for direction""" | |
attributes = WindowCovering.attributes.copy() | |
attributes.update({ATTR_COVER_DIRECTION: ("motor_direction", t.Bool)}) | |
attributes.update({ATTR_COVER_INVERTED: ("cover_inverted", t.Bool)}) | |
def __init__(self, *args, **kwargs): | |
"""Initialize instance.""" | |
super().__init__(*args, **kwargs) | |
self.endpoint.device.cover_bus.add_listener(self) | |
def cover_event(self, attribute, value): | |
"""Event listener for cover events.""" | |
if attribute == ATTR_COVER_POSITION: | |
value = ( | |
value if self._attr_cache.get(ATTR_COVER_INVERTED) == 1 else 100 - value | |
) | |
self._update_attribute(attribute, value) | |
_LOGGER.debug( | |
"%s Tuya Attribute Cache : [%s]", | |
self.endpoint.device.ieee, | |
self._attr_cache, | |
) | |
def command( | |
self, | |
command_id: Union[foundation.Command, int, t.uint8_t], | |
*args, | |
manufacturer: Optional[Union[int, t.uint16_t]] = None, | |
expect_reply: bool = True, | |
tsn: Optional[Union[int, t.uint8_t]] = None, | |
): | |
"""Override the default Cluster command.""" | |
if manufacturer is None: | |
manufacturer = self.endpoint.device.manufacturer | |
_LOGGER.debug( | |
"%s Sending Tuya Cluster Command.. Manufacturer is %s Cluster Command is 0x%04x, Arguments are %s", | |
self.endpoint.device.ieee, | |
manufacturer, | |
command_id, | |
args, | |
) | |
# Open Close or Stop commands | |
tuya_payload = TuyaManufCluster.Command() | |
if command_id in ( | |
WINDOW_COVER_COMMAND_UPOPEN, | |
WINDOW_COVER_COMMAND_DOWNCLOSE, | |
WINDOW_COVER_COMMAND_STOP, | |
): | |
tuya_payload.status = 0 | |
tuya_payload.tsn = tsn if tsn else 0 | |
tuya_payload.command_id = TUYA_DP_TYPE_ENUM + TUYA_DP_ID_CONTROL | |
tuya_payload.function = 0 | |
tuya_payload.data = [ | |
1, | |
# need to implement direction change | |
TUYA_COVER_COMMAND[manufacturer][command_id], | |
] # remap the command to the Tuya command | |
# Set Position Command | |
elif command_id == WINDOW_COVER_COMMAND_LIFTPERCENT: | |
tuya_payload.status = 0 | |
tuya_payload.tsn = tsn if tsn else 0 | |
tuya_payload.command_id = TUYA_DP_TYPE_VALUE + TUYA_DP_ID_PERCENT_CONTROL | |
tuya_payload.function = 0 | |
"""Check direction and correct value""" | |
position = ( | |
args[0] | |
if self._attr_cache.get(ATTR_COVER_INVERTED) == 1 | |
else 100 - args[0] | |
) | |
tuya_payload.data = [ | |
4, | |
0, | |
0, | |
0, | |
position, | |
] | |
# Custom Command | |
elif command_id == WINDOW_COVER_COMMAND_CUSTOM: | |
tuya_payload.status = args[0] | |
tuya_payload.tsn = args[1] | |
tuya_payload.command_id = args[2] | |
tuya_payload.function = args[3] | |
tuya_payload.data = args[4] | |
else: | |
tuya_payload = None | |
# Send the command | |
if tuya_payload.command_id: | |
_LOGGER.debug( | |
"%s Sending Tuya Command. Paylod values [endpoint_id : %s, " | |
"Status : %s, TSN: %s, Command: 0x%04x, Function: %s, Data: %s]", | |
self.endpoint.device.ieee, | |
self.endpoint.endpoint_id, | |
tuya_payload.status, | |
tuya_payload.tsn, | |
tuya_payload.command_id, | |
tuya_payload.function, | |
tuya_payload.data, | |
) | |
return self.endpoint.tuya_manufacturer.command( | |
TUYA_SET_DATA, tuya_payload, expect_reply=True | |
) | |
else: | |
_LOGGER.debug("Unrecognised command: %x", command_id) | |
return foundation.Status.UNSUP_CLUSTER_COMMAND | |
class TuyaWindowCover(CustomDevice): | |
"""Tuya switch device.""" | |
def __init__(self, *args, **kwargs): | |
"""Init device.""" | |
self.cover_bus = Bus() | |
super().__init__(*args, **kwargs) | |
class TuyaManufacturerLevelControl(TuyaManufCluster): | |
"""Manufacturer Specific Cluster for cover device.""" | |
def handle_cluster_request( | |
self, | |
hdr: foundation.ZCLHeader, | |
args: Tuple[TuyaManufCluster.Command], | |
*, | |
dst_addressing: Optional[ | |
Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] | |
] = None, | |
) -> None: | |
"""Handle cluster request.""" | |
tuya_payload = args[0] | |
_LOGGER.debug( | |
"%s Received Attribute Report. Command is %x, Tuya Paylod values" | |
"[Status : %s, TSN: %s, Command: %s, Function: %s, Data: %s]", | |
self.endpoint.device.ieee, | |
hdr.command_id, | |
tuya_payload.status, | |
tuya_payload.tsn, | |
tuya_payload.command_id, | |
tuya_payload.function, | |
tuya_payload.data, | |
) | |
if hdr.command_id in (0x0002, 0x0001): | |
if tuya_payload.command_id == TUYA_LEVEL_COMMAND: | |
self.endpoint.device.dimmer_bus.listener_event( | |
LEVEL_EVENT, | |
tuya_payload.command_id, | |
tuya_payload.data, | |
) | |
else: | |
self.endpoint.device.switch_bus.listener_event( | |
SWITCH_EVENT, | |
tuya_payload.command_id - TUYA_CMD_BASE, | |
tuya_payload.data[1], | |
) | |
class TuyaLevelControl(CustomCluster, LevelControl): | |
"""Tuya Level cluster for dimmable device.""" | |
def __init__(self, *args, **kwargs): | |
"""Init.""" | |
super().__init__(*args, **kwargs) | |
self.endpoint.device.dimmer_bus.add_listener(self) | |
def level_event(self, channel, state): | |
"""Level event.""" | |
level = (((state[3] << 8) + state[4]) * 255) // 1000 | |
_LOGGER.debug( | |
"%s - Received level event message, channel: %d, level: %d, data: %d", | |
self.endpoint.device.ieee, | |
channel, | |
level, | |
state, | |
) | |
self._update_attribute(self.attridx["current_level"], level) | |
def command( | |
self, | |
command_id: Union[foundation.Command, int, t.uint8_t], | |
*args, | |
manufacturer: Optional[Union[int, t.uint16_t]] = None, | |
expect_reply: bool = True, | |
tsn: Optional[Union[int, t.uint8_t]] = None, | |
): | |
"""Override the default Cluster command.""" | |
_LOGGER.debug( | |
"%s Sending Tuya Cluster Command.. Cluster Command is %x, Arguments are %s", | |
self.endpoint.device.ieee, | |
command_id, | |
args, | |
) | |
# Move to level | |
# move_to_level_with_on_off | |
if command_id in (0x0000, 0x0001, 0x0004): | |
cmd_payload = TuyaManufCluster.Command() | |
cmd_payload.status = 0 | |
cmd_payload.tsn = 0 | |
cmd_payload.command_id = TUYA_LEVEL_COMMAND | |
cmd_payload.function = 0 | |
brightness = (args[0] * 1000) // 255 | |
val1 = brightness >> 8 | |
val2 = brightness & 0xFF | |
cmd_payload.data = [4, 0, 0, val1, val2] # Custom Command | |
return self.endpoint.tuya_manufacturer.command( | |
TUYA_SET_DATA, cmd_payload, expect_reply=True | |
) | |
return foundation.Status.UNSUP_CLUSTER_COMMAND | |
class TuyaLocalCluster(LocalDataCluster): | |
"""Tuya virtual clusters. | |
Prevents attribute reads and writes. Attribute writes could be converted | |
to DataPoint updates. | |
""" | |
def update_attribute(self, attr_name: str, value: Any) -> None: | |
"""Update attribute by attribute name.""" | |
try: | |
attrid = self.attridx[attr_name] | |
except KeyError: | |
self.debug("no such attribute: %s", attr_name) | |
return | |
return self._update_attribute(attrid, value) | |
@dataclasses.dataclass | |
class DPToAttributeMapping: | |
"""Container for datapoint to cluster attribute update mapping.""" | |
ep_attribute: str | |
attribute_name: str | |
converter: Optional[ | |
Callable[ | |
[ | |
Any, | |
], | |
Any, | |
] | |
] = None | |
endpoint_id: Optional[int] = None | |
class TuyaNewManufCluster(CustomCluster): | |
"""Tuya manufacturer specific cluster. | |
This is an attempt to consolidate the multiple above clusters into a | |
single framework. Instead of overriding the handle_cluster_request() | |
method, implement handlers for commands, like get_data, set_data_response, | |
set_time_request, etc. | |
""" | |
name: str = "Tuya Manufacturer Specific" | |
cluster_id: t.uint16_t = TUYA_CLUSTER_ID | |
ep_attribute: str = "tuya_manufacturer" | |
manufacturer_server_commands = { | |
TUYA_SET_DATA: ("set_data", (TuyaCommand,), False), | |
TUYA_SEND_DATA: ("send_data", (TuyaCommand,), False), | |
TUYA_SET_TIME: ("set_time", (TuyaTimePayload,), False), | |
} | |
manufacturer_client_commands = { | |
TUYA_GET_DATA: ("get_data", (TuyaCommand,), True), | |
TUYA_SET_DATA_RESPONSE: ("set_data_response", (TuyaCommand,), True), | |
TUYA_ACTIVE_STATUS_RPT: ("active_status_report", (TuyaCommand,), True), | |
TUYA_SET_TIME: ("set_time_request", (t.data16,), True), | |
} | |
data_point_handlers: Dict[int, str] = {} | |
def handle_cluster_request( | |
self, | |
hdr: foundation.ZCLHeader, | |
args: Tuple, | |
*, | |
dst_addressing: Optional[ | |
Union[t.Addressing.Group, t.Addressing.IEEE, t.Addressing.NWK] | |
] = None, | |
) -> None: | |
"""Handle cluster specific request.""" | |
try: | |
if ( | |
hdr.is_reply | |
): # server_cluster -> client_cluster cluster specific command | |
handler_name = f"handle_{self.client_commands[hdr.command_id][0]}" | |
else: | |
handler_name = f"handle_{self.server_commands[hdr.command_id][0]}" | |
except KeyError: | |
self.debug( | |
"Received unknown manufacturer command %s: %s", hdr.command_id, args | |
) | |
if not hdr.frame_control.disable_default_response: | |
self.send_default_rsp( | |
hdr, status=foundation.Status.UNSUP_CLUSTER_COMMAND | |
) | |
return | |
try: | |
status = getattr(self, handler_name)(*args) | |
except AttributeError: | |
self.warning( | |
"No '%s' tuya handler found for %s", | |
handler_name, | |
args, | |
) | |
status = foundation.Status.UNSUP_CLUSTER_COMMAND | |
if not hdr.frame_control.disable_default_response: | |
self.send_default_rsp(hdr, status=status) | |
def handle_get_data(self, command: TuyaCommand) -> foundation.Status: | |
"""Handle get_data response (report).""" | |
try: | |
dp_handler = self.data_point_handlers[command.dp] | |
getattr(self, dp_handler)(command) | |
except (AttributeError, KeyError): | |
self.debug("No datapoint handler for %s", command) | |
return foundation.status.UNSUPPORTED_ATTRIBUTE | |
return foundation.Status.SUCCESS | |
handle_set_data_response = handle_get_data | |
handle_active_status_report = handle_get_data | |
def handle_set_time_request(self, payload: t.uint16_t) -> foundation.Status: | |
"""Handle Time set request.""" | |
return foundation.Status.SUCCESS | |
def _dp_2_attr_update(self, command: TuyaCommand) -> None: | |
"""Handle data point to attribute report conversion.""" | |
try: | |
dp_map = self.dp_to_attribute[command.dp] | |
except KeyError: | |
self.debug("No attribute mapping for %s data point", command.dp) | |
return | |
endpoint = self.endpoint | |
if dp_map.endpoint_id: | |
endpoint = self.endpoint.device.endpoints[dp_map.endpoint_id] | |
cluster = getattr(endpoint, dp_map.ep_attribute) | |
value = command.data.payload | |
if dp_map.converter: | |
value = dp_map.converter(value) | |
cluster.update_attribute(dp_map.attribute_name, value) | |
# info from https://github.com/zigpy/zha-device-handlers/pull/538#issuecomment-723334124 | |
# https://github.com/Koenkk/zigbee-herdsman-converters/blob/master/converters/fromZigbee.js#L239 | |
# and https://github.com/Koenkk/zigbee-herdsman-converters/blob/master/converters/common.js#L113 | |
MOESBHT_TARGET_TEMP_ATTR = 0x0210 # [0,0,0,21] target room temp (degree) | |
MOESBHT_TEMPERATURE_ATTR = 0x0218 # [0,0,0,200] current room temp (decidegree) | |
MOESBHT_SCHEDULE_MODE_ATTR = 0x0403 # [1] false [0] true /!\ inverted | |
MOESBHT_MANUAL_MODE_ATTR = 0x0402 # [1] false [0] true /!\ inverted | |
MOESBHT_ENABLED_ATTR = 0x0101 # [0] off [1] on | |
MOESBHT_RUNNING_MODE_ATTR = 0x0424 # [1] idle [0] heating /!\ inverted | |
MOESBHT_CHILD_LOCK_ATTR = 0x0128 # [0] unlocked [1] child-locked | |
_LOGGER = logging.getLogger(__name__) | |
class MoesBHTManufCluster(TuyaManufClusterAttributes): | |
"""Manufacturer Specific Cluster of some electric heating thermostats.""" | |
set_time_offset = 2000 | |
set_time_local_offset = 1970 | |
manufacturer_attributes = { | |
MOESBHT_TARGET_TEMP_ATTR: ("target_temperature", t.uint32_t), | |
MOESBHT_TEMPERATURE_ATTR: ("temperature", t.uint32_t), | |
MOESBHT_SCHEDULE_MODE_ATTR: ("schedule_mode", t.uint8_t), | |
MOESBHT_MANUAL_MODE_ATTR: ("manual_mode", t.uint8_t), | |
MOESBHT_ENABLED_ATTR: ("enabled", t.uint8_t), | |
MOESBHT_RUNNING_MODE_ATTR: ("running_mode", t.uint8_t), | |
MOESBHT_CHILD_LOCK_ATTR: ("child_lock", t.uint8_t), | |
} | |
def _update_attribute(self, attrid, value): | |
super()._update_attribute(attrid, value) | |
if attrid == MOESBHT_TARGET_TEMP_ATTR: | |
self.endpoint.device.thermostat_bus.listener_event( | |
"temperature_change", | |
"occupied_heating_setpoint", | |
value * 100, # degree to centidegree | |
) | |
elif attrid == MOESBHT_TEMPERATURE_ATTR: | |
self.endpoint.device.thermostat_bus.listener_event( | |
"temperature_change", | |
"local_temp", | |
value * 10, # decidegree to centidegree | |
) | |
elif attrid == MOESBHT_SCHEDULE_MODE_ATTR: | |
if value == 0: # value is inverted | |
self.endpoint.device.thermostat_bus.listener_event( | |
"program_change", "scheduled" | |
) | |
elif attrid == MOESBHT_MANUAL_MODE_ATTR: | |
if value == 0: # value is inverted | |
self.endpoint.device.thermostat_bus.listener_event( | |
"program_change", "manual" | |
) | |
elif attrid == MOESBHT_ENABLED_ATTR: | |
self.endpoint.device.thermostat_bus.listener_event("enabled_change", value) | |
elif attrid == MOESBHT_RUNNING_MODE_ATTR: | |
# value is inverted | |
self.endpoint.device.thermostat_bus.listener_event( | |
"state_change", 1 - value | |
) | |
elif attrid == MOESBHT_CHILD_LOCK_ATTR: | |
self.endpoint.device.ui_bus.listener_event("child_lock_change", value) | |
class MoesBHTThermostat(TuyaThermostatCluster): | |
"""Thermostat cluster for some electric heating controllers.""" | |
def map_attribute(self, attribute, value): | |
"""Map standardized attribute value to dict of manufacturer values.""" | |
if attribute == "occupied_heating_setpoint": | |
# centidegree to degree | |
return {MOESBHT_TARGET_TEMP_ATTR: round(value / 100)} | |
if attribute == "system_mode": | |
if value == self.SystemMode.Off: | |
return {MOESBHT_ENABLED_ATTR: 0} | |
if value == self.SystemMode.Heat: | |
return {MOESBHT_ENABLED_ATTR: 1} | |
self.error("Unsupported value for SystemMode") | |
elif attribute == "programing_oper_mode": | |
# values are inverted | |
if value == self.ProgrammingOperationMode.Simple: | |
return {MOESBHT_MANUAL_MODE_ATTR: 0, MOESBHT_SCHEDULE_MODE_ATTR: 1} | |
if value == self.ProgrammingOperationMode.Schedule_programming_mode: | |
return {MOESBHT_MANUAL_MODE_ATTR: 1, MOESBHT_SCHEDULE_MODE_ATTR: 0} | |
self.error("Unsupported value for ProgrammingOperationMode") | |
return super().map_attribute(attribute, value) | |
def program_change(self, mode): | |
"""Programming mode change.""" | |
if mode == "manual": | |
value = self.ProgrammingOperationMode.Simple | |
else: | |
value = self.ProgrammingOperationMode.Schedule_programming_mode | |
self._update_attribute(self.attridx["programing_oper_mode"], value) | |
def enabled_change(self, value): | |
"""System mode change.""" | |
if value == 0: | |
mode = self.SystemMode.Off | |
else: | |
mode = self.SystemMode.Heat | |
self._update_attribute(self.attridx["system_mode"], mode) | |
class MoesBHTUserInterface(TuyaUserInterfaceCluster): | |
"""HVAC User interface cluster for tuya electric heating thermostats.""" | |
_CHILD_LOCK_ATTR = MOESBHT_CHILD_LOCK_ATTR | |
class MoesBHTSpoon(TuyaThermostat): | |
"""Moes BHT-002GCLZB Thermostatic radiator valve.""" | |
signature = { | |
# endpoint=1 profile=260 device_type=81 device_version=1 input_clusters=[0, 4, 5, 61184], | |
# output_clusters=[10, 25] | |
MODELS_INFO: [("_TZE200_aoclfnxz", "TS0601")], | |
ENDPOINTS: { | |
1: { | |
PROFILE_ID: zha.PROFILE_ID, | |
DEVICE_TYPE: zha.DeviceType.SMART_PLUG, | |
INPUT_CLUSTERS: [ | |
Basic.cluster_id, | |
Groups.cluster_id, | |
Scenes.cluster_id, | |
TuyaManufClusterAttributes.cluster_id, | |
], | |
OUTPUT_CLUSTERS: [Time.cluster_id, Ota.cluster_id], | |
} | |
}, | |
} | |
replacement = { | |
ENDPOINTS: { | |
1: { | |
PROFILE_ID: zha.PROFILE_ID, | |
DEVICE_TYPE: zha.DeviceType.THERMOSTAT, | |
INPUT_CLUSTERS: [ | |
Basic.cluster_id, | |
Groups.cluster_id, | |
Scenes.cluster_id, | |
MoesBHTManufCluster, | |
MoesBHTThermostat, | |
MoesBHTUserInterface, | |
], | |
OUTPUT_CLUSTERS: [Time.cluster_id, Ota.cluster_id], | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment