Skip to content

Instantly share code, notes, and snippets.

@BenPru
Last active August 1, 2023 14:50
Show Gist options
  • Save BenPru/ec463ea835fb705b070939c793196f2e to your computer and use it in GitHub Desktop.
Save BenPru/ec463ea835fb705b070939c793196f2e to your computer and use it in GitHub Desktop.
Python test module for connecting to the Luxtronik WebSocket
#!/usr/bin/env python
import threading
import time
import websocket
import xmltodict
# URL handed to websocket.WebSocketApp when the client is created.
ws_url = "ws://wp-novelan:8214"

# Sub-protocol list handed to websocket.WebSocketApp alongside the URL.
subprotocols = ["Lux_WS"]

# Message sent by on_open() as soon as the connection is established.
login = "LOGIN;999999"
class lux_value():
    """One value entry taken from a "Content" message.

    Attributes:
        cat_id: ID of the category the entry was found in.
        category: Name of that category.
        id: The entry's own ``@id``.
        name: The entry's name.
        value: ``True``/``False`` for "Ein"/"Aus", a ``float`` for values
            ending in "°C" and for "<non-integral number> <unit>" strings,
            otherwise the raw value converted with ``str()``.
        unit: The unit that was split off the value, or ``None``.
    """

    def __init__(self, cat_id: str, cat: str, data: dict):
        """Populate the entry from its parsed item mapping.

        Args:
            cat_id: ID of the enclosing category.
            cat: Name of the enclosing category.
            data: Mapping with the keys ``"@id"``, ``"name"`` and ``"value"``.

        Raises:
            KeyError: If one of the required keys is missing from ``data``.
            ValueError: If a value ends in "°C" but its prefix is not numeric.
        """
        def is_float(num) -> bool:
            """Return True when ``num`` parses as a non-integral number."""
            try:
                return not float(num).is_integer()
            except (TypeError, ValueError):
                return False

        self.cat_id = cat_id
        self.category = cat
        self.id = data["@id"]
        self.name = data["name"]
        self.value = str(data["value"])
        self.unit = None

        if self.value == "Ein":
            # German "on"
            self.value = True
        elif self.value == "Aus":
            # German "off"
            self.value = False
        elif self.value.endswith("°C"):
            self.value = float(self.value[:-2])
            self.unit = "°C"
        elif ' ' in self.value and ' (' not in self.value and not self.value.endswith(")"):
            v = self.value.split(' ')
            # Test the numeric token itself. Passing the whole list made
            # float() raise every time, so this branch could never convert.
            # NOTE: integral numbers ("3 h", "2.0 K") are still left as
            # strings because is_float() only accepts non-integral values.
            if is_float(v[0]):
                self.value = float(v[0])
                self.unit = v[1]

    def __repr__(self):
        """Return ``"<category>.<name> (<id>): <value> <unit>"``."""
        return f"{self.category}.{self.name} ({self.id}): {self.value} {self.unit}"
def on_open(wsapp):
    """Connection-opened callback: send the module-level ``login`` message."""
    login_message = login
    wsapp.send(login_message)
def get_info_id(data: dict) -> "str | None":
    """Return the ``@id`` of the navigation entry named "Informationen".

    Args:
        data: Parsed message containing ``data["Navigation"]["item"]``,
            either a list of entry mappings or a single entry mapping.

    Returns:
        The matching entry's ``@id``, or ``None`` when no entry matches.

    Raises:
        KeyError: If the expected keys are missing from ``data`` or an entry.
    """
    entries = data["Navigation"]["item"]
    # A lone child element is parsed as a single mapping rather than a
    # one-element list; iterating it directly would walk its keys.
    if not isinstance(entries, list):
        entries = [entries]
    for entry in entries:
        if entry["name"] == "Informationen":
            return entry["@id"]
    return None
def on_message(wsapp, message):
    """Message callback: request the info page and print its values.

    A message containing "Navigation" is searched for the "Informationen"
    entry and, when found, that entry is requested with ``GET;<id>``.
    A message containing "Content" is turned into ``lux_value`` objects,
    which are printed; after a one-second pause the same content is
    requested again. Any other message is ignored.

    Args:
        wsapp: The client object; only its ``send`` method is used.
        message: The raw XML text of the received message.
    """
    data = xmltodict.parse(message)

    if "Navigation" in data:
        info_id = get_info_id(data)
        if info_id is not None:
            wsapp.send(f"GET;{info_id}")
        return

    if "Content" not in data:
        return

    content = data["Content"]
    info_id = content["@id"]

    # A lone child element is parsed as a single mapping rather than a list,
    # so normalise the category level the same way as the item level below.
    categories = content["item"]
    if not isinstance(categories, list):
        categories = [categories]

    values: list[lux_value] = []
    for cat_data in categories:
        cat_id = cat_data["@id"]
        cat = cat_data["name"]
        items = cat_data["item"] if isinstance(cat_data["item"], list) else [cat_data["item"]]
        for value in items:
            # Best effort: one unparsable entry must not drop the others.
            try:
                values.append(lux_value(cat_id, cat, value))
            except Exception as err:
                print(err)
    print(values)

    # Pause before polling again.
    time.sleep(1)
    # wsapp.send("REFRESH")
    wsapp.send(f"GET;{info_id}")
    return
# Client object wired to the two callbacks defined in this file.
wsapp = websocket.WebSocketApp(
    ws_url,
    subprotocols=subprotocols,
    on_open=on_open,
    on_message=on_message,
)
def close_ws():
    """Wait 30 seconds, then close the module-level ``wsapp``."""
    delay_seconds = 30
    time.sleep(delay_seconds)
    wsapp.close()
# Start the delayed close on its own thread before the client loop is
# entered from the main thread.
t2 = threading.Thread(name="close_ws", target=close_ws)
t2.start()

wsapp.run_forever()
@gerw
Copy link

gerw commented Jan 14, 2023

Is there any reason for sending again the "info_id" (in line 74) instead of the "REFRESH"?

@BenPru
Copy link
Author

BenPru commented Jan 15, 2023

Is there any reason for sending again the "info_id" (in line 74) instead of the "REFRESH"?

Good question. I copied it from the original web interface, and it worked for me.

@gerw
Copy link

gerw commented Jan 15, 2023

Ok. My web interface ("Softwarestand V3.85.6 (r6445)") uses "REFRESH" every second to update the displayed values.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment