Skip to content

Instantly share code, notes, and snippets.

@dirkjanfaber
Created July 16, 2021 14:38
Show Gist options
  • Save dirkjanfaber/4fdecc859b1dab1377cb096ad5bf06f5 to your computer and use it in GitHub Desktop.
Save dirkjanfaber/4fdecc859b1dab1377cb096ad5bf06f5 to your computer and use it in GitHub Desktop.
bms-check.py
#!/usr/bin/env python3
import sys
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', action='store_true',
help='increase output verbosity' )
parser.add_argument('-d', '--dry-run', action='store_true',
help='work without input' )
args = parser.parse_args()
# Configuration
mandatory_can = {
'351': {'dlc': {'min': 8}},
'355': {'dlc': {'min': 4}},
'356': {'dlc': {'min': 6}},
'35A': {'dlc': {'min': 8}},
'35E': {'dlc': {'min': 2}},
'35F': {'dlc': {'min': 6}}
}
optional_can = {
'370': {'dlc': {'min': 8}},
'371': {'dlc': {'min': 8}},
'372': {'dlc': {'min': 8}},
'373': {'dlc': {'min': 2}},
'374': {'dlc': {'min': 2}},
'375': {'dlc': {'min': 2}},
'376': {'dlc': {'min': 2}},
'377': {'dlc': {'min': 2}},
'378': {'dlc': {'min': 8}},
'379': {'dlc': {'min': 2}},
'380': {'dlc': {'min': 8}},
'381': {'dlc': {'min': 8}}
}
caninfo = {}
info = {
'Charge voltage limit (CVL)':
{'id': '351', 'type': 'un16', 'offset': 0, 'unit': 'V', 'scaling': 0.1},
'Max charge current (CCL)':
{'id': '351', 'type': 'sn16', 'offset': 2, 'unit': 'A', 'scaling': 0.1},
'Max discharge current (DCL)':
{'id': '351', 'type': 'sn16', 'offset': 4, 'unit': 'A', 'scaling': 0.1},
'Discharge voltage':
{'id': '351', 'type': 'un16', 'offset': 6, 'unit': 'V', 'scaling': 0.1},
'SOC value':
{'id': '355', 'type': 'un16', 'offset': 0, 'unit': '%', 'scaling': 1},
'SOH value':
{'id': '355', 'type': 'un16', 'offset': 2, 'unit': '%', 'scaling': 1},
'High res SOC':
{'id': '355', 'type': 'un16', 'offset': 4, 'unit': '%', 'scaling': 1, 'optional': 1},
'Battery voltage':
{'id': '356', 'type': 'sn16', 'offset': 0, 'unit': 'V', 'scaling': 0.01},
'Battery current':
{'id': '356', 'type': 'sn16', 'offset': 2, 'unit': 'A', 'scaling': 0.1},
'Battery temperature':
{'id': '356', 'type': 'sn16', 'offset': 4, 'unit': 'C', 'scaling': 0.1},
'Manufacturer':
{'id': '35E', 'type': 'string'},
}
def check_can_messages(template, error="ERROR"):
for m in template:
# Check if the messages has been found
if caninfo.get(m, None):
# The message is there, now check the length
if caninfo[m].get('dlc') >= template[m]['dlc']['min']:
template[m]['state'] = 'OK'
else:
template[m]['state'] = 'DLC too short'
else:
template[m]['state'] = error
def print_can_messages(template):
for m in template:
print("- `0x%s`, dlc: %d: %s" % (
m,
template[m]['dlc']['min'],
template[m].get('state', '________ ')))
def print_mandatory_can_messages():
print("""## Mandatory CAN messages
The following messages need to be available on the CAN bus. The mentioned
DLC (Data Length Code) is the _minimum_ needed length.
""")
print_can_messages(mandatory_can)
print()
def print_optional_can_messages():
print("""## Mandatory CAN messages
The following messages are optional. If the message is on the bus, but
has the wrong minimum DLC we consider this an error. If the message is not
on the bus, this is considered a warning.
""")
print_can_messages(optional_can)
print()
def print_caninfo():
for c in caninfo:
print("(%0.2f) %s [%d] %s" % (
caninfo[c].get('frequency', 0),
c,
caninfo[c].get('dlc', 0),
'-'.join(caninfo[c].get('message', []))
))
def check_can_information():
for i in info:
m = caninfo[info[i]['id']]['message']
# Optional messages should only be checked if the dlc is long enough
if info[i].get('optional') and \
caninfo[info[i]['id']]['dlc'] >= info[i].get('offset', 0):
info[i]['value'] = 'Optional, not found'
continue
# Check if the dlc allows us to find the compulsary messages..
if caninfo[info[i]['id']]['dlc'] < info[i].get('offset', 0):
info[i]['value'] = 'ERROR'
continue
if info[i].get('type') == 'string':
info[i]['value'] = ''.join([chr(int(c, 16)) for c in m])
elif info[i].get('type') == 'un16':
info[i]['value'] = \
int(''.join(m[info[i]['offset']:(info[i]['offset']+2)][::-1]), 16) * \
info[i].get('scaling', 1)
elif info[i].get('type') == 'sn16':
info[i]['value'] = \
int(''.join(m[info[i]['offset']:(info[i]['offset']+2)][::-1]), 16) * \
info[i].get('scaling', 1)
def print_information():
print("""
## BMS information
Values as advertised by the CAN bus of the BMS and decoded according
to the Victron Energy specifications:
""")
for i in info:
print("%s: %s %s " % (
i,
info[i].get('value', '__________'),
info[i].get('unit', '')
))
def print_header():
print("""
# CAN information
All of the information in this form is comming from a candumplog that
translates into:
```
""")
print_caninfo()
print("""
```
Where the first column contains the frequency of the messages, then
the CAN id, dlc and the raw message.
""")
if not args.dry_run:
# Read CAN messages from stdtin
for line in sys.stdin:
canid = line.rstrip().split()
if caninfo.get(canid[2], None):
# We seen this can id before
frequency = float(canid[0].replace('(', '').replace(')', '')) -\
caninfo[canid[2]].get('timestamp', 0)
caninfo[canid[2]]['frequency'] = frequency
else:
caninfo[canid[2]] = {
'message': canid[4:]
}
caninfo[canid[2]]['canid'] = canid[2]
caninfo[canid[2]]['timestamp'] = \
float(canid[0].replace('(', '').replace(')', ''))
caninfo[canid[2]]['dlc'] = int(canid[3][1:2])
check_can_messages(mandatory_can)
check_can_messages(optional_can, error="WARNING")
check_can_information()
print_header()
print_mandatory_can_messages()
print_optional_can_messages()
print_information()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment