Skip to content

Instantly share code, notes, and snippets.

@HNJAMeindersma
Created February 20, 2024 13:34
Show Gist options
  • Save HNJAMeindersma/0c9a8dc9f61e3a81c2c36cc4756bafb2 to your computer and use it in GitHub Desktop.
Save HNJAMeindersma/0c9a8dc9f61e3a81c2c36cc4756bafb2 to your computer and use it in GitHub Desktop.
Script that reads serial data from a DSMR P1 port with a FTDI USB cable and post it via MQTT
#!/usr/bin/env python2
# Import required libraries
import datetime
import collections
import re
import sys
import serial
import crcmod.predefined
import json
import paho.mqtt.client as mqtt
# ============================================================================ #
# Debugging settings
production = True
debugging = 1
# MQTT settings
json_minify = True
mqtt_server = 'mqtt.mynetwork.lan'
mqtt_port = 1883
mqtt_topic = 'dsmr/stats/json'
# DSMR timestamp format
timestamp_input_format = '%y%m%d%H%M%S'
timestamp_output_format = '%Y-%m-%dT%H:%M:%S'
# OBIS codes
obis_codes = {
# General
'1-3:0.2.8' : 'dsmr_version',
'0-1:24.1.0' : 'mbus_clients',
# Electricity
'0-0:96.1.1' : 'serial_number',
'0-0:1.0.0' : 'timestamp',
'0-0:96.14.0' : 'tariff',
'0-0:96.7.21' : 'failures_short',
'0-0:96.7.9' : 'failures_long',
'1-0:99.97.0' : 'failures_log',
'1-0:32.32.0' : 'voltage_sags_l1',
'1-0:52.32.0' : 'voltage_sags_l2',
'1-0:72:32.0' : 'voltage_sags_l3',
'1-0:32.36.0' : 'voltage_swells_l1',
'1-0:52.36.0' : 'voltage_swells_l2',
'1-0:72.36.0' : 'voltage_swells_l3',
'1-0:32.7.0' : 'voltage_l1',
'1-0:52.7.0' : 'voltage_l2',
'1-0:72.7.0' : 'voltage_l3',
'1-0:31.7.0' : 'current_l1',
'1-0:51.7.0' : 'current_l2',
'1-0:71.7.0' : 'current_l3',
'1-0:1.7.0' : 'power_receiving',
'1-0:21.7.0' : 'power_receiving_l1',
'1-0:41.7.0' : 'power_receiving_l2',
'1-0:61.7.0' : 'power_receiving_l3',
'1-0:2.7.0' : 'power_delivery',
'1-0:22.7.0' : 'power_delivery_l1',
'1-0:42.7.0' : 'power_delivery_l2',
'1-0:62.7.0' : 'power_delivery_l3',
'1-0:1.8.1' : 'total_received_t1',
'1-0:1.8.2' : 'total_received_t2',
'1-0:2.8.1' : 'total_delivered_t1',
'1-0:2.8.2' : 'total_delivered_t2',
# Gas
'0-1:96.1.0' : 'serial_number',
'0-1:24.2.1' : 'total_received'
}
# ============================================================================ #
# Calculate value lenghts
max_len = max(map(len,obis_codes.values()))
# The true telegram ends with an exclamation mark after a CR/LF
pattern = re.compile(b'\n(?=!)')
# According to the DSMR spec, we need to check a CRC16
crc16 = crcmod.predefined.mkPredefinedCrcFun('crc16')
# Create an empty telegram
telegram = ''
checksum_found = False
good_checksum = False
mqtt_last = datetime.datetime.now()
# Initialize source
if production:
ser = serial.Serial()
ser.baudrate = 115200
ser.bytesize = serial.EIGHTBITS
ser.parity = serial.PARITY_NONE
ser.stopbits = serial.STOPBITS_ONE
ser.xonxoff = 1
ser.rtscts = 0
ser.timeout = 12
ser.port = "/dev/ttyUSB0"
else:
print("Running in test mode!")
ser = open("example.txt", 'rb')
# MQTT publish callback
def on_publish(client, userdata, result):
global mqtt_last
mqtt_last = datetime.datetime.now()
if debugging >= 1:
print("Successfully published JSON to MQTT!")
pass
# Connect MQTT
mqttClient = mqtt.Client()
mqttClient.on_publish = on_publish
mqttClient.connect(mqtt_server, mqtt_port, 60)
mqttClient.loop_start()
# Forever loop
while True:
# Read in all the lines until we find the checksum (line starting with an exclamation mark)
try:
# Open serial connection
if production:
try:
ser.open()
except Exception as ex:
template = "An exception of type {0} occured. Arguments:\n{1!r}"
message = template.format(type(ex).__name__, ex.args)
print message
sys.exit("Error opening connection to serial: %s. Stopping!" % ser.name)
# (Re)set loop variables
telegram = ''
checksum_found = False
good_checksum = False
# Read serial until checksum is found
while not checksum_found:
# Read incoming line
telegram_line = ser.readline()
if debugging >= 4:
print("Incoming telegram line: %s") % telegram_line.decode('ascii').strip()
# Check if it matches the checksum line (! at start)
if re.match(b'(?=!)', telegram_line):
telegram = telegram + telegram_line
checksum_found = True
if debugging >= 1:
print('Found telegram checksum line')
else:
telegram = telegram + telegram_line
# Catch read error
except Exception as ex:
template = "An exception of type {0} occured. Arguments:\n{1!r}"
message = template.format(type(ex).__name__, ex.args)
print message
print("There was a problem %s, continuing...") % ex
# Close serial port
if production:
try:
ser.close()
except:
sys.exit("There was a problem %s, exiting..." % ser.name)
# Look for the checksum in the telegram
for m in pattern.finditer(telegram):
# Remove the exclamation mark from the checksum, and make an integer out of it
given_checksum = int('0x' + telegram[m.end() + 1:].decode('ascii'), 16)
# The exclamation mark is also part of the text to be CRC16'd
calculated_checksum = crc16(telegram[:m.end() + 1])
if given_checksum == calculated_checksum:
good_checksum = True
if debugging >= 1:
print("Telegram checksum: %s") % given_checksum
print("Calculated checksum: %s") % calculated_checksum
# Store the values in a dictionary if the checksum is good
if good_checksum:
telegram_values = dict()
if debugging >= 1:
print("Good checksum from telegram")
# Split the telegram into lines and iterate over them
for telegram_line in telegram.split(b'\r\n'):
# Split the OBIS code from the value, the lines with a OBIS code start with a number
if re.match(b'\d', telegram_line):
# The values are enclosed with parenthesis. Find the location of the first opening parenthesis, and store all split lines.
if debugging >= 3:
print(telegram_line)
if debugging >= 4:
print re.split(b'(\()', telegram_line)
# You can't put a list in a dict TODO better solution
code = ''.join(re.split(b'(\()', telegram_line)[:1])
value = ''.join(re.split(b'(\()', telegram_line)[1:])
telegram_values[code] = value
# Process telegram
if debugging >= 1:
print("Processing telegram..."),
mqtt_values = dict()
mqtt_values_electricity = dict()
mqtt_values_electricity_log = dict()
mqtt_values_gas = dict()
for code, value in telegram_values.items():
if code in obis_codes:
# DSMR version
if '1-3:0.2.8' in code:
# Strip value
value = value.lstrip(b'\(').rstrip(b'\)')
# Process value
try:
value = float(value)
value = (value / 10)
value = str(value)
except:
value = str(value)
mqtt_values[obis_codes[code]] = value
# M-Bus clients
elif '0-1:24.1.0' in code:
# Strip value
value = value.lstrip(b'\(').rstrip(b'\)')
# Process value
try:
value = float(value)
except:
value = str(value)
mqtt_values[obis_codes[code]] = value
# Gas usage data
elif '0-1:24.2.1' in code:
# Split time & value
(time, value) = re.findall('\((.*?)\)', value)
# Process time
time = str(time.lstrip(b'\(').rstrip(b'\)WS'))
try:
time = str(datetime.datetime.strptime(time, timestamp_input_format).strftime(timestamp_output_format))
except:
time = str(datetime.datetime.now().strftime(timestamp_output_format))
mqtt_values_gas['timestamp'] = time
# Process value
value = float(value.lstrip(b'\(').rstrip(b'\)*m3'))
mqtt_values_gas[obis_codes[code]] = value
# Gas equipment identifier
elif '0-1:96.1.0' in code:
# Strip value
value = value.lstrip(b'\(').rstrip(b'\)')
# Process value
value = str(value)
mqtt_values_gas[obis_codes[code]] = value
# Electricity equipment identifier
elif '0-0:96.1.1' in code:
# Strip value
value = value.lstrip(b'\(').rstrip(b'\)')
# Process value
value = str(value)
mqtt_values_electricity[obis_codes[code]] = value
# Power failure log
elif '1-0:99.97.0' in code:
# Split values
value = re.findall('\((.*?)\)', value)
# Process value
pair = ''
fail = False
list = dict()
for idx, item in enumerate(value):
if idx >= 2:
# Process timestamp
if (idx % 2) == 0:
try:
pair = str(datetime.datetime.strptime(item.lstrip(b'\(').rstrip(b'\)WS'), timestamp_input_format).strftime(timestamp_output_format))
except:
fail = True
# Check if timestamp was succesfull
elif fail == True:
fail = False
pair = ''
continue
# Process duration
else:
item = float(item.lstrip(b'\(').rstrip(b'\)*s'))
list[pair] = item
pair = ''
# Map values
mqtt_values_electricity_log = list
# Electricity timestamp
elif '0-0:1.0.0' in code:
# Strip value
value = value.lstrip(b'\(').rstrip(b'\)WS')
# Process value
try:
value = str(datetime.datetime.strptime(value, timestamp_input_format).strftime(timestamp_output_format))
except:
value = str(datetime.datetime.now().strftime(timestamp_output_format))
mqtt_values_electricity[obis_codes[code]] = value
# Other data
else:
# Strip value
value = value.lstrip(b'\(').rstrip(b'\)*kWhAV')
# Process value
try:
value = float(value)
except:
value = str(value)
mqtt_values_electricity[obis_codes[code]] = value
# Telegram processed
if debugging >= 1:
print("COMPLETED")
# Combine lists
mqtt_values_electricity[obis_codes['1-0:99.97.0']] = mqtt_values_electricity_log
mqtt_values['electricity'] = mqtt_values_electricity
mqtt_values['gas'] = mqtt_values_gas
# Convert list to JSON
if json_minify == True:
mqtt_json = json.dumps(mqtt_values, separators = (',', ':'))
else :
mqtt_json = json.dumps(mqtt_values, indent = 4, separators = (',', ': '))
if debugging >= 2:
print("Composed JSON message:\r\n%s") % mqtt_json
# Send JSON over MQTT
if datetime.datetime.now() > (mqtt_last + datetime.timedelta(hours=0, minutes=0, seconds=10)):
if debugging >= 1:
print("Publishing JSON to MQTT")
mqttClient.publish(mqtt_topic, mqtt_json)
# Received bad checksum from telegram
else:
if debugging >= 1:
print("Bad checksum from telegram")
#!/bin/bash
sudo apt install python2 python-pip -y
python2 -m pip install --force-reinstall pyserial crcmod paho-mqtt
echo "Don't forget to configure a MQTT broker inside the script and create a system service / daemon to run it automatically!"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment