Created
January 27, 2023 05:28
-
-
Save bmxp/4c7008c1fa90fc00b5c87de2449a1f22 to your computer and use it in GitHub Desktop.
MBus Wasserzähler auslesen
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""Read a water meter over M-Bus and publish the reading to smarthome items.

Flow: ping the meter, expect an ACK telegram, request the data, expect a
long telegram, then write the raw JSON readout and the first record's value
(scaled by 1000) to the ``sh.Wasser.Zaehler`` items.

NOTE(review): ``trigger``, ``logger``, ``logics``, ``sh`` and ``LeaveLogic``
are not defined in this file; presumably they are injected by the
SmartHomeNG logic runtime -- confirm against the host environment.
"""
import time
from threading import Lock

import meterbus
import serial

# Connection parameters for the M-Bus adapter.
address = 254  # presumably the M-Bus broadcast/test address -- TODO confirm
serialport = '/dev/ttyUSB0'
baudrate = 2400

by = trigger.get('by', '')
source = trigger.get('source', '')
dest = trigger.get('dest', '')
value = trigger.get('value', '')
# The placeholders are labelled "value" then "dest", so the values must be
# supplied in that order (the arguments used to be swapped).
logger.debug(f'Triggered by {by}, source {source}, value {value}, dest {dest}')

# One lock shared across all runs of this logic so that overlapping triggers
# cannot talk to the serial port at the same time.
if not hasattr(logics, 'WasserzaehlerLock'):
    logics.WasserzaehlerLock = Lock()

with logics.WasserzaehlerLock:
    # Monotonic clock: the elapsed time must not be affected by wall-clock
    # adjustments while the query is running.
    started = time.monotonic()

    # 8 data bits, even parity, 1 stop bit, 0.5 s read timeout.
    with serial.Serial(
        serialport,
        baudrate,
        bytesize=8,
        parity='E',
        stopbits=1,
        timeout=0.5,
    ) as ser:
        # Step 1: ping the meter and require an ACK before asking for data.
        meterbus.send_ping_frame(ser, address)
        ack_raw = meterbus.recv_frame(ser, 1)
        # Guard the ping reply the same way as the data reply below: with no
        # answer there is nothing to decode, so leave the logic quietly.
        if ack_raw is None:
            raise LeaveLogic()
        if not isinstance(meterbus.load(ack_raw), meterbus.TelegramACK):
            raise LeaveLogic()

        # Step 2: request the data telegram.
        meterbus.send_request_frame(ser, address)
        raw_data = meterbus.recv_frame(ser, meterbus.FRAME_DATA_LENGTH)
        if raw_data is None:
            raise LeaveLogic()
        frame = meterbus.load(raw_data)
        if not isinstance(frame, meterbus.TelegramLong):
            raise LeaveLogic()

    elapsed = time.monotonic() - started
    logger.debug(f"query took {elapsed} seconds")

    # Reaching this point means `frame` is a TelegramLong; every failure
    # path above has already left the logic.
    sh.Wasser.Zaehler.Readout(frame.to_JSON())
    reading = frame.records[0].value
    # The log message below reports the raw value as m³ and the scaled value
    # as litres, hence the factor 1000.
    current = round(reading * 1000, 1)
    sh.Wasser.Zaehler(current)
    logger.debug(f"Setze neuen Zählerwert auf {current} l / {reading:.4f} m³")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment