Skip to content

Instantly share code, notes, and snippets.

@OctoNezd
Last active May 1, 2023 13:03
Show Gist options
  • Save OctoNezd/af1cc19c4c853b60884052d5d9fcb63e to your computer and use it in GitHub Desktop.
Save OctoNezd/af1cc19c4c853b60884052d5d9fcb63e to your computer and use it in GitHub Desktop.
AppDaemon app that creates an MQTT AC entity controlled through an OpenBeken IR MQTT device
import appdaemon.plugins.mqtt.mqttapi as mqtt
import ctypes
import json
import functools
def dict_inv(old):
    """Return a new dict that maps every value of *old* back to its key.

    *old* itself is left untouched. Its values become keys, so they must be
    hashable. When several keys share one value, the key that comes last in
    iteration order is the one that survives.
    """
    return {item: name for name, item in old.items()}
# Coolix mode name -> numeric mode code.
# NOTE: kCoolixFan (0b100) needs three bits, one more than the two-bit Mode
# field declared in CoolixCommandCt_Bits.
modes = {
    "kCoolixCool": 0b000,
    "kCoolixDry": 0b001,
    "kCoolixAuto": 0b010,
    "kCoolixHeat": 0b011,
    "kCoolixFan": 0b100,
}
# Numeric mode code -> Coolix mode name.
modes_inv = dict_inv(modes)

# Coolix fan-speed name -> numeric fan code.
fan_states = {
    "kCoolixFanMin": 0b100,
    "kCoolixFanMed": 0b010,
    "kCoolixFanMax": 0b001,
    "kCoolixFanAuto": 0b101,
    "kCoolixFanAuto0": 0b000,
    "kCoolixFanZoneFollow": 0b110,
    "kCoolixFanFixed": 0b111,
}
# Numeric fan code -> Coolix fan-speed name.
fan_states_inv = dict_inv(fan_states)

# Coolix mode name -> mode string used on the MQTT mode topic.
modes_ha = {
    "kCoolixCool": "cool",
    "kCoolixDry": "dry",
    "kCoolixAuto": "auto",
    "kCoolixHeat": "heat",
    "kCoolixFan": "fan_only",
}
# MQTT mode string -> Coolix mode name.
modes_ha_inv = dict_inv(modes_ha)

# MQTT fan-mode string -> Coolix fan-speed name. Only four of the fan
# speeds above have an MQTT-side name.
fan_states_ha_inv = {
    "auto": "kCoolixFanAuto0",
    "low": "kCoolixFanMin",
    "medium": "kCoolixFanMed",
    "high": "kCoolixFanMax",
}
# Coolix fan-speed name -> MQTT fan-mode string.
fan_states_ha = dict_inv(fan_states_ha_inv)
# Settable temperature range.
kCoolixTempMin = 17  # Celsius
kCoolixTempMax = 30  # Celsius
kCoolixFanTempCode = 0xE  # 0b1110, part of Fan Mode.
kCoolixSensorTempIgnoreCode = 0x1F  # 0b11111

# Temperature codes, indexed by (temperature - kCoolixTempMin).
temps = [
    0x0,  # 17C  0b0000
    0x1,  # 18C  0b0001
    0x3,  # 19C  0b0011
    0x2,  # 20C  0b0010
    0x6,  # 21C  0b0110
    0x7,  # 22C  0b0111
    0x5,  # 23C  0b0101
    0x4,  # 24C  0b0100
    0xC,  # 25C  0b1100
    0xD,  # 26C  0b1101
    0x9,  # 27C  0b1001
    0x8,  # 28C  0b1000
    0xA,  # 29C  0b1010
    0xB,  # 30C  0b1011
]

# Complete 24-bit command codes.
kCoolixOff = 0xB27BE0     # 0b1011_0010_0111_1011_1110_0000
kCoolixSwing = 0xB26BE0   # 0b1011_0010_0110_1011_1110_0000
# The original comment gave 0xB5F5A2 here, but the binary literal is 0xB2F5A2.
kCoolixSwingH = 0xB2F5A2  # 0b1011_0010_1111_0101_1010_0010
kCoolixSwingV = 0xB20FE0  # 0b1011_0010_0000_1111_1110_0000
kCoolixSleep = 0xB2E003   # 0b1011_0010_1110_0000_0000_0011
kCoolixTurbo = 0xB5F5A2   # 0b1011_0101_1111_0101_1010_0010
kCoolixLed = 0xB5F5A5     # 0b1011_0101_1111_0101_1010_0101
kCoolixClean = 0xB5F5AA   # 0b1011_0101_1111_0101_1010_1010
kCoolixCmdFan = 0xB2BFE4  # 0b1011_0010_1011_1111_1110_0100
class CoolixCommandCt_Bits(ctypes.LittleEndianStructure):
    """Bit-field view of one Coolix command word.

    Fields are listed from the least significant bit upwards. Their widths
    add up to 24 bits, all declared on ``c_uint32`` storage.
    """

    # Every field shares the same storage type, so only the name and the
    # width in bits are spelled out per entry.
    _fields_ = [
        (field_name, ctypes.c_uint32, bit_width)
        for field_name, bit_width in (
            ('unk1', 1),
            ('ZoneFollow1', 1),
            ('Mode', 2),
            ('Temp', 4),
            ('SensorTemp', 5),
            ('Fan', 3),
            ('unk2', 3),
            ('ZoneFollow2', 1),
            ('fixed', 4),
        )
    ]
class CoolixCommandCt(ctypes.Union):
    """One Coolix command, usable as a whole integer or field by field.

    ``raw`` and ``bits`` overlay the same storage, so a write through either
    member is visible through the other.
    """

    _fields_ = [('raw', ctypes.c_uint32), ('bits', CoolixCommandCt_Bits)]
def generate_coolix_message(state=True, mode='Auto', fan='Auto0', temp=25, zfollow=False):
    """Build the Coolix command for the requested AC settings.

    Args:
        state: Falsy to request power-off; every other argument is then
            ignored.
        mode: Suffix of a key in ``modes`` (``'Cool'``, ``'Dry'``, ``'Auto'``,
            ``'Heat'`` or ``'Fan'``).
        fan: Suffix of a ``kCoolixFan...`` key in ``fan_states`` (for example
            ``'Auto0'``, ``'Min'``, ``'Max'``).
        temp: Integer target temperature in Celsius, from ``kCoolixTempMin``
            to ``kCoolixTempMax``. Not used when ``mode`` is ``'Fan'``.
        zfollow: Truthy to set both zone-follow bits.

    Returns:
        The integer ``kCoolixOff`` when ``state`` is falsy, otherwise a
        populated ``CoolixCommandCt`` (read ``.raw`` for the numeric code).
        NOTE(review): the two return types differ; this is kept as it was so
        existing callers are not broken.

    Raises:
        KeyError: ``mode`` or ``fan`` is not a known name.
        ValueError: ``temp`` is outside the supported range.
    """
    if not state:
        return kCoolixOff

    fan_code = fan_states['kCoolixFan' + fan]
    mode_name = 'kCoolix' + mode
    if mode_name == 'kCoolixFan':
        # modes['kCoolixFan'] is 0b100, which does not fit the two-bit Mode
        # field (it would be stored as 0, i.e. Cool). Fan-only is sent as Dry
        # mode plus the dedicated fan temperature code instead.
        mode_code = modes['kCoolixDry']
        temp_code = kCoolixFanTempCode
    else:
        mode_code = modes[mode_name]
        index = temp - kCoolixTempMin
        # A negative index would silently wrap around to the hot end of the
        # table (16C -> 30C), so reject it instead of letting it through.
        if not 0 <= index < len(temps):
            raise ValueError(
                f"temp must be between {kCoolixTempMin} and {kCoolixTempMax}, got {temp!r}")
        temp_code = temps[index]

    # The original overwrote zfollow with 0x2, which cannot be represented in
    # the one-bit ZoneFollow fields, so the flag could never be switched on.
    # NOTE(review): a real zone-follow frame presumably also needs a sensor
    # temperature instead of the ignore code written below - confirm against
    # the protocol before relying on zfollow=True.
    follow_bit = int(bool(zfollow))

    res = CoolixCommandCt()
    res.bits.unk1 = 0x0
    res.bits.ZoneFollow1 = follow_bit
    res.bits.Mode = mode_code
    res.bits.Temp = temp_code
    res.bits.SensorTemp = kCoolixSensorTempIgnoreCode
    res.bits.Fan = fan_code
    res.bits.unk2 = 0x2
    res.bits.ZoneFollow2 = follow_bit
    res.bits.fixed = 0xb
    return res
# Frame used when the AC is switched on from the "off" state:
# On, 25C, Mode: Auto, Fan: Auto, Zone Follow: Off, Sensor Temp: Ignore.
DEF_ON_STATE = 0xB21FC8  # 0b1011_0010_0001_1111_1100_1000

# Defaults reported while the AC is off.
DT = 25       # temperature, Celsius
DM = "auto"   # mode
DF = "auto"   # fan mode

UID = "homeComfeeAc"       # unique id / discovery node of the climate entity
MY_NS = "ad/ac"            # prefix of the mode / temp / fan topics
IR_BLASTER = "homeir"      # device name inside the IR blaster's command topic
DEVICE_NAME = "Comfee AC"  # name published in the discovery payload
class CoolixAcGeneric(mqtt.Mqtt):
    """AppDaemon MQTT app that drives a Coolix-protocol AC through an IR blaster.

    The command to transmit lives in ``self.cmd``. Messages arriving on the
    ``MY_NS`` mode / temp / fan topics rewrite single bit-fields of it, after
    which the whole code is published to the IR blaster's ``IRSend`` topic.

    NOTE(review): the discovery payload uses the same topics for state and
    for commands, so values this app publishes in ``update_mqtt_state`` may be
    delivered back to its own handlers - confirm against the broker setup.
    """

    def initialize(self):
        """Start from the "off" frame, publish state and discovery, subscribe."""
        self.cmd = CoolixCommandCt()
        self.cmd.raw = kCoolixOff
        self.log("Hello from AppDaemon")
        self.log("You are now ready to run Apps!")
        self.update_mqtt_state()
        # Re-publish the discovery payload periodically, starting right away.
        self.run_every(self.setup_ha_discovery, "now", 10)
        self.subscribe()
        self.blast_ir()

    def blast_ir(self):
        """Publish the current command to the IR blaster as a 24-bit COOLIX code."""
        code = hex(self.cmd.raw)
        self.log("blasting new ir state: %s", code)
        self.mqtt_publish(f"cmnd/{IR_BLASTER}/IRSend", f"COOLIX,24,{code},1")

    def subscribe(self):
        """Subscribe to the control topics and attach one handler to each."""
        handlers = {
            MY_NS + "/mode": self.set_ac_state,
            MY_NS + "/temp": self.set_ac_temp,
            MY_NS + "/fan": self.set_fan_mode,
        }
        # Kept from the original; nothing in this class listens on this topic.
        self.mqtt_subscribe(IR_BLASTER)
        for topic, handler in handlers.items():
            # The original subscribed to the mode topic only, leaving the temp
            # and fan listeners dependent on a subscription made elsewhere.
            self.mqtt_subscribe(topic)
            self.listen_event(self.create_ir_mw(handler),
                              "MQTT_MESSAGE", topic=topic)

    def create_ir_mw(self, func):
        """Wrap a payload handler as an MQTT_MESSAGE event callback.

        The returned callback takes three positional arguments and reads
        ``"payload"`` from the second one. It switches an "off" frame to
        ``DEF_ON_STATE``, passes the payload to *func* and transmits the
        result. If *func* rejects the payload, the previous frame is restored
        and nothing is transmitted.
        """
        @functools.wraps(func)
        def wrapped(_event, data, _kwargs):
            # Read the payload first so that a malformed event cannot leave
            # the frame half-updated.
            payload = data["payload"]
            previous = self.cmd.raw
            if previous == kCoolixOff:
                self.cmd.raw = DEF_ON_STATE
            try:
                func(payload)
            except (KeyError, ValueError, OverflowError) as err:
                # Unknown mode / fan name or unusable temperature: undo the
                # off -> on switch above rather than turning the AC on.
                self.cmd.raw = previous
                self.log("ignoring invalid payload %r for %s: %r",
                         payload, func.__name__, err, level="WARNING")
                return
            self.blast_ir()
        return wrapped

    def _fan_only(self):
        """Return True when the current frame is the fan-only encoding."""
        bits = self.cmd.bits
        return (bits.Mode == modes["kCoolixDry"]
                and bits.Temp == kCoolixFanTempCode)

    def set_ac_state(self, new_mode):
        """Apply a mode string from the mode topic (``"off"`` or a ``modes_ha`` value).

        Raises:
            KeyError: *new_mode* is not a known mode string.
        """
        if new_mode == "off":
            self.cmd.raw = kCoolixOff
            return
        desired_mode = modes_ha_inv[new_mode]
        if desired_mode == "kCoolixFan":
            # modes["kCoolixFan"] is 0b100, which the two-bit Mode field would
            # store as 0 (Cool). Fan-only is sent as Dry mode together with
            # the dedicated fan temperature code instead.
            self.cmd.bits.Mode = modes["kCoolixDry"]
            self.cmd.bits.Temp = kCoolixFanTempCode
        else:
            if self._fan_only():
                # Leaving fan-only: the fan temperature code is not a real
                # temperature, so fall back to the default one.
                self.cmd.bits.Temp = temps[DT - kCoolixTempMin]
            self.cmd.bits.Mode = modes[desired_mode]
        self.log("desired mode: %s", desired_mode)

    def set_ac_temp(self, new_temp):
        """Apply a target temperature in Celsius, given as a number or numeric string.

        Raises:
            ValueError: *new_temp* is not numeric or is outside the range
                covered by ``temps``.
        """
        index = int(float(new_temp) - kCoolixTempMin)
        # A negative index would silently wrap to the hot end of the table
        # (16C -> 30C), so reject anything outside it.
        if not 0 <= index < len(temps):
            raise ValueError(
                f"temperature {new_temp!r} is outside "
                f"{kCoolixTempMin}..{kCoolixTempMax}")
        if self._fan_only():
            # Writing a temperature code here would turn fan-only into Dry.
            self.log("fan_only mode has no temperature, ignoring %s", new_temp)
            return
        self.cmd.bits.Temp = temps[index]

    def set_fan_mode(self, new_mode):
        """Apply a fan-mode string from the fan topic.

        Raises:
            KeyError: *new_mode* is not a key of ``fan_states_ha_inv``.
        """
        desired_mode = fan_states_ha_inv[new_mode]
        self.cmd.bits.Fan = fan_states[desired_mode]
        self.log('desired fan state: %s', desired_mode)

    def setup_ha_discovery(self, *_):
        """Publish the Home Assistant MQTT discovery config for the climate entity."""
        mode_topic = MY_NS + "/mode"
        temp_topic = MY_NS + "/temp"
        fan_topic = MY_NS + "/fan"
        config = {
            "unique_id": UID,
            "temperature_unit": "C",
            "temperature_state_topic": temp_topic,
            "temperature_command_topic": temp_topic,
            "temperature_state_template": "{{ value_json }}",
            "fan_mode_command_topic": fan_topic,
            "fan_mode_state_topic": fan_topic,
            "mode_command_topic": mode_topic,
            "mode_state_topic": mode_topic,
            "optimistic": True,
            "precision": 1,
            "device": {
                "identifiers": UID
            },
            "max_temp": kCoolixTempMax,
            "min_temp": kCoolixTempMin,
            "name": DEVICE_NAME,
            "initial": DT
        }
        self.mqtt_publish("homeassistant/climate/" + UID + "/config",
                          payload=json.dumps(config))
        self.log("Published myself to homeassistant discovery")

    def update_mqtt_state(self):
        """Publish the mode, temperature and fan mode held in ``self.cmd``."""
        if self.cmd.raw == kCoolixOff:
            # The "off" frame carries no settings, so report the defaults.
            self.mqtt_publish(topic=MY_NS + "/mode", payload="off")
            self.mqtt_publish(topic=MY_NS + "/temp", payload=DT)
            self.mqtt_publish(topic=MY_NS + "/fan", payload=DF)
            return
        bits = self.cmd.bits
        if self._fan_only():
            # The fan temperature code is not in `temps`; report the default.
            mode_name = "fan_only"
            celsius = DT
        else:
            mode_name = modes_ha[modes_inv[bits.Mode]]
            celsius = temps.index(bits.Temp) + kCoolixTempMin
        # Fan codes without an MQTT-side name (or without any name at all)
        # are reported as "auto".
        fan_name = fan_states_ha.get(fan_states_inv.get(bits.Fan), "auto")
        self.mqtt_publish(topic=MY_NS + "/temp", payload=celsius)
        self.mqtt_publish(topic=MY_NS + "/mode", payload=mode_name)
        self.mqtt_publish(topic=MY_NS + "/fan", payload=fan_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment