Skip to content

Instantly share code, notes, and snippets.

@mitchese
Last active November 17, 2023 21:00
Show Gist options
  • Save mitchese/afd823c3c5036c5b0e5394625f1a81e4 to your computer and use it in GitHub Desktop.
Save mitchese/afd823c3c5036c5b0e5394625f1a81e4 to your computer and use it in GitHub Desktop.
SMA Speedwire python interpreter
# SMA Speedwire interpreter
#
# This little script interprets SMA's speedwire as multicasted from the Sunny Home Manager 2
# and will print out the energy flow total, and for each phase
import socket
import struct
MULTICAST_IP = "239.12.255.254"
MULTICAST_PORT = 9522
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", MULTICAST_PORT))
mreq = struct.pack("4sl", socket.inet_aton(MULTICAST_IP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
# def print_with_offsets(data):
# # For debugging: prints out the datastream in hex
# i = 0
# datasize = len(data)
# while i < datasize:
# print(str(i) + ": " + data[i:i+10].hex())
# i = i+10
def decode_phase_chunk(data, phase):
# This is given just a single chunk of phase data
total_buy = data[6:8]
print(phase + " buy: " + str(int.from_bytes(total_buy, byteorder="big") / 10))
total_sell = data[26:28]
print(phase + " sell: " + str(int.from_bytes(total_sell, byteorder="big") / 10))
def decode_speedwire(data):
# Taken from https://www.sma.de/fileadmin/content/global/Partner/Documents/SMA_Labs/EMETER-Protokoll-TI-en-10.pdf
# print_with_offsets(data)
SMA = data[
0:3
] # This is just the identifier of the packet, should start with "SMA\0"
sysUid = data[4:7]
serialNumber = data[20:24]
print(
"SMA: "
+ str(SMA)
+ " sysUid: "
+ str(sysUid)
+ " Serial: "
+ str(serialNumber.hex())
)
total_power_buy = data[34:36]
total_power_sell = data[54:56]
print("A: buy: " + str(int.from_bytes(total_power_buy, byteorder="big") / 10))
print("A: sell: " + str(int.from_bytes(total_power_sell, byteorder="big") / 10))
decode_phase_chunk(data[164:308], "L1")
decode_phase_chunk(data[308:452], "L2")
decode_phase_chunk(data[452:596], "L3")
while True:
decode_speedwire(sock.recv(10240))
@totocotonio
Copy link

Hi,
wie hast du denn in deinem Projekt "Gaszähler" den Wemos an den Impulszähler angeschlossen.

Gruuß Torsten

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment