Skip to content

Instantly share code, notes, and snippets.

@poggenpower
Last active August 22, 2022 09:37
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save poggenpower/296d94ac36f8736aa03e8be5c064af63 to your computer and use it in GitHub Desktop.
Save poggenpower/296d94ac36f8736aa03e8be5c064af63 to your computer and use it in GitHub Desktop.
Viessmann to mqtt forwarder

Viessmann to MQTT forwarding

The file below is part of a larger framework and won't run independently of it, but it should be easy to convert into a standalone version.

External modules

Two external modules are required

  1. https://github.com/somm15/PyViCare for talking to the Viessmann API
  2. https://github.com/poggenpower/mqtt_tree for wrapping MQTT communication (requires paho.mqtt.client) install via pip install git+https://github.com/poggenpower/mqtt_tree

If you create a standalone version, please give me a ping, I will add links here.

"""
Viess2Mqtt
Bridge between Viessmann Cloud API and Mqtt.
Using 3rd party package PyViCare to talk to Viessmann.
Publishes all reading/features to MQTT.
Filter a little bit of unnecessary information
Need Scheduler_Node to get triggered.
Fetch and publish run in a separate thread so that the main thread is not blocked during longer data processing.
Keeps track of changes and updates MQTT only if changed.
TODO:
- Allow changing settings
- Consider to publish certain features "retained" for better integration
"""
import datetime
import http.client
import json
import logging
import os
import queue
import sys
from dataclasses import dataclass, field
import threading
from typing import Callable, Union
from contextlib import suppress
import dateutil.parser
import PyViCare.PyViCare
import PyViCare.PyViCareUtils
import requests
import urllib3
from PyViCare.PyViCare import PyViCare as PyViCareAPI
from mqtt_client import mqtt_client
sys.path.append('..')
# pylint: disable=wrong-import-position
from nodes import ActionNode
# pylint: enable=wrong-import-position
# pylint: disable=line-too-long
logger = logging.getLogger(__name__)
class NoneException(BaseException):
    """Exception type that nothing in this file raises.

    It serves as an ``except`` target that matches no ordinary error, so a
    handler configured with it lets every regular exception propagate.
    """
@dataclass
class ViessDevice:
    """A single Viessmann device and the features last fetched for it.

    Only ``name`` and ``device_type`` take part in ``repr()`` and in
    equality comparison; the API handle and the feature list are excluded
    from both.
    """

    # Identifier of the device; also used as a segment of the MQTT topic.
    name: str
    # PyViCare handle through which the device's raw feature JSON is fetched.
    device_config: PyViCare.PyViCare.PyViCareDeviceConfig = field(compare=False, repr=False)
    # Class name of the auto-detected PyViCare device; used as a topic segment.
    device_type: str
    # Feature dicts from the most recent successful fetch (empty until then).
    features: list[dict] = field(default_factory=list, compare=False, repr=False)
@dataclass
class ViessConf:
    """Settings needed to log in to the Viessmann API and to schedule refreshes.

    Every field has a default, so the class can be built from a partial
    mapping via ``ViessConf(**mapping)``.
    """

    # Client id handed to PyViCare together with the account credentials.
    client_id: str = ""
    # Login e-mail of the Viessmann account.
    email: str = ""
    # Password of the Viessmann account.
    password: str = ""
    # Interval passed to the scheduler for the "refresh" trigger (in seconds).
    wake_interval: int = 2
class Viess2Mqtt(ActionNode):
    """Framework node that forwards every incoming message to a worker thread.

    The node does no processing of its own: messages given to :meth:`input`
    are placed on a queue that a ``Viess2MqttThread`` consumes.
    """

    def __init__(self, publish_callback: Callable, private_callback: Callable=None, name: str=None, cryptokey: str=None):
        """Initialise the base node, create the queue and start the worker.

        All arguments are passed on unchanged to ``ActionNode.__init__``.
        When *name* is empty or ``None`` the module name is used as the
        node name instead.
        """
        super().__init__(
            publish_callback,
            private_callback=private_callback,
            name=name,
            cryptokey=cryptokey,
        )
        self.q: queue.Queue = queue.Queue()
        self.name: str = name or __name__
        # The worker is a daemon thread, so it never keeps the process alive.
        worker = Viess2MqttThread(self, daemon=True)
        worker.start()

    def input(self, msg):
        """Queue *msg* for the worker thread instead of processing it here."""
        self.q.put(msg)
class Viess2MqttThread(threading.Thread):
    """Worker thread that fetches Viessmann features and publishes them to MQTT.

    Messages arrive through the queue owned by the ``Viess2Mqtt`` node.  The
    node's "init" message registers a periodic "refresh" trigger; each
    scheduler "refresh" message then runs one fetch-and-publish cycle.
    """

    primitive = (int, float, str, bool, type(None), bytearray)
    basic_iterable = (dict, list, tuple)
    basic_list = (list, tuple)

    # Bookkeeping keys that are stripped from a feature before publishing.
    _FEATURE_NOISE_KEYS = ('gatewayId', 'apiVersion', 'deviceId', 'uri')
    # Container keys that are dropped when present but empty.
    _FEATURE_CONTAINER_KEYS = ('properties', 'commands', 'components')
    # "Last published" value for topics that have never been published.
    _NEVER_PUBLISHED = datetime.datetime(1, 1, 1, 0, 0, 0, 0, datetime.timezone.utc)

    def __init__(self, viess2mqtt: Viess2Mqtt, *args, **kwargs) -> None:
        """Log in to the Viessmann API and discover the available devices.

        Args:
            viess2mqtt: Owning node; supplies the queue, the topic prefix,
                the configuration mapping and the publish function.
            *args, **kwargs: Passed on to ``threading.Thread``.
        """
        super().__init__(*args, **kwargs)
        self.viess2mqtt: Viess2Mqtt = viess2mqtt
        self.q: queue.Queue = viess2mqtt.q
        self.topic_prefix = viess2mqtt.name
        self.get_prefix: str = f"{self.topic_prefix}/get/"
        self.set_prefix: str = f"{self.topic_prefix}/set/"
        self.viess_config = ViessConf(**viess2mqtt.conf)
        self.vicare: PyViCareAPI = PyViCareAPI()
        try:
            self.vicare.initWithCredentials(
                self.viess_config.email,
                self.viess_config.password,
                self.viess_config.client_id,
                "token.tmp",
            )
        except (
            PyViCare.PyViCareUtils.PyViCareInternalServerError,
            http.client.RemoteDisconnected,
            urllib3.exceptions.ProtocolError,
            requests.exceptions.ConnectionError,
        ) as err:
            logging.warning("Issue to communicate the the viessmann API servers, but will continue. %s", err)
        self.devices: dict = {}
        self.last_published: dict = {}
        self.__get_devices()
        if logging.getLogger().level == logging.DEBUG:
            # With the root logger at DEBUG, `input` catches nothing, so
            # errors surface with a full traceback instead of being logged.
            self.exception_switch = NoneException
            logging.warning("Ignoring some Exceptions")
        else:
            self.exception_switch = Exception

    @staticmethod
    def _topic(*parts: str) -> str:
        """Join *parts* into one MQTT topic using ``/`` as the separator.

        Unlike ``os.path.join`` the separator does not depend on the
        platform.  Surplus slashes at the joints are removed; a leading
        slash on the first part is kept.
        """
        head, *tail = parts
        return "/".join([head.rstrip("/"), *(part.strip("/") for part in tail)])

    @staticmethod
    def _is_empty(feature: dict, key: str) -> bool:
        """Return True when *key* is present in *feature* and has length 0."""
        return key in feature and len(feature[key]) == 0

    def __mqtt_init(self) -> None:
        """Ask the node to send a "refresh" trigger every ``wake_interval``."""
        self.viess2mqtt.wake_me_every("refresh", self.viess_config.wake_interval, seconds=True)

    def __get_device(self, dev: PyViCare.PyViCare.PyViCareDeviceConfig) -> ViessDevice:
        """Wrap a PyViCare device config in a ``ViessDevice``.

        The device name is ``<model>_<config id>``; the device type is the
        class name of the auto-detected PyViCare device.
        """
        dev_type: str = dev.asAutoDetectDevice().__class__.__name__
        conf = dev.getConfig()
        model = dev.getModel()
        return ViessDevice(f"{model}_{conf.id}", dev, dev_type)

    def __get_devices(self) -> None:
        """Populate ``self.devices`` from the devices known to the API client."""
        # NOTE(review): after a failed login the client presumably has no
        # `devices` attribute; tolerate that instead of raising, because the
        # login error was deliberately swallowed by the caller.
        for dev in getattr(self.vicare, "devices", None) or []:
            device = self.__get_device(dev)
            self.devices[device.name] = device
        if not self.devices:
            logging.warning("No Viessmann devices discovered, nothing will be published.")

    def __refresh_data(self) -> None:
        """Fetch the current features of every device and publish its status.

        Each device is handled on its own, so a failure for one device
        neither stops the others nor reports under another device's topic.
        On success ``dev_status/connection`` is set to 1, on an API error to
        0, and ``dev_status/msg`` carries a human readable explanation.
        """
        token_renewed = False
        for dev in self.devices.values():
            connection_topic = self._topic(self.get_prefix, dev.device_type, dev.name, "dev_status/connection")
            statusmsg_topic = self._topic(self.get_prefix, dev.device_type, dev.name, "dev_status/msg")
            try:
                dev.features = dev.device_config.get_raw_json()['data']
            except PyViCare.PyViCareUtils.PyViCareInternalServerError as pcse:
                self.viess2mqtt._output(connection_topic, 0)
                if "DEVICE_COMMUNICATION_ERROR" in str(pcse):
                    msg = f"Connection issue to internal device. Can't do anything. {pcse}"
                    logging.debug(msg)
                elif "Internal server error" in str(pcse):
                    msg = f"(Temporary) Connection issue to viessmann cloud. Can't do anything. {pcse}"
                    logging.debug(msg)
                else:
                    msg = f"Conneciton issues, try to reauthenticate. But may have different reason. {pcse}"
                    logging.exception(msg)
                    # One renewal per cycle is enough even if several devices fail.
                    if not token_renewed:
                        self.vicare.oauth_manager.renewToken()
                        token_renewed = True
                self.viess2mqtt._output(statusmsg_topic, msg)
            except (urllib3.exceptions.ProtocolError, http.client.RemoteDisconnected, requests.exceptions.ConnectionError) as e_conn:
                logging.debug("Lowerlevel donnection issue to viessman cloud: %s", e_conn)
            else:
                self.viess2mqtt._output(statusmsg_topic, f"Device {dev.name} fetaures refreshed via API")
                self.viess2mqtt._output(connection_topic, 1)

    def __cleanup_feature(self, feature: dict) -> dict:
        """Return a copy of *feature* without bookkeeping keys and empty containers.

        The input dict is left untouched, and keys that are absent are
        simply ignored.
        """
        return {
            key: value
            for key, value in feature.items()
            if key not in self._FEATURE_NOISE_KEYS
            and not (key in self._FEATURE_CONTAINER_KEYS and self._is_empty(feature, key))
        }

    def __skip_feature(self, feature: dict) -> bool:
        """Return True for features that carry nothing worth publishing.

        A feature is skipped when ``isEnabled`` is exactly ``False`` or when
        ``properties``, ``commands`` and ``components`` are all present and
        empty.
        """
        if feature.get('isEnabled') is False:
            return True
        return all(self._is_empty(feature, key) for key in self._FEATURE_CONTAINER_KEYS)

    def __publish_feature(self, device_prefix: list[str], feature: dict) -> None:
        """Publish one feature below ``<get_prefix>/<device_prefix>/<feature path>``.

        The dotted feature name becomes the topic path.  Every remaining
        key of the feature is published as a sub-topic: dicts, lists and
        tuples as JSON text, primitive values as they are.  Nothing is
        published when the feature's timestamp is older than the previous
        publish of the same topic.

        *feature* itself is not modified, so cached features stay intact
        for the next cycle even when a refresh fails in between.
        """
        feature_name = feature.get('feature')
        if not feature_name:
            logging.error("Feature not found in %s", feature)
            return
        if self.__skip_feature(feature):
            return
        feature_prefix = self._topic(self.get_prefix, *device_prefix, *feature_name.split("."))
        payload = self.__cleanup_feature(feature)
        payload.pop('feature', None)
        now: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)
        feature_timestamp: datetime.datetime = dateutil.parser.parse(payload['timestamp'])
        feature_last_published: datetime.datetime = self.last_published.get(feature_prefix, self._NEVER_PUBLISHED)
        if feature_last_published > feature_timestamp:
            return
        # NOTE(review): 'age' is measured from the previous publish of this
        # topic, so the first publish reports the distance to year 1 --
        # confirm this is intended rather than `now - feature_timestamp`.
        payload['age'] = (now - feature_last_published).total_seconds()
        for key, value in payload.items():
            if self.is_basic_iterable(value):
                value = json.dumps(value)
            if self.is_primitive(value):
                self.viess2mqtt._output(self._topic(feature_prefix, key), value)
        self.last_published[feature_prefix] = now

    def __publish_device(self, device: ViessDevice) -> None:
        """Publish every cached feature of *device*."""
        for feature in device.features:
            self.__publish_feature([device.device_type, device.name], feature)

    def __publish_viess2mqtt(self) -> None:
        """Publish the cached features of all known devices."""
        for dev in self.devices.values():
            self.__publish_device(dev)

    def run(self):
        """Process queued messages forever, blocking while the queue is empty."""
        while True:
            msg: mqtt_client.MQTTMessage = self.q.get()
            self.input(msg)

    def input(self, msg: mqtt_client.MQTTMessage) -> None:
        """Dispatch one message by its topic.

        * ``<own topic>init`` registers the periodic refresh trigger.
        * A scheduler trigger topic ending in ``refresh`` runs one
          fetch-and-publish cycle.
        * Topics below the ``set`` prefix are accepted but not handled yet.

        Exceptions matching ``self.exception_switch`` are logged with their
        traceback instead of terminating the thread.
        """
        try:
            if msg.topic == f"{self.viess2mqtt.get_own_topic()}init":
                self.__mqtt_init()
            elif msg.topic.startswith(self.viess2mqtt.scheduler_trigger_topic):
                if msg.topic.endswith("refresh"):
                    self.__refresh_data()
                    self.__publish_viess2mqtt()
            elif msg.topic.startswith(self.set_prefix):
                # Changing settings is not implemented (see module TODO).
                pass
        except self.exception_switch as exp:
            logging.exception("Issue to process message: %s. Exceptio: %s", msg.topic, exp)

    def is_primitive(self, thing):
        """Return True when *thing* is an instance of one of ``primitive``."""
        return isinstance(thing, self.primitive)

    def is_basic_iterable(self, thing):
        """Return True when *thing* is a dict, list or tuple."""
        return isinstance(thing, self.basic_iterable)

    def is_basic_list(self, thing):
        """Return True when *thing* is a list or tuple."""
        return isinstance(thing, self.basic_list)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment