Skip to content

Instantly share code, notes, and snippets.

@izak
Created Sep 20, 2017
Embed
What would you like to do?
Script to run on the Victron Color Control so that the inverter power tracks available PV
#!/usr/bin/python -u
import sys, os
import logging
from math import pi
from functools import partial
from collections import Mapping
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gobject
# Ceiling for the discharge power limit this script will ever request
# (presumably watts -- confirm against the unit of the CGwacs setting).
MAXDISCHARGE = 1200
# Granularity of requested power levels: _round() reduces values to a
# multiple of this, and upward adjustments aim one step above the current
# limit.
INCREMENT = 20
# Delay between periodic adjustments, handed to gobject.timeout_add()
# (which takes milliseconds).
INTERVAL = 60 * 1000
# Fraction of the difference between a new PV power sample and the smoothed
# value that _track_filter() applies on each update.
OMEGA = 2 * pi / 100

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class smart_dict(dict):
    """A dict whose keys can also be read and written as attributes.

    Reading a key that is not present raises AttributeError. A value that
    is a mapping is handed back as a new instance of this class built from
    it, so nested keys can be reached with chained attribute access.
    """

    def __getattr__(self, k):
        try:
            item = self[k]
            # Nested mappings are wrapped (in a fresh copy) so that
            # ``d.a.b`` style access keeps working one level down.
            return self.__class__(item) if isinstance(item, Mapping) else item
        except KeyError:
            raise AttributeError(k)

    def __setattr__(self, k, v):
        # Attribute assignment is stored as a regular dictionary item.
        self.__setitem__(k, v)
# Shared store of the latest tracked values; filled in by track(),
# set_state() and _track_filter(), and read by the periodic adjustment.
state = smart_dict()

# D-Bus integer wrapper types that unwrap_dbus_value() converts to int.
# Each type is listed once (the original tuple repeated dbus.UInt32, which
# has no effect on isinstance checks).
dbus_int_types = (dbus.Byte, dbus.Int16, dbus.UInt16, dbus.Int32,
                  dbus.UInt32, dbus.Int64, dbus.UInt64)
def unwrap_dbus_value(val):
    """Turn a D-Bus wrapped number into the matching plain Python number.

    Values of one of the types in ``dbus_int_types`` become ``int`` and
    ``dbus.Double`` becomes ``float``. Anything else is returned unchanged.
    """
    conversions = (
        (dbus_int_types, int),
        (dbus.Double, float),
    )
    for wrapped_types, native in conversions:
        if isinstance(val, wrapped_types):
            return native(val)
    return val
def set_state(callback, key, v):
    """Record a changed value in ``state`` and notify ``callback``.

    ``v`` is indexed with ``"Value"``; that entry is unwrapped, stored under
    ``key`` and, when ``callback`` is not None, passed on to it.
    """
    unwrapped = unwrap_dbus_value(v["Value"])
    state[key] = unwrapped
    if callback is None:
        return
    callback(unwrapped)
def query(conn, service, path):
    """Fetch the current value of a D-Bus item with a blocking call.

    Invokes ``GetValue`` (no interface given, empty signature, no
    arguments) on ``path`` of ``service`` through ``conn`` and returns
    whatever the call yields, without unwrapping it.
    """
    interface = None
    method, signature, arguments = "GetValue", '', []
    return conn.call_blocking(service, path, interface, method, signature,
                              arguments)
def track(conn, service, path, target, callback=None):
    """Mirror a D-Bus item into ``state[target]`` and keep it up to date.

    The item's current value is queried once, unwrapped and stored. A
    receiver for ``PropertiesChanged`` on the same service and path then
    routes later changes through set_state(). When ``callback`` is given it
    is called with every unwrapped value, starting with the initial one.
    """
    # Seed the state with the value as it is right now.
    initial = unwrap_dbus_value(query(conn, service, path))
    state[target] = initial
    if callback is not None:
        callback(initial)

    # Subscribe so subsequent changes land in the same slot.
    on_change = partial(set_state, callback, target)
    conn.add_signal_receiver(
        on_change,
        bus_name=service,
        path=path,
        dbus_interface='com.victronenergy.BusItem',
        signal_name='PropertiesChanged')
def set_power(conn, v):
    """Write ``v`` to the MaxDischargePower setting over D-Bus."""
    setting = conn.get_object("com.victronenergy.settings",
                              "/Settings/CGwacs/MaxDischargePower")
    setting.SetValue(v)
def _round(v):
    """Return ``v`` reduced to a whole multiple of INCREMENT.

    The number of steps is obtained with ``int()``, so any fractional step
    is dropped rather than rounded to nearest.
    """
    steps = int(v / INCREMENT)
    return steps * INCREMENT
def _track_filter(key, value):
    """Fold ``value`` into the smoothed reading kept in ``state[key]``.

    While the stored reading is positive it is moved towards ``value`` by
    the fraction OMEGA of their difference. If it is missing, zero or
    negative, it is simply replaced by ``value``.
    """
    previous = state.get(key, 0)
    if previous > 0:
        smoothed = previous + (value - previous) * OMEGA
    else:
        smoothed = value
    state[key] = smoothed
def main():
    """Track the relevant D-Bus values and adjust the discharge limit.

    Connects to the system bus, mirrors the tracked items into ``state``,
    runs one adjustment immediately and then one every INTERVAL from a
    gobject timeout. Does not return: it ends by running the main loop.
    """
    logging.basicConfig(level=logging.INFO)
    DBusGMainLoop(set_as_default=True)
    conn = dbus.SystemBus()

    # Find vebus service
    vebus = str(query(conn, "com.victronenergy.system", "/VebusService"))
    logger.info("Found vebus at %s", vebus)

    # Track some values: (service, path, state key, optional callback).
    # Disabled in the original script:
    #   track(conn, "com.victronenergy.settings",
    #         "/Settings/CGwacs/BatteryLife/MinimumSocLimit", "minsoc")
    tracked = (
        (vebus, "/Hub/ChargeVoltage", "target", None),
        ("com.victronenergy.settings",
         "/Settings/CGwacs/MaxDischargePower", "discharge", None),
        ("com.victronenergy.system", "/Dc/Battery/Voltage", "voltage", None),
        ("com.victronenergy.system", "/Dc/Pv/Power", "power",
         partial(_track_filter, "smoothpower")),
        ("com.victronenergy.system",
         "/Ac/Consumption/L1/Power", "consumption", None),
    )
    for service, path, key, callback in tracked:
        track(conn, service, path, key, callback)

    def _apply(level):
        # Log and write the new discharge limit.
        logger.info("Updating power level to %d", level)
        set_power(conn, level)

    # Periodic work
    def _adjust():
        logger.info("V: %.2f, SP: %.2f, P: %.2f, A: %.2f, C: %.2f",
                    state.voltage, state.smoothpower, state.power,
                    state.discharge, state.consumption)
        limit = state.discharge

        if state.voltage + 1.4 < state.target:
            # Voltage is low, then bring the power down to 85% of pv
            level = _round(state.smoothpower * 0.85)
            level = 0 if level < 100 else level
            if level < limit:
                _apply(level)
        elif state.voltage + 0.3 > state.target:
            # Voltage is high
            # Evaluate whether we should increase inverter power
            # Don't adjust downward, only adjust upward if there is room.
            if limit < MAXDISCHARGE and state.consumption > limit:
                wanted = max(state.smoothpower, limit + INCREMENT)
                level = min(_round(wanted), MAXDISCHARGE)
                if level > limit:
                    _apply(level)

        # Returning True keeps the gobject timeout scheduled.
        return True

    _adjust()
    gobject.timeout_add(INTERVAL, _adjust)
    gobject.MainLoop().run()
# Start the controller only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment