Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# Viessmann API to InfluxDB (using PyViCare)
# MIT License
#
# Copyright (c) 2022 Dennis Heitmann
import sys
import logging
import datetime
from PyViCare.PyViCare import PyViCare
from influxdb import InfluxDBClient
# --- InfluxDB connection settings -------------------------------------------
influxServer = '127.0.0.1'
influxPort = 8086
influxUser = 'root'
influxPass = 'password'
influxDatabase = 'database'

# Client object through which the collected readings are written.
dbclient = InfluxDBClient(
    host=influxServer,
    port=influxPort,
    username=influxUser,
    password=influxPass,
    database=influxDatabase,
)

# Prepended to each parameter name to form the InfluxDB measurement name.
databasePrefix = "HOME/Viessmann/"

# --- Viessmann API credentials ----------------------------------------------
client_id = "xxxxx"
email = "xxxxx@example.com"
password = "xxxxx"

# Log in to the Viessmann API. The last argument is presumably the file in
# which PyViCare keeps its token between runs (verify against PyViCare docs).
vicare = PyViCare()
vicare.initWithCredentials(email, password, client_id, "/tmp/token.save")

# Only the first device reported for the account is queried.
device = vicare.devices[0]
# Get all Parameters
deviceModel = device.getModel()
parameters = {}


def _collectParameter(name, getter, convert=float):
    """Read one value and store it in ``parameters`` under ``name``.

    Args:
        name: Key under which the value is stored in ``parameters``.
        getter: Zero-argument callable that fetches the raw value. The
            attribute lookup on the device object should happen inside this
            callable so that a missing method is skipped like any other
            failure.
        convert: Callable applied to the raw value before it is stored, or
            ``None`` to store the raw value unchanged.

    Any exception raised while reading or converting is logged at debug
    level and the parameter is left out, so one unavailable reading does
    not prevent the remaining ones from being collected.
    """
    try:
        value = getter()
        parameters[name] = value if convert is None else convert(value)
    except Exception as err:
        logging.debug("Skipping parameter %s: %s", name, err)


if device.isOnline():
    # Timezone-aware UTC timestamp shared by all readings of this run
    # (datetime.utcnow() is deprecated since Python 3.12).
    receiveTime = datetime.datetime.now(datetime.timezone.utc)

    # Device
    t = device.asAutoDetectDevice()
    _collectParameter('solarYieldToday', lambda: t.getSolarPowerProductionToday())
    _collectParameter('hotWaterTemp', lambda: t.getDomesticHotWaterConfiguredTemperature())
    _collectParameter('hotWaterStorageTemp', lambda: t.getDomesticHotWaterStorageTemperature())
    _collectParameter('solarCollectorTemp', lambda: t.getSolarCollectorTemperature())
    _collectParameter('solarStorageTemp', lambda: t.getSolarStorageTemperature())
    _collectParameter('outsideTemp', lambda: t.getOutsideTemperature())
    _collectParameter('boilerTemp', lambda: t.getBoilerTemperature())

    # Heating
    try:
        circuit = t.circuits[0]  # select heating circuit
    except Exception as err:
        # Previously this aborted the whole run and discarded the readings
        # collected above; now only the circuit values are skipped.
        logging.warning("No heating circuit available: %s", err)
    else:
        _collectParameter('supplyTemp', lambda: circuit.getSupplyTemperature())
        _collectParameter('boilerNiveau', lambda: circuit.getHeatingCurveShift())
        _collectParameter('boilerSteigung', lambda: circuit.getHeatingCurveSlope())
        # Program and mode are stored as returned, without float conversion.
        _collectParameter('activeProgram', lambda: circuit.getActiveProgram(), None)
        _collectParameter('desiredTemp', lambda: circuit.getCurrentDesiredTemperature())
        _collectParameter('activeMode', lambda: circuit.getActiveMode(), None)

    # Burner
    try:
        burner = t.getBurner(0)  # select burner
    except Exception as err:
        # Without a burner object none of the burner readings can be taken.
        logging.debug("No burner available: %s", err)
    else:
        # The active flag is stored as 1/0 rather than as a boolean.
        _collectParameter('burnerActive', lambda: burner.getActive(),
                          lambda active: 1 if active else 0)
        _collectParameter('burnerHours', lambda: burner.getHours())
        _collectParameter('burnerStarts', lambda: burner.getStarts(), None)
def saveToDatabase():
    """Write every collected parameter to InfluxDB in a single request.

    Each entry of the module-level ``parameters`` dict becomes one point:
    the measurement name is ``databasePrefix`` + key, the time is
    ``str(receiveTime)`` and the single field ``value`` holds the reading.
    Nothing is sent when ``parameters`` is empty.
    """
    if not parameters:
        # Nothing was collected. receiveTime is only assigned after the
        # device's isOnline() check, so return before referencing it.
        return

    timestamp = str(receiveTime)
    points = [
        {
            "measurement": databasePrefix + key,
            "time": timestamp,
            "fields": {
                "value": value
            }
        }
        for key, value in parameters.items()
    ]
    # One batched write instead of one HTTP request per parameter.
    dbclient.write_points(points)


if __name__ == "__main__":
    saveToDatabase()
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment