Skip to content

Instantly share code, notes, and snippets.

@bmxp
Created January 27, 2023 05:28
Show Gist options
  • Save bmxp/4c7008c1fa90fc00b5c87de2449a1f22 to your computer and use it in GitHub Desktop.
Save bmxp/4c7008c1fa90fc00b5c87de2449a1f22 to your computer and use it in GitHub Desktop.
MBus Wasserzähler auslesen
#!/usr/bin/env python3
import serial
import meterbus
import time
from threading import Lock
address = 254
serialport = '/dev/ttyUSB0'
baudrate = 2400
frame = None
by = trigger.get('by', '')
source = trigger.get('source', '')
dest = trigger.get('dest', '')
value = trigger.get('value', '')
time_now = shtime.now().time()
logger.debug('Triggered by {}, source {}, value {}, dest {}'.format(by,source,dest,value))
if not hasattr(logics, 'WasserzaehlerLock'):
logics.WasserzaehlerLock = Lock()
with logics.WasserzaehlerLock:
t0 = time.time()
with serial.Serial(serialport, baudrate, 8, 'E', 1, 0.5) as ser:
meterbus.send_ping_frame(ser, address)
frame = meterbus.load(meterbus.recv_frame(ser, 1))
if not isinstance(frame, meterbus.TelegramACK):
raise LeaveLogic()
meterbus.send_request_frame(ser, address)
raw_data = meterbus.recv_frame(ser, meterbus.FRAME_DATA_LENGTH)
if raw_data is not None:
frame = meterbus.load(raw_data)
if not isinstance(frame, meterbus.TelegramLong):
raise LeaveLogic()
else:
raise LeaveLogic()
t1 = time.time()
logger.debug(f"query took {t1-t0} seconds")
if frame is not None:
sh.Wasser.Zaehler.Readout(frame.to_JSON())
current = round(frame.records[0].value * 1000,1)
sh.Wasser.Zaehler(current)
logger.debug(f"Setze neuen Zählerwert auf {current} l / {frame.records[0].value:.4f} m³")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment