Created
September 20, 2017 18:26
-
-
Save izak/e21c1523afc2ae2a8f44898a912827ab to your computer and use it in GitHub Desktop.
Script to run on the Victron Color Control so that the inverter power tracks available PV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python -u | |
import sys, os | |
import logging | |
from math import pi | |
from functools import partial | |
from collections import Mapping | |
import dbus | |
from dbus.mainloop.glib import DBusGMainLoop | |
import gobject | |
# Ceiling applied to the discharge power level computed in main()
# (presumably watts -- confirm against the MaxDischargePower setting).
MAXDISCHARGE = 1200
# Granularity of the power level: _round() truncates to a multiple of this.
INCREMENT = 20
# Period passed to gobject.timeout_add(), which takes milliseconds.
INTERVAL = 60000
# Weight given to each new sample by the smoothing in _track_filter().
OMEGA = 2 * pi / 100

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class smart_dict(dict):
    """A dict whose items can also be read and written as attributes.

    Attribute reads only reach the items when normal attribute lookup
    fails, so names that already exist on dict (e.g. ``get``) are not
    shadowed by keys of the same name.
    """

    def __getattr__(self, name):
        """Return the item stored under *name*.

        A value that is a Mapping is returned wrapped in a new instance of
        this class (a copy, so changes made through the wrapper do not
        reach the stored value).  A missing key raises AttributeError.
        """
        try:
            found = self[name]
        except KeyError:
            raise AttributeError(name)
        return self.__class__(found) if isinstance(found, Mapping) else found

    def __setattr__(self, name, value):
        """Store *value* as the item *name* instead of as an attribute."""
        self[name] = value
# Shared store for the most recent value of every tracked D-Bus item,
# keyed by the target name handed to track().
state = smart_dict()

# D-Bus integer wrapper types that unwrap_dbus_value() converts to int.
# Each type is listed once (the previous tuple repeated dbus.UInt32).
dbus_int_types = (
    dbus.Byte,
    dbus.Int16,
    dbus.UInt16,
    dbus.Int32,
    dbus.UInt32,
    dbus.Int64,
    dbus.UInt64,
)
def unwrap_dbus_value(val):
    """Strip the D-Bus wrapper type from a numeric value.

    Instances of the types in dbus_int_types become int and dbus.Double
    becomes float; any other value is returned unchanged.
    """
    conversions = (
        (dbus_int_types, int),
        (dbus.Double, float),
    )
    for wrapped_types, plain_type in conversions:
        if isinstance(val, wrapped_types):
            return plain_type(val)
    return val
def set_state(callback, key, v):
    """Record a changed item in the shared state and notify the callback.

    v is the payload handed over by the signal receiver registered in
    track(); only its "Value" entry is read.  The unwrapped value is
    stored under state[key] and, when callback is not None, passed to it.
    """
    unwrapped = unwrap_dbus_value(v["Value"])
    state[key] = unwrapped
    if callback is None:
        return
    callback(unwrapped)
def query(conn, service, path):
    """Call GetValue on the object at *path* of *service* and return the reply.

    The call blocks until the reply arrives.  No interface name is given
    and the method is called without arguments.
    """
    interface = None
    signature = ''
    arguments = []
    return conn.call_blocking(service, path, interface, "GetValue",
                              signature, arguments)
def track(conn, service, path, target, callback=None):
    """Mirror the D-Bus item at *path* of *service* into state[target].

    The current value is fetched once with query() and stored, after which
    a PropertiesChanged receiver keeps the stored value up to date.  When
    callback is given it is called with the unwrapped value, both for the
    initial read and for every later change.
    """
    # Seed the state through the same routine the signal receiver uses.
    set_state(callback, target, {"Value": query(conn, service, path)})

    # Then follow every subsequent change.
    on_change = partial(set_state, callback, target)
    conn.add_signal_receiver(
        on_change,
        dbus_interface='com.victronenergy.BusItem',
        signal_name='PropertiesChanged',
        path=path,
        bus_name=service)
def set_power(conn, v):
    """Write *v* to the MaxDischargePower item of the settings service."""
    setting = conn.get_object("com.victronenergy.settings",
                              "/Settings/CGwacs/MaxDischargePower")
    setting.SetValue(v)
def _round(v):
    """Truncate *v* to a whole multiple of INCREMENT and return it as an int.

    int() drops the fractional part of the quotient, so the result is not
    rounded to the nearest multiple.
    """
    whole_steps = int(v / INCREMENT)
    return whole_steps * INCREMENT
def _track_filter(key, value):
    """Fold *value* into the smoothed reading kept in state[key].

    While the stored reading is positive it moves towards *value* by the
    fraction OMEGA of their difference.  When nothing is stored yet, or the
    stored reading is zero or negative, *value* replaces it outright.
    """
    previous = state.get(key, 0)
    if previous > 0:
        smoothed = previous + (value - previous) * OMEGA
    else:
        smoothed = value
    state[key] = smoothed
def main():
    """Track the relevant D-Bus items and periodically retune the discharge limit.

    Connects to the system bus, looks up the vebus service, mirrors the
    charge-voltage target, the MaxDischargePower setting, the battery
    voltage, the PV power (also kept as a smoothed reading) and the L1
    consumption into the shared state, then runs the adjustment once and
    again every INTERVAL from the gobject main loop.  Does not return while
    the main loop is running.
    """
    logging.basicConfig(level=logging.INFO)
    DBusGMainLoop(set_as_default=True)
    conn = dbus.SystemBus()

    system_service = "com.victronenergy.system"
    settings_service = "com.victronenergy.settings"

    # Ask the system service which vebus service to follow.
    vebus = str(query(conn, system_service, "/VebusService"))
    logger.info("Found vebus at %s", vebus)

    # (service, path, state key, callback), tracked in this order.
    tracked_items = (
        (vebus, "/Hub/ChargeVoltage", "target", None),
        # Currently not tracked:
        # (settings_service,
        #  "/Settings/CGwacs/BatteryLife/MinimumSocLimit", "minsoc", None),
        (settings_service,
         "/Settings/CGwacs/MaxDischargePower", "discharge", None),
        (system_service, "/Dc/Battery/Voltage", "voltage", None),
        (system_service, "/Dc/Pv/Power", "power",
         partial(_track_filter, "smoothpower")),
        (system_service, "/Ac/Consumption/L1/Power", "consumption", None),
    )
    for service, path, key, callback in tracked_items:
        track(conn, service, path, key, callback)

    def _apply(level):
        """Log the new power level and write it to the setting."""
        logger.info("Updating power level to %d", level)
        set_power(conn, level)

    def _adjust():
        """Periodic work: compare battery voltage with the target and adapt.

        Always returns True so the timeout stays scheduled.
        NOTE(review): exceptions raised in here are not caught by this
        function -- confirm how gobject treats a raising timeout callback.
        """
        voltage = state.voltage
        smoothed = state.smoothpower
        pv_power = state.power
        discharge = state.discharge
        consumption = state.consumption
        logger.info("V: %.2f, SP: %.2f, P: %.2f, A: %.2f, C: %.2f",
                    voltage, smoothed, pv_power, discharge, consumption)
        target = state.target

        if voltage + 1.4 < target:
            # Voltage is low: aim for 85% of the smoothed PV power, treat
            # anything below 100 as 0, and only ever lower the limit here.
            level = _round(smoothed * 0.85)
            if level < 100:
                level = 0
            if level < discharge:
                _apply(level)
        elif voltage + 0.3 > target:
            # Voltage is high: consider raising the inverter power.  Never
            # adjust downward here, and only go up when there is room below
            # MAXDISCHARGE and consumption exceeds the current limit.
            if discharge < MAXDISCHARGE and consumption > discharge:
                wanted = max(smoothed, discharge + INCREMENT)
                level = min(_round(wanted), MAXDISCHARGE)
                if level > discharge:
                    _apply(level)
        return True

    # Run once straight away, then on every timer tick.
    _adjust()
    gobject.timeout_add(INTERVAL, _adjust)
    gobject.MainLoop().run()
# Start the controller only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment