Skip to content

Instantly share code, notes, and snippets.

@georg90
Last active February 8, 2024 18:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save georg90/c6822fa28261059e4c8361bdcff13f32 to your computer and use it in GitHub Desktop.
Save georg90/c6822fa28261059e4c8361bdcff13f32 to your computer and use it in GitHub Desktop.
Example of micropython code to read victron SmartShunt 500A
# Pico_Victron_BT.py
#
# © Swâmi Petaramesh 2024
#
# This program is free software: you can redistribute it and/or modify it under the terms
# of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.
#
# This program is example code for receiving and decoding information and statuses
# from some Victron® devices over Bluetooth-LE on a Raspberry Pi® Pico W running MicroPython.
#
# Works on : sys.version:3.4.0; MicroPython v1.22.1 on 2024-01-05
# RPI_PICO_W-20240105-v1.22.1.uf2
# Also confirmed on MicroPython v1.23.0-preview.91.g5a68e82d1 on 2024-02-07; Raspberry Pi Pico W with RP2040 (@georg90)
#
# You need to put your own devices MAC addresses and encryption keys in the code below.
# Libraries =============================================================
#
# Micropython built-in libraries
import time, struct, bluetooth, cryptolib, sys
from machine import Pin
from cryptolib import aes
# Libraries to be installed on the Raspberry Pi Pico
# With mpremote tool (from Linux PC terminal) :
# mpremote mip install github:peterhinch/micropython-async/v3/primitives
# mpremote mip install github:peterhinch/micropython-async/v3/threadsafe
import asyncio
from threadsafe import ThreadSafeQueue
import binascii
# Initialisations ==========================================================
# Debug verbosity. Code in this file tests it for truthiness, for `>= 2` and
# for `>= 3` to enable progressively more console output.
_DEBUG = const(3)
# Sentinel stored in place of any reading that is not available (N/A).
_UNKN = const(-9999)
# Integrated LED, configured as an output and switched on at start-up.
pin_led_bt = Pin("WL_GPIO0", Pin.OUT, value=1)
# Victron devices parameters and values -----------------------------------
# We need to put our Victron devices MAC addresses and encryption keys here
# The rest should be left _UNKN
# Get the values from the Victron Connect app (Product details) and write them as bytes
# literals by prefixing every pair of hex digits with '\x'.
# The "bmv712" entry also works for a SmartShunt 500A and reads voltage, current, SoC and temperature.
# Per-device state table keyed by a device label. "mac" and "key" are the
# user-supplied settings; every other field starts out as _UNKN.
victron = {
    "bmv712": {
        "mac": b'\xc3\xb1\x20\xe2\x7b\x19',
        "key": b'\x2d\x0a\x4d\x87\x1a\x34\xfe\x27\x7d\xb3\x4c\x7f\xb0\x24\x63\xc0',
        "volt": _UNKN,
        "amp": _UNKN,
        "soc": _UNKN,
        "temp": _UNKN,
        "upd": _UNKN,
    },
}
# MAC addresses of all configured devices, in table order.
maclist = [settings["mac"] for settings in victron.values()]
# System parameters ------------------------------------------------
# Target duration of one main-loop iteration, in milliseconds.
_TMR_MAIN_LOOP = const(500)
# Default BT scan duration in milliseconds; 0 = scan forever.
_BT_SCAN_DURATION_MS = const(0)
# Scan interval in microseconds (2 s).
_BT_SCAN_INTERVAL_US = const(2000000)
# Scan window in microseconds (100 ms).
_BT_SCAN_WINDOW_US = const(100000)
# Scan results whose RSSI is not strictly above this value are ignored.
_BT_MIN_RSSI = const(-85)
# BLE interface object that is handed to the scanner.
ble = bluetooth.BLE()
# Fixed values
# Readings are reset to N/A once their last update is older than this (milliseconds).
_BT_EXPIRE = const(180 * 1000)
# This is our main status that we use everywhere
# Status labels, addressed by position. The "code" fields of the Victron
# lookup tables in this file are indexes into this tuple.
STATUS = (
    "CKSUM",         # 0
    "?????",         # 1
    "Unknown",       # 2
    "ERROR",         # 3
    "FAIL!",         # 4
    "DELTA",         # 5
    "Init",          # 6
    "Off",           # 7
    "Stop",          # 8
    "Start",         # 9
    "HI-V",          # 10
    "HI-v",          # 11
    "LOW-V",         # 12
    "LOW-v",         # 13
    "LOW-%",         # 14
    "HI-T°",         # 15
    "LO-T°",         # 16
    "Drain",         # 17
    "Timer",         # 18
    "Timr",          # 19
    "Stby",          # 20
    "LOW",           # 21
    "OK",            # 22
    "High",          # 23
    "Full",          # 24
    "Chrg",          # 25
    "Chg",           # 26
    "Engine",        # 27
    "Bulk",          # 28
    "Absorption",    # 29
    "Float",         # 30
    "Storage",       # 31
    "Remote",        # 32
    "Lock",          # 33
    "EngStop",       # 34
    "R+Stop",        # 35
    "RunTime",       # 36
    "FloTime",       # 37
    "Display",       # 38
    "Manual",        # 39
    "------------",  # 40
)
# Victron operation modes
# Victron operation modes: raw mode byte -> STATUS index ("code") and a
# descriptive label ("lib").
VICTRON_OP = {
    0:   {"code": 7,  "lib": "Off"},
    1:   {"code": 12, "lib": "LOW-V"},
    2:   {"code": 3,  "lib": "ERROR"},
    3:   {"code": 28, "lib": "Bulk"},
    4:   {"code": 29, "lib": "Absorption"},
    5:   {"code": 30, "lib": "Float"},
    6:   {"code": 31, "lib": "Storage"},
    7:   {"code": 1,  "lib": "Equalize"},
    9:   {"code": 1,  "lib": "Inverting"},
    11:  {"code": 1,  "lib": "Supply"},
    245: {"code": 6,  "lib": "Init"},
    246: {"code": 29, "lib": "Repeated absorption"},
    247: {"code": 1,  "lib": "Recondition"},
    248: {"code": 1,  "lib": "Bat safe"},
    252: {"code": 32, "lib": "Remote"},
}
# Off reason for Victron DC-DC charger
# Off reasons for the Victron DC-DC charger: raw reason value -> STATUS index
# ("code") and a descriptive label ("lib").
VICTRON_DC_OFF = {
    0x00000000: {"code": 40, "lib": "None"},
    0x00000001: {"code": 4,  "lib": "No-Input"},
    0x00000002: {"code": 7,  "lib": "Off"},
    0x00000004: {"code": 7,  "lib": "Off"},
    0x00000008: {"code": 32, "lib": "Remote"},
    0x00000010: {"code": 1,  "lib": "Protection"},
    0x00000020: {"code": 1,  "lib": "Pay"},
    0x00000040: {"code": 1,  "lib": "BMS-CUT"},
    0x00000080: {"code": 27, "lib": "Engine"},
    0x00000081: {"code": 35, "lib": "R+Stop"},
    0x00000100: {"code": 1,  "lib": "Analyzing"},
}
# Initial values --------------------------------------------------
# Timing accumulators, all zero at start-up. They are later filled with
# millisecond tick differences by the coroutines and the main loop.
loop_time = comp_time = last_time = 0
# Global exception handler =======================================
def _handle_exception(loop, context):
    """Event-loop exception handler: report the exception, then exit.

    `loop` is accepted for the handler signature but not used. `context` must
    carry the raised exception under the "exception" key.
    """
    print('Exception occurred !')
    failure = context["exception"]
    sys.print_exception(failure)
    sys.exit()
# Classes ========================================================
# Event code passed to the BLE IRQ handler for each scan result.
_IRQ_SCAN_RESULT = const(5)


class BLEScanner:
    """BLE scanner that queues advertisements coming from known devices.

    Every scan result whose address is in `target_mac_list`, whose
    advertisement type is 0 and whose RSSI is above `_BT_MIN_RSSI` is pushed
    onto the module-level `bt_queue` for a coroutine to process.
    """

    def __init__(self, ble, target_mac_list):
        """Activate `ble` and register the IRQ handler.

        ble             -- BLE interface object (must offer active(), irq()
                           and gap_scan()).
        target_mac_list -- collection of MAC addresses (bytes) to accept.
        """
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._target_mac_list = target_mac_list
        self._start_time = None  # ticks_ms() of the last start_scan() call

    # Interrupt processing routine
    def _irq(self, event, data):
        """Handle one BLE event; the LED is lit for the duration of the call."""
        pin_led_bt.on()
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if (rssi > _BT_MIN_RSSI
                    and adv_type == 0
                    and addr in self._target_mac_list):
                try:
                    # Copy addr/adv_data into bytes before queueing them for
                    # the async coroutine to process.
                    bt_queue.put_sync([bytes(addr), addr_type, adv_type, rssi, bytes(adv_data)])
                except IndexError:
                    # Queue is full: this advertisement is dropped on purpose.
                    pass
        pin_led_bt.off()

    # Start the scanner
    def start_scan(self, duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US):
        """Start scanning and remember the start time.

        duration_ms -- total scan time in milliseconds, 0 = scan forever.
        interval_us -- time between the start of two scan windows (us).
        window_us   -- length of each scan window (us).
        """
        self._ble.active(True)
        self._start_time = time.ticks_ms()
        # gap_scan() takes duration_ms as its FIRST positional argument,
        # followed by interval_us and window_us. Passing an extra leading 0
        # shifts every value onto the wrong parameter.
        self._ble.gap_scan(duration_ms, interval_us, window_us)

    # Stop the scanner
    def stop_scan(self):
        """Stop scanning and deactivate the BLE interface."""
        self._ble.gap_scan(None)
        self._ble.active(False)
# Functions ======================================================
def kelvin_to_celsius(kelvin):
    """Convert a temperature in kelvin to degrees Celsius, rounded to 2 decimals."""
    celsius = kelvin - 273.15
    return round(celsius, 2)
# Decode received and decrypted BT values
def bt_decode(dev, cleartext):
    """Decode a decrypted Victron payload into the `victron[dev]` record.

    dev       -- device label; "bmv712", "smartsolar" and "orion" are decoded,
                 any other label is only reported in the debug output.
    cleartext -- decrypted payload bytes.

    Fields that are flagged N/A in the payload, or that cannot be decoded
    because the payload is too short, are stored as _UNKN. The record's "upd"
    timestamp is refreshed only when the device's key fields all decoded.
    """
    if _DEBUG: print(f"*** Found device : {dev}")
    if _DEBUG >= 2:
        print(" Raw Decrypted Data (Hex): ", ' '.join(['{:02X}'.format(b) for b in cleartext]))
    # Compare labels by value: `is` tests object identity and only works by
    # accident when both strings happen to be interned.
    if dev == "bmv712":
        _decode_battery_monitor(victron[dev], cleartext)
    elif dev == "smartsolar":
        _decode_solar_charger(victron[dev], cleartext)
    elif dev == "orion":
        _decode_dcdc_charger(victron[dev], cleartext)


def _unpack_field(fmt, raw, na_marker=None):
    """Unpack one struct field; return None if it is N/A or cannot be decoded."""
    if na_marker is not None and raw == na_marker:
        return None
    try:
        return struct.unpack(fmt, raw)[0]
    except Exception:
        # Slice of the wrong size (truncated payload). The exception type for
        # this differs between MicroPython and CPython, hence the broad catch.
        return None


def _scaled(raw, divisor):
    """Return raw / divisor as a float, or _UNKN when raw is None."""
    return _UNKN if raw is None else float(raw / divisor)


def _status_code(table, key):
    """Return table[key]["code"], or 1 (STATUS "?????") for an unknown key."""
    entry = table.get(key)
    return 1 if entry is None else entry["code"]


def _decode_battery_monitor(rec, cleartext):
    """Decode voltage, temperature, state of charge and current."""
    # Voltage: signed 16-bit, 0.01 V units, 0x7FFF = N/A.
    rec["volt"] = _scaled(_unpack_field('<h', cleartext[2:4], b'\xFF\x7F'), 100)

    # The low two bits of byte 8 select what bytes 6-7 contain; 0b10 means
    # temperature in 0.01 K. Kelvin is never negative, so decode unsigned:
    # a signed decode wraps to a negative value above 327.67 K (54.5 deg C).
    aux_type = _unpack_field('<B', cleartext[8:9])
    raw_temp = None
    if aux_type is not None and aux_type & 0b11 == 0b10:
        raw_temp = _unpack_field('<H', cleartext[6:8])
    if raw_temp is None:
        rec["temp"] = _UNKN
    else:
        rec["temp"] = float(kelvin_to_celsius(raw_temp / 100))

    if rec["volt"] != _UNKN and rec["temp"] != _UNKN:
        rec["upd"] = time.ticks_ms()
    if _DEBUG >= 2: print("Volt : {} Temp: {}".format(rec["volt"], rec["temp"]))

    # State of charge: 10 bits (bits 4..13 of the 16-bit word at offset 13),
    # 0.1 % units. The N/A marker 0x3FF must be tested on the RAW value,
    # i.e. before scaling, otherwise it can never match.
    raw_soc = _unpack_field('<H', cleartext[13:15])
    if raw_soc is not None:
        raw_soc = (raw_soc & 0x3FFF) >> 4
        if raw_soc == 0x3FF:
            raw_soc = None
    rec["soc"] = _scaled(raw_soc, 10)

    # Current: the 24 bits at offset 8, sign-extended to 32 bits; the two low
    # bits belong to the aux selector and are shifted out. Units of 0.001 A.
    raw_amp = bytes(cleartext[8:11])
    if len(raw_amp) == 3:
        sign_fill = b'\xFF' if raw_amp[2] & 0x80 else b'\x00'
        rec["amp"] = float((struct.unpack('<i', raw_amp + sign_fill)[0] >> 2) / 1000)
    else:
        rec["amp"] = _UNKN
    if _DEBUG >= 2: print("Soc : {} Amp: {}".format(rec["soc"], rec["amp"]))


def _decode_solar_charger(rec, cleartext):
    """Decode operation mode, current and power."""
    mode = _unpack_field('<B', cleartext[0:1], b'\xFF')
    rec["mode"] = _UNKN if mode is None else mode
    rec["amp"] = _scaled(_unpack_field('<h', cleartext[4:6], b'\xFF\x7F'), 10)
    rec["pwr"] = _scaled(_unpack_field('<H', cleartext[8:10], b'\xFF\xFF'), 1)
    rec["lib_mode"] = _status_code(VICTRON_OP, rec["mode"])
    if (rec["mode"] != _UNKN and rec["amp"] != _UNKN
            and rec["pwr"] != _UNKN and rec["lib_mode"] != 1):
        rec["upd"] = time.ticks_ms()
    if _DEBUG >= 2:
        print("Mode : {} PWR : {} Lib : {}:{}".format(
            rec["mode"], rec["pwr"], rec["lib_mode"], STATUS[rec["lib_mode"]]))


def _decode_dcdc_charger(rec, cleartext):
    """Decode operation mode, input/output voltage and off reason."""
    mode = _unpack_field('<B', cleartext[0:1], b'\xFF')
    rec["mode"] = _UNKN if mode is None else mode
    rec["v_in"] = _scaled(_unpack_field('<H', cleartext[2:4], b'\xFF\xFF'), 100)
    rec["v_out"] = _scaled(_unpack_field('<h', cleartext[4:6], b'\xFF\x7F'), 100)
    cause = _unpack_field('<I', cleartext[6:10], b'\xFF\xFF\xFF\xFF')
    rec["cause"] = _UNKN if cause is None else int(cause)
    rec["lib_mode"] = _status_code(VICTRON_OP, rec["mode"])
    rec["lib_cause"] = _status_code(VICTRON_DC_OFF, rec["cause"])
    # The output voltage is allowed to be N/A while the charger is off (mode 0).
    if (rec["mode"] != _UNKN and rec["v_in"] != _UNKN
            and (rec["mode"] == 0 or rec["v_out"] != _UNKN)
            and rec["cause"] != _UNKN and rec["lib_mode"] != 1
            and rec["lib_cause"] != 1):
        rec["upd"] = time.ticks_ms()
    if _DEBUG >= 2:
        print("Mode : {}:{}:{}, Cause: {}:{}:{}".format(
            rec["mode"], rec["lib_mode"], STATUS[rec["lib_mode"]],
            rec["cause"], rec["lib_cause"], STATUS[rec["lib_cause"]]))
    if _DEBUG >= 2: print("V_in : {}, V_out: {}".format(rec["v_in"], rec["v_out"]))
# Old BT values expiration ----------------------------------
async def bt_expire(coro_freq):
    """Periodically reset stale "bmv712" readings to _UNKN.

    coro_freq -- period of the check, in milliseconds.

    Readings are reset once the record's "upd" tick is at least _BT_EXPIRE ms
    old. The time spent in each pass is added to the global `comp_time`.
    """
    global comp_time, victron
    while True:
        coro_begin = time.ticks_ms()
        shunt = victron["bmv712"]
        if shunt["upd"] != _UNKN:
            deadline = time.ticks_add(shunt["upd"], _BT_EXPIRE)
            if time.ticks_diff(deadline, coro_begin) <= 0:
                for field in ("volt", "amp", "soc", "temp"):
                    shunt[field] = _UNKN
        if _DEBUG >= 3: print(f"* CORO: bt_expire: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
        # Time left until the next pass is due, measured from the pass start.
        next_due = time.ticks_add(coro_begin, coro_freq)
        coro_throttle = time.ticks_diff(next_due, time.ticks_ms())
        comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
        if coro_throttle < 0:
            # Already late: just yield to the scheduler once.
            await asyncio.sleep(0)
        else:
            await asyncio.sleep_ms(coro_throttle)
# Decrypt and decode received BT values ------------------------------------------
async def bt_decrypt(bt_queue):
    """Consume queued advertisements, decrypt them and hand them to bt_decode().

    bt_queue -- asynchronously iterable queue whose items are
                [mac, addr_type, adv_type, rssi, adv_data].

    The LED is lit while an item is processed, and the processing time is
    added to the global `comp_time`.
    """
    global comp_time
    async for bt_input in bt_queue:
        coro_begin = time.ticks_ms()
        pin_led_bt.on()
        mac, _addr_type, adv_type, rssi, adv_data = bt_input
        if len(adv_data) >= 15:
            _process_advert(mac, adv_type, rssi, adv_data)
        elif _DEBUG:
            # Too short to hold nonce and key byte. Skip it rather than let an
            # unpack error stop the whole program through the exception handler.
            print(" Advertisement too short ({} bytes), ignored.".format(len(adv_data)))
        if _DEBUG >= 3: print(f"* CORO: bt_decrypt: Spent {time.ticks_diff(time.ticks_ms(),coro_begin)}.")
        comp_time += time.ticks_diff(time.ticks_ms(), coro_begin)
        pin_led_bt.off()
        await asyncio.sleep(0)


def _process_advert(mac, adv_type, rssi, adv_data):
    """Match one advertisement to a configured device, decrypt and decode it."""
    kb0 = adv_data[14]  # 1st encryption key byte
    if _DEBUG:
        timestamp = time.ticks_diff(time.ticks_ms(), scanner._start_time) / 1000
        print("\n{:.1f}s - Target Device Found - Address: {mac}, RSSI: {rssi}, Adv. Type: {adv_type}".format(
            timestamp,
            mac=':'.join(['{:02X}'.format(b) for b in mac]),
            rssi=rssi,
            adv_type=adv_type
        ))
    if _DEBUG >= 2:
        record_type = adv_data[11]
        nonce = struct.unpack('<H', adv_data[12:14])[0]
        # Print the entire advertising data as hex
        print(" Raw Advertising Data (Hex):", ' '.join(['{:02X}'.format(b) for b in adv_data]))
        print(f" Record type: {record_type:#04X} Nonce: {nonce:#06X} Key byte 0: {kb0:#04X}")
    for dev, rec in victron.items():
        if rec["mac"] != mac:
            continue
        key = rec["key"]
        if _DEBUG: print("key from dev:", kb0)
        # The advertisement carries the first key byte in clear, so a wrong
        # key can be rejected before decrypting anything.
        if key[0] == kb0:
            if _DEBUG >= 2: print(" Encryption key matches.")
            cleartext = _aes_ctr_decrypt(key, adv_data[12:14], adv_data[15:])
            bt_decode(dev, cleartext)  # Now decode what we got
        elif _DEBUG:
            print(" Encryption key mismatch ! Device {}:mac Ours: {}, got: {}".format(
                dev, key[0:1], bytes([kb0])))
        break  # Only the first device with this MAC is considered


def _aes_ctr_decrypt(key, nonce, ciphertext):
    """Decrypt at most one AES block of `ciphertext` in CTR mode.

    MicroPython's cryptolib does not implement AES-CTR, so it is emulated
    with ECB: the counter block (2-byte nonce followed by a zero counter) is
    encrypted once and XORed with the ciphertext. Only the bytes actually
    received are returned; padding bytes would decrypt to meaningless data.
    """
    ctr = bytearray(nonce)      # Start with nonce
    ctr.extend(bytes(14))       # Counter is zero
    cipher = cryptolib.aes(key, 1)  # Initialize AES ECB with key
    cipher.encrypt(ctr, ctr)        # Encrypt counter in place -> keystream
    # zip() stops at the shorter input, i.e. at 16 bytes or at the end of
    # the received ciphertext, whichever comes first.
    return bytes(c ^ k for c, k in zip(ciphertext, ctr))
# Main loop ==================================================
async def main():
    """Set up the queue, the background tasks and the scanner, then loop forever.

    Each loop iteration publishes the time spent by the coroutines since the
    previous iteration in `last_time`, resets `comp_time`, and sleeps for the
    rest of the _TMR_MAIN_LOOP period.
    """
    global comp_time, last_time, loop_time
    global scanner, bt_queue
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(_handle_exception)
    print("we are in the loop")
    # Bluetooth thread safe queue, backed by 20 pre-allocated slots
    bt_queue = ThreadSafeQueue([[bytes(6), int(0), int(0), int(0), bytes(48)] for _ in range(20)])
    # Create scheduled tasks (the references are kept in local variables)
    task_bt_expire = asyncio.create_task(bt_expire(5000))  # Expire old BT values
    task_bt_decrypt = asyncio.create_task(bt_decrypt(bt_queue))  # Decrypt and decode BT values
    await asyncio.sleep(0)
    # Start Bluetooth BLE scanner
    scanner = BLEScanner(ble, maclist)
    scanner.start_scan(duration_ms=_BT_SCAN_DURATION_MS, interval_us=_BT_SCAN_INTERVAL_US, window_us=_BT_SCAN_WINDOW_US)
    while True:
        # Sets the main loop defined duration
        loop_begin_tick = time.ticks_ms()
        loop_end_target = time.ticks_add(loop_begin_tick, _TMR_MAIN_LOOP)
        # Wait until desired loop duration
        loop_time = time.ticks_diff(time.ticks_ms(), loop_begin_tick)
        last_time = comp_time + loop_time
        comp_time = 0
        loop_throttle = time.ticks_diff(loop_end_target, time.ticks_ms()) - 1
        # Always await, even when the iteration is already over budget:
        # a loop that never yields would starve the other tasks.
        await asyncio.sleep_ms(max(loop_throttle, 0))
# Script entry point: run the main() coroutine on the asyncio event loop.
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment