Created
March 10, 2023 02:23
-
-
Save ghys/ea549402e88876e6daa3ac9aa403ec9f to your computer and use it in GitHub Desktop.
Expose a HomeWizard Kettle as a Homie device
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import collections | |
import functools | |
from homie.device_base import Device_Base | |
from homie.node.node_base import Node_Base | |
from homie.node.property.property_integer import Property_Integer | |
from homie.node.property.property_setpoint import Property_Setpoint | |
from homie.node.property.property_string import Property_String | |
from homie.node.property.property_switch import Property_Switch | |
from homie.node.property.property_temperature import Property_Temperature | |
from hwkitchen import HWSocket, Kettle | |
import logging | |
#logging.basicConfig(level=logging.DEBUG) | |
# Connection parameters for the MQTT broker; handed to Device_Kettle when the
# Homie device is created.
mqtt_settings = dict(
    MQTT_BROKER="localhost",
    MQTT_PORT=1883,
)

# HomeWizard account credentials consumed by new_homewizard_client().
homewizard_settings = dict(
    HOMEWIZARD_USERNAME="<snip>",
    HOMEWIZARD_PASSWORD="<snip>",
)
def new_homewizard_client():
    """Return a new HWSocket built from the credentials in homewizard_settings."""
    username = homewizard_settings["HOMEWIZARD_USERNAME"]
    password = homewizard_settings["HOMEWIZARD_PASSWORD"]
    return HWSocket(username, password)
# Create the event loop explicitly and register it as the current loop.
# Calling asyncio.get_event_loop() at module level with no current loop is
# deprecated and raises RuntimeError on recent Python versions. Registering
# the loop first keeps any later get_event_loop() call on this same loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Single HomeWizard websocket client shared by all coroutines below.
ws_client = new_homewizard_client()

# The Homie device; stays None until on_connection() has created it.
homie_kettle = None

# Pending kettle updates: producers appendleft(), send_queue() pop()s, so the
# oldest entry is sent first. Bounded to 100 entries.
ws_message_queue = collections.deque(maxlen=100)
class Device_Kettle(Device_Base):
    """Homie device that exposes a HomeWizard kettle.

    Two nodes are published:

    * ``state``: current temperature and current status.
    * ``controls``: target temperature, boil-before-target switch, keep-warm
      switch, keep-warm time and a start/stop boiling switch. Each of these
      has a ``set_value`` callback that changes the wrapped kettle object and
      queues it on ``ws_message_queue`` for ``send_queue()`` to transmit.
    """

    def __init__(
        self,
        kettle=None,
        mqtt_settings=None,
    ):
        """Build the Homie nodes/properties for ``kettle`` and start the device.

        Args:
            kettle: The kettle object to mirror. Required despite the ``None``
                default, because its model, version and id are read here.
            mqtt_settings: MQTT settings forwarded to ``Device_Base``.

        Raises:
            ValueError: If ``kettle`` is ``None``.
        """
        if kettle is None:
            # Fail with a clear message instead of an AttributeError below.
            raise ValueError("Device_Kettle requires a kettle")
        self.kettle = kettle

        homie_settings = {
            "fw_name": kettle.get_model(),
            "fw_version": kettle.get_version(),
        }
        # The kettle id may contain "/", which is replaced for the device id.
        device_id = kettle.get_id().replace("/", "-")
        name = "Kettle"  # TODO: kettle.get_name() not in WS responses?
        temperature_unit = "°C"  # TODO: get from state.temperature_unit (no API)

        super().__init__(device_id, name, homie_settings, mqtt_settings)

        # --- read-only state node -----------------------------------------
        node = Node_Base(self, "state", "State", "State")
        self.add_node(node)

        self.current_temperature = Property_Temperature(
            node,
            id="currenttemperature",
            name="Current Temperature",
            unit=temperature_unit,
        )
        node.add_property(self.current_temperature)

        self.current_status = Property_String(
            node,
            id="status",
            name="Current Status",
        )
        node.add_property(self.current_status)

        # --- settable controls node ---------------------------------------
        node = Node_Base(self, "controls", "Controls", "Controls")
        self.add_node(node)

        self.target_temperature = Property_Setpoint(
            node,
            id="targettemperature",
            name="Target Temperature",
            unit=temperature_unit,
            set_value=self.set_target_temperature,
        )
        node.add_property(self.target_temperature)

        # The switches below carry "ON"/"OFF" values (see update_properties),
        # so no temperature unit is attached to them.
        self.boil_before_target = Property_Switch(
            node,
            id="boilbeforetargetswitch",
            name="Boil Before Target",
            set_value=self.set_boil_before_target,
        )
        node.add_property(self.boil_before_target)

        self.keep_warm_enabled = Property_Switch(
            node,
            id="keepwarmswitch",
            name="Keep Warm",
            set_value=self.set_keep_warm_enabled,
        )
        node.add_property(self.keep_warm_enabled)

        self.keep_warm_set_time = Property_Integer(
            node,
            id="keepwarmsettime",
            name="Keep Warm Set Time",
            unit="min",
            set_value=self.set_keep_warm_set_time,
        )
        node.add_property(self.keep_warm_set_time)

        self.boiling_switch = Property_Switch(
            node,
            id="boilingswitch",
            name="Start Boiling",
            set_value=self.start_stop_boiling,
        )
        node.add_property(self.boiling_switch)

        # Publish the initial values before the device is started.
        self.update_properties(kettle)
        self.start()

    def update_properties(self, kettle: Kettle):
        """Copy the state of ``kettle`` into the Homie properties.

        Also replaces ``self.kettle`` with ``kettle`` and toggles the device
        state between ``'ready'`` and ``'disconnected'`` based on
        ``kettle.is_online``.
        """
        print("Updating", kettle.to_json())
        self.kettle = kettle

        self.current_temperature.value = kettle.get_current_temperature()
        self.target_temperature.value = kettle.get_target_temperature()

        # Read the status once; it feeds both the status string and the switch.
        status = kettle.get_status().value
        self.current_status.value = status
        self.boil_before_target.value = "ON" if kettle.get_boil_before_target() else "OFF"
        self.keep_warm_enabled.value = "ON" if kettle.get_keep_warm_enabled() else "OFF"
        self.keep_warm_set_time.value = kettle.get_keep_warm_set_time()
        self.boiling_switch.value = "ON" if status == "heating_to_target" else "OFF"

        # NOTE(review): is_online is read as an attribute, not called. Confirm
        # it is a property/attribute on Kettle; a plain method object would
        # always be truthy here.
        if self.state == 'ready' and not kettle.is_online:
            self.state = 'disconnected'
        elif self.state == 'disconnected' and kettle.is_online:
            self.state = 'ready'

    def _queue_update(self):
        """Queue the current kettle object for transmission by send_queue()."""
        ws_message_queue.appendleft(self.kettle)

    def set_target_temperature(self, value):
        """Set-callback: apply a new target temperature and queue the update."""
        print("Set target temperature to", value)
        self.kettle.set_target_temperature(value)
        self._queue_update()

    def set_boil_before_target(self, value):
        """Set-callback: "ON" enables boil-before-target, anything else disables it."""
        print("Set boil before target switch to", value)
        self.kettle.set_boil_before_target(value == "ON")
        self._queue_update()

    def set_keep_warm_enabled(self, value):
        """Set-callback: "ON" enables keep-warm, anything else disables it."""
        print("Set keep warm switch to", value)
        self.kettle.set_keep_warm_enabled(value == "ON")
        self._queue_update()

    def set_keep_warm_set_time(self, value):
        """Set-callback: apply a new keep-warm time and queue the update."""
        print("Set keep warm set time to", value)
        self.kettle.set_keep_warm_set_time(value)
        self._queue_update()

    def start_stop_boiling(self, value):
        """Set-callback: "ON" starts the kettle, anything else stops it."""
        if value == "ON":
            print("Starting boiling")
            self.kettle.start()
        else:
            print("Stopping boiling")
            self.kettle.stop()
        self._queue_update()
async def on_new_status(kettle: Kettle):
    """Status callback: forward the new kettle state to the Homie device.

    Does nothing while the module-level ``homie_kettle`` is still ``None``.
    """
    if homie_kettle is None:
        return
    homie_kettle.update_properties(kettle)
async def on_connection():
    """Connection callback: subscribe to the first device and expose it.

    Fetches the device list, subscribes to the first entry with
    ``on_new_status`` as the status callback, and stores the resulting Homie
    device in the module-level ``homie_kettle``.

    Raises:
        IndexError: If the device list is missing or empty.
    """
    global homie_kettle

    # Get the first device from my HomeWizard Kitchen Account
    response = await ws_client.get_devices()
    device_list = response.get("devices") or []
    if not device_list:
        # Same exception type as the bare [0] lookup, with a usable message.
        raise IndexError("No devices found on the HomeWizard account")
    kettle_id = device_list[0].get("identifier")

    # Subscribe for new status
    kettle = await ws_client.subscribe_device(kettle_id, on_new_status)

    if homie_kettle is None:
        homie_kettle = Device_Kettle(kettle=kettle, mqtt_settings=mqtt_settings)
    else:
        # NOTE(review): if this callback runs again (e.g. after a reconnect,
        # to be confirmed against the client library), reuse the existing
        # Homie device rather than creating a second one with the same id.
        homie_kettle.update_properties(kettle)
async def send_queue():
    """Forever drain ``ws_message_queue``, sending one kettle update per pass.

    Each iteration sends at most one queued kettle via ``ws_client.update``
    and then sleeps for half a second, whether or not anything was sent.
    """
    while True:
        if ws_message_queue:
            # Setters appendleft(), so pop() yields the oldest pending entry.
            pending_kettle = ws_message_queue.pop()
            print('Processing message', pending_kettle.to_json())
            await ws_client.update(pending_kettle)
            print('Message processed')
        await asyncio.sleep(0.5)
async def handler():
    """Run the websocket connection and the outgoing-message pump together.

    Both awaitables are wrapped with ``asyncio.ensure_future`` before being
    handed to ``asyncio.wait``: passing bare coroutines to ``asyncio.wait``
    is rejected on Python 3.11+.
    """
    connect_task = asyncio.ensure_future(ws_client.connect(on_connection))
    send_queue_task = asyncio.ensure_future(send_queue())

    _done, pending = await asyncio.wait(
        [connect_task, send_queue_task],
        return_when=asyncio.ALL_COMPLETED,
    )

    # With ALL_COMPLETED and no timeout nothing is left pending, so this loop
    # is only a safeguard should the wait condition ever be changed.
    for task in pending:
        task.cancel()
# Script entry point: drive handler() on the module-level event loop until it
# returns.
main_coroutine = handler()
loop.run_until_complete(main_coroutine)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment