Skip to content

Instantly share code, notes, and snippets.

@fwaggle
Created March 13, 2022 04:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fwaggle/3d18cb8b83d0f1a3a5c22d83408a2637 to your computer and use it in GitHub Desktop.
Save fwaggle/3d18cb8b83d0f1a3a5c22d83408a2637 to your computer and use it in GitHub Desktop.
Growatt Inverter Monitoring
#! /usr/bin/python3
# James Fraser <fwaggle@fwaggle.org>
# Released under the WTFPL.
# Repeatedly ask a Growatt inverter to spam us with updates.
# When it does so, collect them up, average them out, and
# every ten seconds send the average to a collectd server.
import collectd
import serial
import time
# Module-level handle to the serial port. It stays None until
# inverter_init() opens the port and rebinds it via `global`.
inverter = None
# NOTE(review): presumably starts the collectd client library's background
# sender thread(s); it is called before the connection is created, so keep
# this order unless the library documentation says otherwise.
collectd.start_threads()
# Connection used by main() to publish the averaged power reading.
# `hostname` is the name reported for this machine and `collectd_host` is the
# server address -- TODO confirm against the collectd client documentation.
conn = collectd.Connection(hostname='s0lar.home.fwaggle.org', collectd_host='192.168.0.2')
def inverter_init():
    """Open the inverter's serial port if needed and (re)send the start-up commands.

    The module-level ``inverter`` handle is created on first use and reused
    while it stays open. The two command strings are written on every call,
    so calling this again on an already-open port re-sends them.
    """
    global inverter
    print("Initializing inverter")

    # Only (re)open the port when there is no usable handle yet.
    if inverter is None or not inverter.is_open:
        inverter = serial.Serial('/dev/ttyu0', baudrate=9600, bytesize=8, parity='N', stopbits=1, timeout=1)

    # This is *supposed* to ask the inverter to delay for 1000ms between
    # updates. My inverter 100% does not, it sends updates as fast as it
    # can, which amounts to about 5Hz.
    # The bytes are the ASCII text "?#~4A~2Y1000#?".
    inverter.write(b'\x3F\x23\x7E\x34\x41\x7E\x32\x59\x31\x30\x30\x30\x23\x3F')
    # ASCII "?#~4B~#?" -- presumably the command that starts the stream of
    # updates; TODO confirm against the inverter's protocol notes.
    inverter.write(b'\x3F\x23\x7E\x34\x42\x7E\x23\x3F')
def handle_packet(packet):
    """Decode one inverter data frame and return ``(power, grid_v, today)``.

    ``packet`` is a 31-byte sequence (``bytes``/``bytearray``). Byte 30 must
    be 0x57, which is treated as the start marker of the following frame.
    Multi-byte fields are big-endian unsigned 16-bit integers. Units below
    are taken from the original debug output of this script:

        offset  scale  field
        0-1     0.1    PV1 (not returned)
        4-5     0.1    PV2 (not returned)
        6-7     0.1    grid voltage, V
        8-9     0.01   grid frequency, Hz (not returned)
        10-11   0.1    power, W
        12-13   0.1    temperature, C (not returned)
        14      1      status (only 0, 1 and 2 are accepted)
        15      1      fault code (not returned)
        20-21   0.1    energy today, kWh

    Returns:
        ``(power, grid_v, today)`` as floats, or ``False`` (after printing a
        message) when the frame is too short or byte 30 is not 0x57.

    Raises:
        ValueError: if the status byte is not 0, 1 or 2, or if the status is
            not 1 while the reported power is non-zero (the frame is then
            assumed to be garbage).
    """
    # A truncated frame is a framing problem just like a missing marker, so
    # report it the same way instead of failing with a bare IndexError.
    if len(packet) < 31:
        print("Short data packet: got %d bytes, expected 31" % len(packet))
        return False

    # Sanity checks...
    if packet[30] != 0x57:
        print("Did not find next data packet start, expected 57 got %s" % (hex(packet[30])))
        return False

    def be16(offset):
        # Big-endian unsigned 16-bit value starting at `offset`.
        return int.from_bytes(packet[offset:offset + 2], "big")

    grid_v = 0.1 * be16(6)
    power = 0.1 * be16(10)
    status = packet[14]
    today = 0.1 * be16(20)

    # More sanity checks
    if status not in (0, 1, 2):
        raise ValueError("Invalid status found, expecting 0, 1, or 2, got %d" % status)
    if status != 1 and power > 0:
        raise ValueError("Status is a failure and power is non-zero, probably garbage. Status: %d Power: %.1f" % (status, power))

    return (power, grid_v, today)
def main():
    """Read frames from the inverter forever and publish averaged power.

    The loop (re)initialises the inverter when needed, hunts byte-by-byte for
    the 0x57 start marker, then reads 31-byte frames and decodes them with
    handle_packet(). Power readings are collected and, once more than ten
    seconds have passed since the last report, their mean is handed to the
    collectd connection and printed. Any framing or decoding problem sends
    the loop back to hunting for the start marker. This function never
    returns.
    """
    running = False
    hunting = True
    data_packet = bytearray(31)
    idle = 0
    last_time = 0
    power_readings = []

    while True:
        # If we're not running, send the init packet and then start hunting
        if not running:
            inverter_init()
            running = True
            hunting = True
            idle = 0

        # After more than 5 consecutive timeouts, wait 30 seconds, then
        # try re-init.
        if idle > 5:
            running = False
            time.sleep(30)
            continue

        # hunt for 0x57, which is the "start of response packet" (but may not be)
        while hunting:
            res = inverter.read()
            if res == b'':
                time.sleep(1)
                idle += 1
                break
            # Data arrived, so the link is alive: only *consecutive* timeouts
            # should count towards a re-init. Without this reset, isolated
            # timeouts accumulate for the lifetime of the process.
            idle = 0
            if res == b'\x57':
                hunting = False
                break

        # If we're hunting, don't try read 31 bytes.
        if hunting:
            continue

        # If we get here, the inverter is spamming us with data, so let's record it.
        bread = inverter.readinto(data_packet)
        if bread != 31:
            print("Got %d bytes, expected 31..." % bread)
            hunting = True
            continue

        try:
            result = handle_packet(data_packet)
            # handle_packet() reports a framing error by returning False
            # (it has already printed why); unpacking that would only raise
            # a misleading TypeError, so resynchronise directly.
            if result is False:
                hunting = True
                continue

            power, _grid_v, today = result
            power_readings.append(power)

            # Monotonic clock: the interval must not be affected by
            # wall-clock adjustments (e.g. NTP stepping the time).
            now = time.monotonic()
            if now - 10 > last_time:
                power_current = sum(power_readings) / len(power_readings)
                conn.growatt.set_exact(power=power_current)
                print("Power: %.1f (%d readings) Today: %.1f" % (power_current, len(power_readings), today))
                power_readings = []
                last_time = now
        except Exception as e:
            # Top-level boundary of the daemon loop: report the problem and
            # go back to hunting for the next start marker.
            print(str(e))
            hunting = True
# Only start the endless monitoring loop when run as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment