Skip to content

Instantly share code, notes, and snippets.

@dennisheitmann
Last active May 29, 2024 15:24
Show Gist options
  • Save dennisheitmann/4b5124928882fd1d44d5bdd8c1a6a5cc to your computer and use it in GitHub Desktop.
Save dennisheitmann/4b5124928882fd1d44d5bdd8c1a6a5cc to your computer and use it in GitHub Desktop.
SMA Sunny Tripower 6.0 Modbus TCP Information to InfluxDB
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @module: sma.py
# @description: Python Class to get data from a SMA unit via pymodbusTCP
# Clase Python para tomar datos de un equipo SMA utilizando pymodbusTCP
# @autor: Pac0 Arriaza (JanusHL) (except indicated...)
# @created: 20/05/2019
# @license: free use of this module class if no changes in the copyright from the autors are made.
#
#--------------------------------------------------------------------------
# Adapted for SMA Sunny Tripower 6.0 to InfluxDB by Dennis Heitmann 2024-05-27
from pyModbusTCP.client import ModbusClient
import ctypes
import time
import sys
import logging
import datetime
from influxdb import InfluxDBClient
# Set up a client for InfluxDB
try:
dbclient = InfluxDBClient('127.0.0.1', 8086, 'root', 'root', 'all-data')
databasePrefix = "HOME/POWER/SMA/"
except Exception as e:
raise SystemExit(e)
'''
These classes/structures/unions, allow easy conversion between
modbus 16bit registers and ctypes (a useful format)
Copyright (c) 2017 stoberblog
'''
# Single register (16 bit) based types
class convert1(ctypes.Union):
_fields_ = [("u16", ctypes.c_uint16),
("s16", ctypes.c_int16)]
# Two register (32 bit) based types
class x2u16Struct(ctypes.Structure):
_fields_ = [("h", ctypes.c_uint16),
("l", ctypes.c_uint16)]
class convert2(ctypes.Union):
_fields_ = [("float", ctypes.c_float),
("u16", x2u16Struct),
("sint32", ctypes.c_int32),
("uint32", ctypes.c_uint32)]
# Four register (64 bit) based types
class x4u16Struct(ctypes.Structure):
_fields_ = [("hh", ctypes.c_uint16),
("hl", ctypes.c_uint16),
("lh", ctypes.c_uint16),
("ll", ctypes.c_uint16)]
class convert4(ctypes.Union):
_fields_ = [("u16", x4u16Struct),
("sint64", ctypes.c_int64),
("uint64", ctypes.c_uint64)]
#--- end of copyrighted code -----------------------------
class mbusTCP:
def __init__(self, ID, TCPaddress, PORT): #, TCPaddress
# Modbus instance
#debug=True (quitar cuando no sea necesario el modo debug)
try:
self.mb_device = ModbusClient(host=TCPaddress, port=PORT, timeout=10, debug=False, unit_id=ID)
self.functionCode = 3 # Function Code
self.dict = {}
except:
print (self.mb_device)
#return mb_device.last_error()
raise
def read_data(self, reg_ini, num_regs):
try:
data = self.mb_device.read_holding_registers(int(reg_ini), num_regs)
return data
except:
print (self.mb_device.last_error())
#return mb_device.last_error()
raise
def openTCP(self):
try:
self.mb_device.open()
#time.sleep(1)
except:
print ("\nError abriendo conexión TCP...")
def closeTCP(self):
try:
self.mb_device.close()
time.sleep(1)
except:
print ("\nError cerrando conexión TCP...")
###-----the two functions below are only for documentation purposes in how to read U32/U64 registers
def device_read_U64(self, ini_reg):
regs = self.mb_device.read_holding_registers(ini_reg-1, 4)
Translate=convert4()
Translate.u16.hh = regs[3]
Translate.u16.hl = regs[2]
Translate.u16.lh = regs[1]
Translate.u16.ll = regs[0]
return Translate.uint64
def device_read_U32(self, ini_reg):
regs = self.mb_device.read_holding_registers(ini_reg-1, 2)
Translate=convert2()
Translate.u16.h = regs[1]
Translate.u16.l = regs[0]
return Translate.uint32
def saveToDatabase(parameters: dict):
receiveTime = datetime.datetime.utcnow()
for key in parameters:
json_body = [
{
"measurement": databasePrefix + key,
"time": str(receiveTime),
"fields": {
"value": int(parameters[key])
}
}
]
dbclient.write_points(json_body)
if __name__ == '__main__':
mbus = mbusTCP(3, '192.168.0.52', 502) #sys.argv[1]
mbus.openTCP()
data = {}
TotalYield_Wh = mbus.read_data(30513,4)
data['TotalYield'] = TotalYield_Wh[0] * 4**16 + TotalYield_Wh[1] * 3**16 + TotalYield_Wh[2] * 2**16 + TotalYield_Wh[3]
TotalYield_kWh = mbus.read_data(30531,2)
data['TotalYield_kWh'] = TotalYield_kWh[0] * 2**16 + TotalYield_kWh[1]
TotalYield_MWh = mbus.read_data(30533,2)
data['TotalYield_MWh'] = TotalYield_MWh[0] * 2**16 + TotalYield_MWh[1]
data['PowerPV'] = mbus.read_data(30775,2)[1]
data['Status'] = mbus.read_data(30203,2)[1]
Iso_Ohm = mbus.read_data(30225,2)
data['Iso_Ohm'] = Iso_Ohm[0] * 2**16 + Iso_Ohm[1]
data['PowerDC_A'] = mbus.read_data(30773,2)[1]
data['PowerDC_B'] = mbus.read_data(30961,2)[1]
data['PowerPV_L1'] = mbus.read_data(30777,2)[1]
data['PowerPV_L2'] = mbus.read_data(30779,2)[1]
data['PowerPV_L3'] = mbus.read_data(30781,2)[1]
data['Voltage_L1'] = mbus.read_data(30783,2)[1]/100.0
data['Voltage_L2'] = mbus.read_data(30785,2)[1]/100.0
data['Voltage_L3'] = mbus.read_data(30787,2)[1]/100.0
mbus.closeTCP()
# Save data in InfluxDB
#print(data)
saveToDatabase(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment