Skip to content

Instantly share code, notes, and snippets.

@TheGroundZero
Created February 16, 2022 12:27
Show Gist options
  • Save TheGroundZero/4df0e31869bff6ddb850e17d5f594412 to your computer and use it in GitHub Desktop.
Save TheGroundZero/4df0e31869bff6ddb850e17d5f594412 to your computer and use it in GitHub Desktop.
Read modbus data from Ginlong Solis inverter and send over MQTT. For more info, see blog post.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Read modbus data from Ginlong Solis inverter
# and send over MQTT
#
# Based on https://github.com/rogersia/Solis-4G
# Includes fixes, small modifications and refactoring. Migrated to Python 3.
# See https://sequr.be/blog/2021/08/reading-ginlong-solis-inverter-over-serial-and-importing-in-home-assistant-over-mqtt/
#
import logging
import minimalmodbus
import paho.mqtt.client as mqtt
import serial
import socket
import sys
import time
# MQTT connection settings used by mqtt_connect().
# The "xxx" values are placeholders and must be replaced before running.
broker = "192.168.xxx.xxx"
port = 1883
mqttuser = "xxxxxxxxxxxxxxxxxxxx"
mqttpass = "xxxxxxxxxxxxxxxxxxxx"
# Client identifier handed to mqtt.Client() in mqtt_connect().
client_id = "solis_com"
# Log to stderr. With level WARNING, the logging.debug() and logging.info()
# calls in this script produce no output unless the level is lowered.
logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
def mqtt_connect():
    """Create the MQTT client and queue an asynchronous connection.

    Reads the module-level ``broker``, ``port``, ``mqttuser``, ``mqttpass``
    and ``client_id`` settings. The connection is requested with
    ``connect_async()``, so the caller is expected to start the client's
    network loop (``main()`` does this with ``loop_start()``).

    Returns:
        The configured ``mqtt.Client`` instance.
    """
    # callbacks for mqtt
    def on_connect(client, userdata, flags, rc):
        logging.debug("MQTT connected with result code {}".format(rc))

    def on_disconnect(client, userdata, rc):
        logging.debug("MQTT disconnected with result code {}".format(rc))
        client.loop_stop()

    # paho-mqtt 2.x expects the callback API version as the first constructor
    # argument; 1.x has no such enum. VERSION1 keeps the callback signatures
    # used above valid on both major versions.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client(client_id)
    else:
        client = mqtt.Client(api_versions.VERSION1, client_id=client_id)

    client.username_pw_set(mqttuser, mqttpass)
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.connect_async(broker, port, 60)
    return client
def mqtt_subscribe(client):
    """Subscribe *client* to the command topic and log incoming messages.

    The message handler is installed before the subscribe request is made,
    so the handler is already in place when the first message can arrive.

    Args:
        client: MQTT client exposing ``subscribe(topic, qos)`` that returns
            a ``(result_code, message_id)`` tuple, as paho-mqtt does.
    """
    def on_message(client, userdata, msg):
        logging.debug("[ {} ({})] {}".format(msg.topic, msg.qos, msg.payload))

    topic = "meters/solis_com/command"
    client.on_message = on_message
    rc, _mid = client.subscribe(topic, 2)
    # NOTE(review): main() calls this before the network loop is started;
    # confirm whether the subscription is actually registered at that point.
    if rc != 0:  # 0 is paho-mqtt's MQTT_ERR_SUCCESS
        logging.debug(
            "MQTT subscribe to {} not sent (result code {})".format(topic, rc)
        )
def mqtt_publish(client, data, topic_prefix="meters/solis_com", delay=0.5):
    """Publish every entry of *data* as its own MQTT message.

    Each key becomes the last topic level (``<topic_prefix>/<key>``) and the
    value is sent as the payload with QoS 2 and retain disabled. After each
    message the function waits for the publish to complete, logs the
    outcome at debug level and then pauses for *delay* seconds.

    Args:
        client: MQTT client exposing ``publish(topic, payload, qos, retain)``.
        data: Mapping of measurement name to value.
        topic_prefix: Topic under which the measurements are published.
        delay: Pause in seconds after each message; ``0`` disables it.
    """
    def on_publish(client, userdata, mid):
        logging.debug("[{}] published ({})".format(mid, userdata))

    def send(topic, payload="", qos=2, retain=False):
        res = client.publish(topic, payload, qos, retain)
        res.wait_for_publish()
        logging.debug(
            "[{}] status: {} - {}".format(
                res.mid, res.rc, "Published" if res.is_published() else "Failed"
            )
        )
        if delay > 0:
            time.sleep(delay)

    client.on_publish = on_publish

    for key, value in data.items():
        send("{}/{}".format(topic_prefix, key), value)
def modbus_connect(device='/dev/serial0', address=2):
    """Create a minimalmodbus instrument for the inverter.

    The serial line is configured as 9600 baud, 8 data bits, no parity,
    1 stop bit, with a 3 second timeout.

    Args:
        device: Serial device the inverter is attached to.
        address: Modbus address of the inverter.

    Returns:
        The configured ``minimalmodbus.Instrument``.
    """
    instrument = minimalmodbus.Instrument(device, address)
    instrument.serial.baudrate = 9600
    instrument.serial.bytesize = 8
    instrument.serial.parity = serial.PARITY_NONE
    instrument.serial.stopbits = 1
    instrument.serial.timeout = 3
    # Uncomment to enable minimalmodbus debug output when troubleshooting:
    # instrument.debug = True
    return instrument
def modbus_read(instrument):
    """Read the inverter's measurements and return them as a dict.

    Every register is read with function code 4. Entries whose ``decimals``
    column is ``None`` are read as unsigned 32-bit values via ``read_long``;
    the others are read as 16-bit values via ``read_register`` with the
    given number of decimals. Each value is logged at info level.

    Args:
        instrument: Object exposing minimalmodbus' ``read_long`` and
            ``read_register`` methods.

    Returns:
        Dict with ``online`` (time the read started, from ``time.time()``)
        followed by ``acw``, ``dcv``, ``dci``, ``acv``, ``aci``, ``acf`` and
        ``inc``. ``gat`` and ``gto`` are included only when greater than 0.
    """
    timestamp = time.time()

    # Registers in the order they are polled.
    registers = (
        # key, label, unit, register, decimals, signed
        ("acw", "AC Watts", "W", 3004, None, False),
        ("dcv", "DC Volt", "V", 3021, 2, False),
        ("dci", "DC Current", "A", 3022, 0, False),
        ("acv", "AC Volt", "V", 3035, 1, False),
        ("aci", "AC Current", "A", 3038, 0, False),
        ("acf", "AC Frequency", "Hz", 3042, 2, False),
        ("inc", "Inverter Temperature", "°C", 3041, 1, True),
        ("gat", "Generated (All time)", "kWh", 3008, None, False),
        ("gto", "Generated (Today)", "kWh", 3014, 1, False),
    )
    # Fix for 0-values during inverter powerup: these totals are dropped
    # from the result unless they are positive.
    skip_unless_positive = ("gat", "gto")

    data = {'online': timestamp}
    for key, label, unit, register, decimals, signed in registers:
        if decimals is None:
            value = instrument.read_long(
                register, functioncode=4, signed=signed
            )
        else:
            value = instrument.read_register(
                register,
                number_of_decimals=decimals,
                functioncode=4,
                signed=signed,
            )
        logging.info("{:<23s}{:10.2f} {}".format(label, value, unit))

        if key in skip_unless_positive and not value > 0:
            continue
        data[key] = value

    return data
def main():
    """Read the inverter once and publish the values over MQTT.

    Connects to the broker, starts the MQTT network loop, reads the
    inverter, publishes the result and shuts the MQTT client down again.
    Errors are logged rather than raised, so the function always returns
    ``None``.
    """
    try:
        mqttc = mqtt_connect()
        try:
            mqtt_subscribe(mqttc)
            mqttc.loop_start()
            modc = modbus_connect()
            data = modbus_read(modc)
            # Pauses kept from the original script around the publish step.
            time.sleep(2)
            mqtt_publish(mqttc, data)
            time.sleep(2)
        finally:
            # Stop the network loop and disconnect even when the Modbus read
            # or the publish failed; previously an error skipped this step.
            mqttc.loop_stop()
            mqttc.disconnect()
    except TypeError as err:
        logging.error("TypeError:\n{}".format(err))
    except ValueError as err:
        logging.error("ValueError:\n{}".format(err))
    except minimalmodbus.NoResponseError as err:
        logging.error("Modbus no response:\n{}".format(err))
    except serial.SerialException as err:
        logging.error("SerialException:\n{}".format(err))
    except Exception as err:
        # Top-level boundary: log anything unexpected instead of crashing.
        logging.error("Exception:\n{}".format(err))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment