-
-
Save kenzodeluxe/464f2b6b4f810420fabb2f0251b4e913 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3 | |
""" | |
Script to read values from Qundis Qheat 5.5 (which seems to be "identical" to Landis&Gyr T230/T330) | |
It's necessary to send 210 leading zeros before the actual request | |
Check https://www.sipatec.rs/files/uploads/T230.pdf for further protocol details (and subsequent messages)
Uses https://github.com/ganehag/pyMeterBus to decode M-BUS datagrams | |
""" | |
import binascii | |
import datetime | |
import json | |
import logging | |
import sys | |
from os import getenv | |
from time import sleep | |
import serial | |
sys.path.append('/home/pi/pyMeterBus') | |
import meterbus | |
from influxdb import InfluxDBClient | |
# Seconds passed to sleep() before another attempt is made.
error_sleep_interval = 10

loglevel = getenv('LOGLEVEL', 'INFO').upper()
# Apply the requested level; without this the LOGLEVEL setting has no effect
# and the logging.debug() output of this script can never be shown.
# Unknown level names fall back to INFO instead of aborting the script.
_numeric_loglevel = getattr(logging, loglevel, None)
if not isinstance(_numeric_loglevel, int):
    _numeric_loglevel = logging.INFO
logging.basicConfig(level=_numeric_loglevel)

# InfluxDB
# Environment values are strings, so bool() would treat "false" or "0" as True.
# Sending stays enabled unless the variable is empty or an explicit "off" word.
send_influx = str(getenv('SEND_INFLUX', True)).strip().lower() not in ('', '0', 'false', 'no', 'off')
dbclient = InfluxDBClient(getenv('INFLUX_HOST'),
                          int(getenv('INFLUX_PORT', 8086)),  # env value arrives as str
                          getenv('INFLUX_USER', ''),
                          getenv('INFLUX_PASS', ''),
                          getenv('INFLUX_DB', 'wmz'))

# Points collected by main() and handed to dbclient.write_points().
json_body = []
# Timestamp stored in the "time" field of every point (taken once at start-up).
receiveTime = datetime.datetime.utcnow()
# Index into the decoded datagram's 'records' list -> InfluxDB measurement name.
lookup_map = {2: 'VIFUnit.ENERGY_WH',
              3: 'VIFUnit.VOLUME'}

# Serial
serial_port = getenv('SERIAL_PORT', '/dev/ttyUSB0')
ser = serial.Serial(serial_port, baudrate=2400, bytesize=8, parity="E", stopbits=1, timeout=2, xonxoff=0, rtscts=0)
def write_zeros():
    """Write the preamble of 210 zero bytes to the serial port.

    The module docstring notes that 210 leading zeros are required before
    the actual request. Each byte is sent with its own ``ser.write`` call.
    """
    for _ in range(210):
        ser.write(b'\x00')
def send_status_request():
    """Send the zero preamble and the status request, then decode the reply.

    The decoded frame body is logged at debug level as pretty-printed JSON.

    Returns:
        True when a frame was received and decoded, False otherwise (the
        failure is logged as an error).
    """
    write_zeros()
    # see link from above to understand the actual values
    request = b'\x68\05\x05\x68\x53\xFE\x51\x0F\x0F\xC0\x16'
    ser.write(request)
    try:
        reply = meterbus.load(meterbus.recv_frame(ser))
        decoded_body = json.loads(reply.body.to_JSON())  # there might be an easier way
        logging.debug(json.dumps(decoded_body, indent=4, sort_keys=True))
    except Exception as err:
        logging.error(f'Unable to decode datagram, please retry: {err}')
        return False
    return True
def get_data():
    """Send the zero preamble and the data request, then return the decoded reply.

    Returns:
        The frame body parsed from its JSON representation, or False when
        receiving or decoding failed (the failure is logged as an error).
    """
    write_zeros()
    request = b'\x10\x7b\xFE\x79\x16'
    ser.write(request)
    try:
        reply = meterbus.load(meterbus.recv_frame(ser))
        body_json = reply.body.to_JSON()  # there might be an easier way
        return json.loads(body_json)
    except Exception as err:
        logging.error(f'Unable to parse data from WMZ: {err}')
    return False
def main():
    """Read the heat meter and forward the selected records to InfluxDB.

    Each attempt sends the status request followed by the data request.
    A failed exchange or an unexpected datagram consumes one retry and is
    followed by a pause of ``error_sleep_interval`` seconds. On success the
    records listed in ``lookup_map`` are appended to ``json_body``, written
    to InfluxDB when ``send_influx`` is set, and the function returns.

    The number of attempts is read from the ``RETRIES`` environment
    variable (default 5). If all attempts fail, an error is logged.
    """
    # Environment values are strings; convert so the counter can be decremented.
    max_retries = int(getenv('RETRIES', 5))
    retries = max_retries

    while retries > 0:
        if not send_status_request():
            logging.error('WMZ did not correctly respond to status request. Retrying.')
            retries -= 1
            sleep(error_sleep_interval)
            continue

        wmz_data = get_data()
        if not wmz_data:
            retries -= 1
            sleep(error_sleep_interval)
            continue

        logging.debug(wmz_data)

        # A response without records (or with an unexpected shape) is treated
        # like any other incorrect datagram instead of raising.
        try:
            first_type = wmz_data['records'][0]['type']
        except (KeyError, IndexError, TypeError):
            first_type = None

        # correct datagram should show this as its first entry (Qheat 5.5)
        if first_type != 'VIFUnit.ACTUALITY_DURATION':
            logging.error('Incorrect datagram, retrying.')
            # Count this attempt too; otherwise a meter that keeps answering
            # with the wrong datagram would keep this loop running forever.
            retries -= 1
            sleep(error_sleep_interval)
            continue

        for key, measurement in lookup_map.items():
            influx_entry = {"measurement": "", "time": receiveTime, "fields": {"value": ""}}
            try:
                influx_entry['measurement'] = measurement.strip()
                influx_entry['fields']['value'] = wmz_data['records'][key]['value'] / 1000
                json_body.append(influx_entry)
            except Exception as e:
                # The entry is skipped; processing continues with the next key.
                logging.error(f'Unable to assign values from WMZ data to InfluxDB request data: {e}')
                sleep(error_sleep_interval)

        if send_influx:
            dbclient.write_points(json_body)
        logging.debug(json_body)
        print('Done.')
        return

    # Report the configured number of attempts; the counter itself is 0 here.
    logging.error(f'Did not get a meaningful response within {max_retries} retries. Please re-run later.')


if __name__ == "__main__":
    main()
Hi @kenzodeluxe
I have a Qundis Qheat 5.5 and am struggling to read the values with your script.
Depending on the position of my reader, the error is either:
- ERROR:root:Unable to decode datagram, please retry: ('empty frame', False)
or:
- ERROR:root:Unable to decode datagram, please retry: ('empty frame', None)
Any ideas what could be wrong?
Which reader do you use?
I'm using this reader (https://www.amazon.de/gp/product/B01B8N0ASY) connected to a Pi 3 B+.
Thank you and regards,
Daniel
Hi Daniel,
> Any ideas what could be wrong? Which reader do you use?
I use some no-name reader I got off of eBay; it's recognized as "Future Technology Devices International, Ltd Bridge(I2C/SPI/UART/FIFO)". Did you give the official Landis&Gyr software a try yet? Or did you try that reader on any other device, like a power meter, for comparison?
Hi @kenzodeluxe,
I tried to download Landis+Gyr UltraAssist, but unfortunately an account is needed which I don't have.
Yes, the reader was working fine with my powermeter.
I am almost certain I used Landis&Gyr UltraAssist to connect to the device, and inspected that traffic (communication on ttyUSB*) to find out what was needed for my device. At the time, I did not find any script to perform successful readings from that specific heat meter.
I run it on a daily basis, and the device will stay on/active for a few minutes after a successful read; however, since I own it (I use it for my heat pump), depleting the battery isn't a huge concern for me (I know I'll have to replace it eventually).