Skip to content

Instantly share code, notes, and snippets.

@kenzodeluxe

kenzodeluxe/wmz.py

Last active Nov 17, 2020
Embed
What would you like to do?
Read values from Landis & Gyr T230/330 / Qundis Qheat 5.5 heat meters
#!/usr/bin/python3
"""
Script to read values from Qundis Qheat 5.5 (which seems to be "identical" to Landis&Gyr T230/T330)
It's necessary to send 210 leading zeros before the actual request
Check https://www.sipatec.rs/files/uploads/T230.pdf for further protocol details (and subsequent messages)
Uses https://github.com/ganehag/pyMeterBus to decode M-BUS datagrams
"""
import binascii
import datetime
import json
import logging
import sys
from os import getenv
from time import sleep
import serial
sys.path.append('/home/pi/pyMeterBus')
import meterbus
from influxdb import InfluxDBClient
# Seconds to wait before retrying after a failed exchange with the meter.
error_sleep_interval = 10

# Logging: LOGLEVEL selects the verbosity (DEBUG, INFO, WARNING, ERROR, ...).
# The level has to be applied via basicConfig, otherwise the variable has no
# effect and the script's logging.debug() output can never be enabled.
# Unknown level names fall back to INFO instead of aborting the script.
loglevel = getenv('LOGLEVEL', 'INFO').upper()
_numeric_loglevel = getattr(logging, loglevel, None)
logging.basicConfig(
    level=_numeric_loglevel if isinstance(_numeric_loglevel, int) else logging.INFO
)

# InfluxDB
# Environment variables are always strings, so bool() would treat "false" or
# "0" as True. Compare against the usual "off" spellings instead; an unset
# variable still means "send".
send_influx = getenv('SEND_INFLUX', 'true').strip().lower() not in (
    '', '0', 'false', 'no', 'off',
)
# INFLUX_HOST falls back to localhost rather than passing None as the host.
dbclient = InfluxDBClient(getenv('INFLUX_HOST', 'localhost'),
                          getenv('INFLUX_PORT', 8086),
                          getenv('INFLUX_USER', ''),
                          getenv('INFLUX_PASS', ''),
                          getenv('INFLUX_DB', 'wmz'))
# Points collected by main() and written to InfluxDB in one request.
json_body = []
# Captured once at start-up; main() uses it as the timestamp of every point.
receiveTime = datetime.datetime.utcnow()
# Index of a record in the decoded datagram -> InfluxDB measurement name.
lookup_map = {
    2: 'VIFUnit.ENERGY_WH',
    3: 'VIFUnit.VOLUME',
}

# Serial
serial_port = getenv('SERIAL_PORT', '/dev/ttyUSB0')
# 2400 baud, 8 data bits, even parity, 1 stop bit, 2 s read timeout,
# no software or hardware flow control.
ser = serial.Serial(serial_port, baudrate=2400, bytesize=8, parity="E", stopbits=1, timeout=2, xonxoff=0, rtscts=0)
def write_zeros(count=210):
    """Send the run of zero bytes that has to precede every request.

    The module docstring notes that 210 leading zeros are necessary before
    the actual request; that remains the default.

    Args:
        count: Number of ``0x00`` bytes to write to the serial port.
    """
    # One buffered write instead of one write call per byte: the same bytes
    # reach the port, with far fewer calls into the serial driver.
    ser.write(b'\x00' * count)
def send_status_request():
    """Send the status request to the meter and check that the reply decodes.

    Returns:
        True when a reply frame was received and decoded (its body is logged
        at debug level), False when anything in that chain raised; the error
        is logged in that case.
    """
    write_zeros()
    # see the link in the module docstring to understand the actual values
    ser.write(b'\x68\05\x05\x68\x53\xFE\x51\x0F\x0F\xC0\x16')
    try:
        reply = meterbus.load(meterbus.recv_frame(ser))
        # round-trip through JSON only to get a pretty-printed, sorted dump
        decoded_body = json.loads(reply.body.to_JSON())
        logging.debug(json.dumps(decoded_body, indent=4, sort_keys=True))
    except Exception as e:
        logging.error(f'Unable to decode datagram, please retry: {e}')
        return False
    return True
def get_data():
    """Request the data datagram from the meter and return it decoded.

    Returns:
        The frame body parsed from its JSON representation, or False when
        receiving or decoding the frame raised (the error is logged).
    """
    write_zeros()
    ser.write(b'\x10\x7b\xFE\x79\x16')
    try:
        reply = meterbus.load(meterbus.recv_frame(ser))
        body_as_json = reply.body.to_JSON()
        return json.loads(body_as_json)
    except Exception as e:
        logging.error(f'Unable to parse data from WMZ: {e}')
    return False
def main():
    """Read the meter and, if enabled, push the values to InfluxDB.

    Each attempt sends the status request, then fetches the data datagram
    and checks that its first record has the expected type. On success the
    records listed in ``lookup_map`` are divided by 1000, appended to
    ``json_body`` and written to InfluxDB when ``send_influx`` is set.
    Every failed attempt consumes one retry and is followed by a pause of
    ``error_sleep_interval`` seconds.

    The number of attempts comes from the RETRIES environment variable
    (default 5).

    Raises:
        ValueError: if RETRIES is set to something that is not an integer.
    """
    # getenv() returns a string when the variable is set; convert it so the
    # countdown below works, and remember the start value for the final log.
    max_retries = int(getenv('RETRIES', 5))
    retries = max_retries

    # "> 0" so that a zero or negative RETRIES cannot loop forever.
    while retries > 0:
        if not send_status_request():
            logging.error('WMZ did not correctly respond to status request. Retrying.')
            retries -= 1
            sleep(error_sleep_interval)
            continue

        wmz_data = get_data()
        if not wmz_data:
            retries -= 1
            sleep(error_sleep_interval)
            continue

        logging.debug(wmz_data)
        # correct datagram should show this as its first entry (Qheat 5.5);
        # a reply without any usable record counts as incorrect as well
        try:
            first_record_type = wmz_data['records'][0]['type']
        except (KeyError, IndexError, TypeError):
            first_record_type = None
        if first_record_type != 'VIFUnit.ACTUALITY_DURATION':
            logging.error('Incorrect datagram, retrying.')
            # This attempt counts too, otherwise a meter that keeps sending
            # the wrong datagram would keep the script running forever.
            retries -= 1
            sleep(error_sleep_interval)
            continue

        for key, measurement in lookup_map.items():
            try:
                value = wmz_data['records'][key]['value'] / 1000
            except (KeyError, IndexError, TypeError) as e:
                # Best effort: skip the record that cannot be read and keep
                # the others. No pause here, nothing is being retried.
                logging.error(f'Unable to assign values from WMZ data to InfluxDB request data: {e}')
                continue
            json_body.append({
                "measurement": measurement.strip(),
                "time": receiveTime,
                "fields": {"value": value},
            })

        if send_influx:
            dbclient.write_points(json_body)
        logging.debug(json_body)
        print('Done.')
        return

    # Report the configured number of attempts, not the exhausted counter.
    logging.error(f'Did not get a meaningful response within {max_retries} retries. Please re-run later.')
# Run the meter read-out only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment