Last active
February 21, 2024 06:49
-
-
Save kenzodeluxe/464f2b6b4f810420fabb2f0251b4e913 to your computer and use it in GitHub Desktop.
Read values from Landis & Gyr T230/330 / Qundis Qheat 5.5 heat meters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
Script to read values from Qundis Qheat 5.5 (which seems to be "identical" to Landis&Gyr T230/T330) | |
It's necessary to send 210 leading zeros before the actual request | |
Check https://www.sipatec.rs/files/uploads/T230.pdf for further protcol details (and subsequent messages) | |
Uses https://github.com/ganehag/pyMeterBus to decode M-BUS datagrams | |
""" | |
import binascii | |
import datetime | |
import json | |
import logging | |
import sys | |
from os import getenv | |
from time import sleep | |
import serial | |
sys.path.append('/home/pi/pyMeterBus') | |
import meterbus | |
from influxdb import InfluxDBClient | |
error_sleep_interval = 10 | |
loglevel = getenv('LOGLEVEL', 'INFO').upper() | |
# InfluxDB | |
send_influx = bool(getenv('SEND_INFLUX', True)) | |
dbclient = InfluxDBClient(getenv('INFLUX_HOST'), | |
getenv('INFLUX_PORT', 8086), | |
getenv('INFLUX_USER', ''), | |
getenv('INFLUX_PASS', ''), | |
getenv('INFLUX_DB', 'wmz')) | |
json_body = [] | |
receiveTime = datetime.datetime.utcnow() | |
lookup_map = { 2: 'VIFUnit.ENERGY_WH', | |
3: 'VIFUnit.VOLUME'} | |
# Serial | |
serial_port = getenv('SERIAL_PORT', '/dev/ttyUSB0') | |
ser = serial.Serial(serial_port, baudrate=2400, bytesize=8, parity="E", stopbits=1, timeout=2, xonxoff=0, rtscts=0) | |
def write_zeros(): | |
i = 210 | |
while i: | |
ser.write(b'\x00') | |
i = i - 1 | |
def send_status_request(): | |
write_zeros() | |
ser.write(b'\x68\05\x05\x68\x53\xFE\x51\x0F\x0F\xC0\x16') | |
# see link from above to understand the actual values | |
try: | |
frame = meterbus.load(meterbus.recv_frame(ser)) | |
logging.debug(json.dumps(json.loads(frame.body.to_JSON()), indent=4, sort_keys=True)) # there might be an easier way | |
return True | |
except Exception as e: | |
logging.error(f'Unable to decode datagram, please retry: {e}') | |
return False | |
def get_data(): | |
write_zeros() | |
ser.write(b'\x10\x7b\xFE\x79\x16') | |
try: | |
frame = meterbus.load(meterbus.recv_frame(ser)) | |
json_response = json.loads(frame.body.to_JSON()) # there might be an easier way | |
return json_response | |
except Exception as e: | |
logging.error(f'Unable to parse data from WMZ: {e}') | |
return False | |
def main(): | |
retries = getenv('RETRIES', 5) | |
while retries: | |
success = send_status_request() | |
if not success: | |
logging.error('WMZ did not correctly respond to status request. Retrying.') | |
sleep(error_sleep_interval) | |
retries -= 1 | |
continue | |
wmz_data = get_data() | |
if wmz_data: | |
logging.debug(wmz_data) | |
# correct datagram should show this as its first entry (Qheat 5.5) | |
if wmz_data['records'][0]['type'] != 'VIFUnit.ACTUALITY_DURATION': | |
logging.error('Incorrect datagram, retrying.') | |
sleep(error_sleep_interval) | |
continue | |
else: | |
for key in lookup_map.keys(): | |
influx_entry = { "measurement": "", "time": receiveTime, "fields": { "value": "" }} | |
try: | |
influx_entry['measurement'] = lookup_map[key].strip() | |
influx_entry['fields']['value'] = wmz_data['records'][key]['value']/1000 | |
json_body.append(influx_entry) | |
except Exception as e: | |
logging.error(f'Unable to assign values from WMZ data to InfluxDB request data: {e}') | |
sleep(error_sleep_interval) | |
continue | |
if send_influx: | |
dbclient.write_points(json_body) | |
logging.debug(json_body) | |
print('Done.') | |
return | |
else: | |
retries -= 1 | |
sleep(error_sleep_interval) | |
continue | |
logging.error(f'Did not get a meaningful response within {retries} retries. Please re-run later.') | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @kenzodeluxe,
I tried to download Landis+Gyr UltraAssist, but unfortunately an account is needed which I don't have.
Yes, the reader was working fine with my powermeter.