Python script to decrypt Kamstrup KEM files
# A Python script that decrypts the content of Kamstrup KEM file and imports meter files
# to wmbusmeters' config folder.
#
# The KEM file is a (sometimes zipped) xml file that contains xml-encrypted data using
# the xml-enc standard (http://www.w3.org/2001/04/xmlenc). The password needed to decrypt
# the xml-encrypted data can either be the CustomerId or something else selected by the
# person that has created the KEM file using Kamstrup software.
#
# This script takes the encrypted KEM file and decrypts its content (it automatically detects
# the zip archive and extracts the kem file from it). The result is an XML with a list of meters
# with their types, serial numbers, keys, etc. The script prints the information about each meter
# to the console and populates wmbusmeters' config folder with corresponding meter files.
# Optionally, decrypted KEM file content can be saved to a file (option -o).
#
# Usage: python kem-import.py [options] <kem_file> <password>
# kem_file ... the name of the KEM file to decrypt or a zip archive having the KEM file in it
# password ... original password used to encrypt the content (16 characters maximum)
# options ... use -h switch to get a list of options with their descriptions
#
# The script is Python2/Python3 compatible.
#
from __future__ import print_function
import os
import sys
import re
import errno
import argparse
import base64
import Crypto.Cipher.AES as AES
from xml.dom import minidom
import zipfile
help_prog = "Decrypts Kamstrup KEM file and imports meter information into wmbusmeters' config folder."
help_epilog = ""
help_kem = "The name of the KEM file to decrypt or a name of a zip archive that contains the encrypted KEM file."
help_pwd = "The original password used to encrypt the KEM file content (16 characters maximum)."
help_cfg = """Location of config files for wmbusmeters (default location is the current working directory).
This option has the same meaning as --useconfig option of the wmbusmeters daemon, i.e. --useconfig=/ will
populate /etc/wmbusmeters.d folder (must be run with sudo) and --useconfig=. (the default) populates
./etc/wmbusmeters.d folder. If the destination folder does not exist, it will be created; existing meter
files will be overwritten."""
help_dry = "No meter files will be created, only the info will be printed on the console."
help_out = "Save the decrypted KEM file content into a given file."
# define command line arguments
argparser = argparse.ArgumentParser(description=help_prog, epilog=help_epilog)
argparser.add_argument('kem_file', help=help_kem)
argparser.add_argument('password', help=help_pwd)
argparser.add_argument("-c", "--useconfig", type=str, action='store', dest='config', default=os.getcwd(), help=help_cfg)
argparser.add_argument("-n", "--dryrun", action='store_true', dest='dryrun', help=help_dry)
argparser.add_argument("-o", "--output", type=str, action='store', dest='output', help=help_out)
# parse command line arguments
args = argparser.parse_args()
# adjust the config folder location to full target path
args.config = args.config.rstrip(os.path.sep) + '/etc/wmbusmeters.d/'
# test if input file exists
if (not os.path.isfile(args.kem_file)):
    print('ERROR: The input KEM file does not exist.')
    sys.exit(1)
#end if
kem_file_content = None
# test if input file is a zip file
# yes: find a file with .kem extension in the zip and extract content of that file
# no : load the content of the input file directly
if (zipfile.is_zipfile(args.kem_file)):
    print("Detected a zip file on input ... extracting")
    with zipfile.ZipFile(args.kem_file,'r') as zipobj:
        file_list = zipobj.namelist()
        for file_name in file_list:
            if file_name.endswith('.kem'):
                kem_file_content = zipobj.read(file_name)
                break
            #end if
        #end for
    if (not kem_file_content):
        print("ERROR: The zip file '%s' does not seem to contain any '.kem' file." % (args.kem_file))
        sys.exit(1)
    #end if
else:
    # read content of the kem file
    with open(args.kem_file,'r') as f:
        kem_file_content = f.read()
#end if
# read and parse KEM file content and extract its encrypted part
# (KEM file is an XML file formatted according to http://www.w3.org/2001/04/xmlenc)
try:
    xmldoc = minidom.parseString(kem_file_content)
    encrypedtext = xmldoc.getElementsByTagName('CipherValue')[0].firstChild.nodeValue
    encrypeddata = base64.b64decode(encrypedtext)
except Exception:
    print('ERROR: The KEM file does not seem to contain encrypted data.')
    sys.exit(1)
# KEM file data is encrypted with AES-128-CBC cipher and decryption
# requires a 128bit key and 128bit initialization vector. In the case of KEM file,
# the initialization vector is the same as the key. If password length is less
# than 16 characters (128 bits), we pad the key with zeros up to its full length.
key = bytes(str(args.password).encode("utf-8"))
if (len(key) < 16): key += (16-len(key)) * b"\0"
# content decryption
aes = AES.new(key, AES.MODE_CBC, IV=key)
decryptedtext = aes.decrypt(encrypeddata)
# save decrypted XML file if requested
# (decrypted data is bytes, so the file is opened in binary mode)
if (args.output):
    f = open(args.output, 'wb')
    f.write(decryptedtext)
    f.close()
#end if
# create the folder for meter files if it does not exist
if (not args.dryrun) and (not os.path.exists(args.config)):
    print("Creating target folder:", args.config)
    try:
        os.makedirs(args.config)
    except OSError as e:
        if (e.errno in [errno.EPERM, errno.EACCES]):
            print(e)
            print("You may need to use 'sudo' to access the target config folder.")
            sys.exit(1)
        else:
            raise
#end if
# parse the decrypted KEM file content and create corresponding meter files in
# wmbusmeters' config folder for each meter found in the XML file
trimmedText = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', decryptedtext.decode('utf-8'))
xmldoc = minidom.parseString(trimmedText)
for e in xmldoc.getElementsByTagName('Meter'):
    # read information from source XML file
    meterName = e.getElementsByTagName('MeterName')[0].firstChild.nodeValue
    meterType = e.getElementsByTagName('ConsumptionType')[0].firstChild.nodeValue
    meterNum = e.getElementsByTagName('MeterNo')[0].firstChild.nodeValue
    meterSerial = e.getElementsByTagName('SerialNo')[0].firstChild.nodeValue
    meterVendor = e.getElementsByTagName('VendorId')[0].firstChild.nodeValue
    meterConfig = e.getElementsByTagName('ConfigNo')[0].firstChild.nodeValue
    meterModel = e.getElementsByTagName('TypeNo')[0].firstChild.nodeValue
    meterKey = e.getElementsByTagName('DEK')[0].firstChild.nodeValue
    # meter model identification
    # CONTRIBUTING NOTE: additional meter types supported by wmbusmeters can be put here
    # if their identification in KEM file is known
    if (meterName == 'MC302') and (meterModel.startswith('302T')):
        wmbusmeters_driver = 'multical302'
    elif (meterName == 'MC403') and (meterModel.startswith('403')):
        wmbusmeters_driver = 'multical403'
    elif (meterName == 'MC21') and (meterModel.startswith('021')):
        wmbusmeters_driver = 'multical21'
    elif (meterName == 'MC603') and (meterModel.startswith('603')):
        wmbusmeters_driver = 'multical603'
    else:
        wmbusmeters_driver = None
    # print info to console
    print('Found meter', meterName, '('+meterModel+')')
    print(' number :', meterNum)
    print(' serial :', meterSerial)
    print(' type :', meterType)
    print(' driver :', wmbusmeters_driver)
    print(' config :', meterConfig)
    print(' key :', meterKey)
    if (not args.dryrun):
        # save meter file in config folder if meter is supported by wmbusmeters
        if (wmbusmeters_driver is not None):
            try:
                f = open(args.config+meterSerial, 'w')
                f.write("name=%s\n" % (meterNum))
                f.write("type=%s\n" % (wmbusmeters_driver))
                f.write("id=%s\n" % (meterSerial))
                f.write("key=%s\n" % (meterKey))
                f.close()
                print(' meter file:', args.config+meterSerial)
            except (IOError, OSError) as e:
                if (e.errno in [errno.EPERM, errno.EACCES]):
                    print(e)
                    print("You may need to use 'sudo' to access the target config folder.")
                    sys.exit(1)
                else:
                    raise
        else:
            print(' !! unknown meter type, meter file has not been created')
    #end if
#end for

@dkcsn dkcsn commented Feb 27, 2021

Hi from DK.
Thank you for developing this script :-)
I tried using it with the KEM file I received for my Kamstrup Multical21.
I unzipped it as described, and when running the script I get:

Traceback (most recent call last):
  File "kem-decryptor.py", line 50, in <module>
    if (len(key) < 16): key += (16-len(key)) * "\0"
TypeError: can't concat str to bytes

Any idea? Or are there requirements for the password? The password I received is 11 characters: a city name plus some extra characters.

Owner Author

@mbursa mbursa commented Feb 27, 2021

The password must be provided to the script: python kem-import.py the_kem_file.kem password

KEM file data is encrypted with the AES-128-CBC cipher, and decryption requires a 128-bit key, i.e. the password should have 16 ASCII characters. If your password is shorter, the script pads it with null characters to fill the 16-byte requirement. So the 11 characters of your password are just fine.
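
As a rough illustration of that padding step, here is a minimal, standard-library-only sketch. The helper name `pad_password` and the sample password are hypothetical, and the explicit length check is an added assumption (the script itself does not check for passwords that are too long). The padding uses `b"\0"` (bytes) rather than `"\0"` (str); mixing the two is what raises `TypeError: can't concat str to bytes` on Python 3.

```python
def pad_password(password, key_len=16):
    """Encode the password as UTF-8 and right-pad it with NUL bytes to key_len bytes.

    Hypothetical helper for illustration; it mirrors the padding done in the script.
    """
    key = password.encode("utf-8")
    if len(key) > key_len:
        # Assumption: reject over-long passwords instead of passing them on to AES.
        raise ValueError("password is longer than %d bytes" % key_len)
    # Pad with bytes (b"\0"), not str ("\0"), so this also works on Python 3.
    return key + b"\0" * (key_len - len(key))


key = pad_password("Copenhagen1")  # made-up 11-character password
print(len(key))  # 16
print(key)
```

In the script, the resulting 16 bytes are used both as the AES key and as the initialization vector.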

I have updated the script to include some recent bug fixes. The script has been included in the wmbusmeters package, which provides a great way to capture and log the radio data transmitted by Wireless M-Bus devices.
