Read values from Landis & Gyr T230/330 / Qundis Qheat 5.5 heat meters
#!/usr/bin/python3 | |
"""
Script to read values from a Qundis Qheat 5.5 (which seems to be "identical" to the Landis&Gyr T230/T330).
It's necessary to send 210 leading zeros before the actual request.
Check https://www.sipatec.rs/files/uploads/T230.pdf for further protocol details (and subsequent messages).
Uses https://github.com/ganehag/pyMeterBus to decode M-BUS datagrams.
"""
import binascii | |
import datetime | |
import json | |
import logging | |
import sys | |
from os import getenv | |
from time import sleep | |
import serial | |
sys.path.append('/home/pi/pyMeterBus') | |
import meterbus | |
from influxdb import InfluxDBClient | |
# Seconds to wait before the next attempt after a failed exchange with the meter.
error_sleep_interval = 10

# Log level name taken from the environment. It is applied to the root logger
# here; otherwise the logging.debug() calls below would never produce output.
# Unknown level names fall back to INFO instead of raising.
loglevel = getenv('LOGLEVEL', 'INFO').upper()
_numeric_loglevel = logging.getLevelName(loglevel)
logging.basicConfig(
    level=_numeric_loglevel if isinstance(_numeric_loglevel, int) else logging.INFO)

# InfluxDB
# SEND_INFLUX defaults to enabled when unset. bool() on an environment string is
# True for any non-empty value (including "false" and "0"), so the value is
# compared against the usual "off" spellings instead.
_send_influx_raw = getenv('SEND_INFLUX')
send_influx = (_send_influx_raw is None
               or _send_influx_raw.strip().lower() not in ('', '0', 'false', 'no', 'off'))
dbclient = InfluxDBClient(getenv('INFLUX_HOST'),
                          getenv('INFLUX_PORT', 8086),
                          getenv('INFLUX_USER', ''),
                          getenv('INFLUX_PASS', ''),
                          getenv('INFLUX_DB', 'wmz'))

# Points collected by main() and handed to dbclient.write_points().
json_body = []
# Timestamp attached to every point; taken once when the script starts.
receiveTime = datetime.datetime.utcnow()
# Index into the decoded datagram's 'records' list -> InfluxDB measurement name.
lookup_map = {2: 'VIFUnit.ENERGY_WH',
              3: 'VIFUnit.VOLUME'}

# Serial
# 2400 baud, 8 data bits, even parity, 1 stop bit, 2 s read timeout, no flow control.
serial_port = getenv('SERIAL_PORT', '/dev/ttyUSB0')
ser = serial.Serial(serial_port, baudrate=2400, bytesize=8, parity="E", stopbits=1,
                    timeout=2, xonxoff=0, rtscts=0)
def write_zeros():
    """Write the 210 leading zero bytes that must precede each request."""
    for _ in range(210):
        ser.write(b'\x00')
def send_status_request():
    """Send the status request and report whether a decodable reply arrived.

    The leading zeros are written first, then the request frame. The reply is
    received and decoded with meterbus and its body is logged at debug level.

    Returns:
        True when the reply could be decoded and logged, False when any step
        raised (the error is logged).
    """
    write_zeros()
    # The byte values are explained in the protocol document linked in the
    # module docstring.
    ser.write(b'\x68\05\x05\x68\x53\xFE\x51\x0F\x0F\xC0\x16')
    try:
        raw_frame = meterbus.recv_frame(ser)
        decoded = meterbus.load(raw_frame)
        # Round-trip through json to get a sorted, indented dump for the log.
        body = json.loads(decoded.body.to_JSON())
        logging.debug(json.dumps(body, indent=4, sort_keys=True))
    except Exception as e:
        logging.error(f'Unable to decode datagram, please retry: {e}')
        return False
    return True
def get_data():
    """Request the data datagram from the meter and return its decoded body.

    The leading zeros are written first, then the request frame. The reply is
    received and decoded with meterbus, and the frame body's JSON
    representation is parsed.

    Returns:
        The parsed JSON body of the reply, or False when receiving or decoding
        raised (the error is logged).
    """
    write_zeros()
    ser.write(b'\x10\x7b\xFE\x79\x16')
    try:
        raw_frame = meterbus.recv_frame(ser)
        decoded = meterbus.load(raw_frame)
        return json.loads(decoded.body.to_JSON())
    except Exception as e:
        logging.error(f'Unable to parse data from WMZ: {e}')
        return False
def main():
    """Read the meter and forward the selected records to InfluxDB.

    Each attempt sends the status request, then requests the data datagram and
    checks that its first record has the expected type. On success the records
    listed in lookup_map are appended to json_body (value divided by 1000),
    written to InfluxDB when send_influx is set, and the function returns.

    Every failed attempt (no status reply, no data, unexpected datagram) uses
    up one retry and waits error_sleep_interval seconds. The number of
    attempts comes from the RETRIES environment variable (default 5). When all
    attempts are used up an error is logged.
    """
    default_retries = 5
    try:
        # getenv() returns a str when the variable is set; the counter must be
        # an int, otherwise the decrement below raises TypeError.
        max_retries = int(getenv('RETRIES', default_retries))
    except ValueError:
        logging.warning(f'Invalid RETRIES value, falling back to {default_retries}.')
        max_retries = default_retries

    attempts_left = max_retries
    while attempts_left > 0:
        if not send_status_request():
            logging.error('WMZ did not correctly respond to status request. Retrying.')
            sleep(error_sleep_interval)
            attempts_left -= 1
            continue

        wmz_data = get_data()
        if not wmz_data:
            attempts_left -= 1
            sleep(error_sleep_interval)
            continue

        logging.debug(wmz_data)
        # correct datagram should show this as its first entry (Qheat 5.5)
        try:
            first_type = wmz_data['records'][0]['type']
        except (KeyError, IndexError, TypeError):
            # Missing or empty record list: handle it like a wrong datagram
            # instead of crashing.
            first_type = None
        if first_type != 'VIFUnit.ACTUALITY_DURATION':
            logging.error('Incorrect datagram, retrying.')
            # Count this attempt too, so a meter that keeps answering with the
            # wrong datagram cannot keep the loop running forever.
            attempts_left -= 1
            sleep(error_sleep_interval)
            continue

        for key, measurement in lookup_map.items():
            try:
                value = wmz_data['records'][key]['value'] / 1000
            except Exception as e:
                # Best effort: skip this record and carry on with the others.
                logging.error(f'Unable to assign values from WMZ data to InfluxDB request data: {e}')
                continue
            json_body.append({
                "measurement": measurement.strip(),
                "time": receiveTime,
                "fields": {"value": value},
            })

        if send_influx:
            dbclient.write_points(json_body)
        logging.debug(json_body)
        print('Done.')
        return

    # Report the configured number of attempts, not the exhausted counter.
    logging.error(f'Did not get a meaningful response within {max_retries} retries. Please re-run later.')


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment