Skip to content

Instantly share code, notes, and snippets.

@ArrEssJay
Last active April 12, 2022 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ArrEssJay/660238b0e8ec543f81d3b2c384f2d726 to your computer and use it in GitHub Desktop.
Save ArrEssJay/660238b0e8ec543f81d3b2c384f2d726 to your computer and use it in GitHub Desktop.
Flux (InfluxDB) Price Driven Heater Controller, using Amber (aus) price data and modbus power monitoring.
// Hybrid wholesale price (feed-forward) and proportional (feedback) storage water heater controller.
// Tries to keep the tank hot and the bill low
// This query runs every half hour (NEM metering period) and determines for the next 18 hours (available forecast period)
// how much time is needed to heat the water tank and which are the cheapest times to do that
// Basic theory
// Wholesale electricity prices are volatile. Lots of renewables means low prices. The storage heater allows us to ride out
// expensive periods by only heating water when it's cheap. This is fine until a week of rain means we run out of hot water.
// This controller consists of three main components:
// - Required Duty Cycle calcuation
// Statefully tracks (based on heater activity) the propoprtion of time within a windowing period (36 hours default)
// that the heater needs to be enabled to maintain a hot water supply. A proportional controller.
// - Forecast Cost minimisation
// Determines when to enable the heater based on forecast prices
// - Feed-forward error compensation
Price forecasts are both imperfect and dynamic. Nor does forecasting take into account manual control of the system.
- It is possible that the cheapest forecast times to run the heater are perpetually in the future and the tank goes cold.
- The heater might have been manually enabled to re-heat it from cold.
These under/overshoot errors are compensated for by subtracting previous heater activity within the historical time window
from the total estimated to be required.
// Switching in an ohmmeter across the heater circuit when the contactor is open (requires an NO+NC contactor) would be a
// straightforward way to obtain the heater thermocouple state at all times without resorting to modification of the
// tank or heater.
import "date"
import "experimental"
import "experimental/bitwise"
import "math"
option task = {name: "heaterScheduler", every: 30m}
start = -18h
stop = 18h
// in the future (ie. 18 *2)
totalPeriods = 36.0
// slightly above zero
targetIdleRatio = 0.05
timeIntervalLength = 30m
//NEM uses time at end of interval but we save interval start time when importing
currentTimeInterval = date.truncate(t: now(), unit: timeIntervalLength)
firstTimeInterval = experimental.addDuration(d: start, to: currentTimeInterval)
lastTimeInterval = experimental.addDuration(d: stop, to: currentTimeInterval)
nextimeInterval = experimental.addDuration(d: timeIntervalLength, to: currentTimeInterval)
forecastPrice =
from(bucket: "meterData")
|> range(start: currentTimeInterval, stop: lastTimeInterval)
|> filter(
fn: (r) =>
r["_measurement"] == "amber_pricing" and r["_field"] == "perKwh" and r["type"] == "ForecastInterval",
)
|> map(fn: (r) => ({r with _value: r._value / 100.0}))
|> drop(columns: ["channelType", "duration", "_renewables", "spotPerKwh"])
heaterState =
from(bucket: "meterData")
|> range(start: -1h, stop: now())
|> filter(fn: (r) => r["_measurement"] == "heaterState")
|> last()
idleRatio =
heaterState
|> tableFind(fn: (key) => key._field == "idleRatio")
|> getColumn(column: "_value")
// The historical enabled ratio gives us the starting target for number of periods to schedule
enabledRatio =
heaterState
|> tableFind(fn: (key) => key._field == "enabledRatio")
|> getColumn(column: "_value")
// Compare the windowed idle ratio to the target
idleDelta = targetIdleRatio - idleRatio[0]
// Apply a proportion of the idle ratio delta to adjust the number of periods to schedule
// i.e. a proportional controller
// When setting this up initially, some initial starting value will need to be injected
// P value may need adjusting
scheduleRatio = enabledRatio[0] + (idleDelta * 0.2)
periodsToSchedule = int(v: math.round(x: scheduleRatio * totalPeriods))
thresholdPrice =
forecastPrice
|> quantile(q: scheduleRatio, method: "exact_selector")
|> tableFind(fn: (key) => key._field == "perKwh")
|> getColumn(column: "_value")
activePeriods =
forecastPrice
|> map(
fn: (r) =>
({
_time: r._time,
_measurement: "activePeriods",
_field: "waterHeater",
_value: if r._value < thresholdPrice[0] then 1 else 0,
updated: string(v: uint(v: currentTimeInterval)),
}),
)
activePeriods
|> experimental.group(columns: ["_measurement", "updated"], mode: "extend")
|> to(bucket: "meterData")
// update load control for current period
// code below could probably be tidied up
activePeriods
|> range(start: currentTimeInterval, stop: nextimeInterval)
|> map(
fn: (r) =>
({r with
_measurement: "currentPeriodLoadStateMetadata",
_time: currentTimeInterval,
idleRatio: idleRatio[0],
enabledRatio: enabledRatio[0],
scheduleRatio: scheduleRatio,
totalPeriods: totalPeriods,
periodsToSchedule: periodsToSchedule,
}),
)
|> drop(columns: ["updated"])
//|> experimental.group(columns: ["_measurement", "previousActive", "scheduledActive"], mode: "extend")
// overwrite in case of multiple runs
|> to(
bucket: "meterData",
fieldFn: (r) =>
({
"idleRatio": r.idleRatio,
"enabledRatio": r.enabledRatio,
"scheduleRatio": r.scheduleRatio,
"totalPeriods": r.totalPeriods,
"periodsToSchedule": r.periodsToSchedule,
}),
)
|> map(fn: (r) => ({r with _measurement: "currentPeriodLoadState"}))
|> drop(
columns: [
"idleRatio",
"enabledRatio",
"scheduleRatio",
"totalPeriods",
"periodsToSchedule",
],
)
|> to(bucket: "meterData")
// This script runs every minute and tracks heater state, duty cycle etc. both for usage by
// the controller and for monitoring
import "experimental/bitwise"
import "experimental"
option task = {name: "heaterState", every: 1m}
// NEM metering period. Will be reduced to 5 minutes.
timeIntervalLength = 30m
// window size
start = -18h
stop = 18h
// Number of half hour periods that in the forecasting period.
// (i.e 'stop' * 2) as a float
totalPeriods = 36.0
// Instantaneous heater activity. Threshold the mean over the previous minute of data.
// Storage heater draws current on phase C.
// This can be replaced by any other source of data on whether the heater is heating or not.
heaterActive =
from(bucket: "meterData")
|> range(start: -1m, stop: now())
|> filter(
fn: (r) =>
r["_measurement"] == "modbus" and r["host"] == "nuc1" and r["name"] == "PM5350" and r["slave_id"] == "1"
and
r["type"] == "holding_register",
)
|> filter(fn: (r) => r["_field"] == "current_C")
|> mean()
|> map(fn: (r) => ({_time: now(), heaterActive: if r._value > 14.5 then true else false}))
// The heater is enabled/disable in this case by
// toggling a contactor via modbus. This can be controlled manually as well as by price signals.
// Encoded as a bitfield so care is needed in thresholding
heaterEnabled =
from(bucket: "meterData")
|> range(start: -1m, stop: now())
|> filter(fn: (r) => r["_field"] == "digital_outputs")
|> max()
|> drop(
columns: [
"_start",
"_stop",
"host",
"name",
"slave_id",
"type",
],
)
|> map(fn: (r) => ({_time: now(), _value: bitwise.uand(a: uint(v: r._value), b: uint(v: 1))}))
|> toBool()
|> map(fn: (r) => ({_time: now(), heaterEnabled: r._value}))
// Drop other fields
// |> map(fn: (r) => ({r with heaterIdle: if r.heaterIdle < 0 then 0 else r.heaterIdle}))
// Idle & Active Ratios
// Idle Ratio - The proportion of time that the heater is enabled but not heating. Given that the tank can store heat for a
// period of time, if over that time window the ratio is:
// - >0 : The tank has reached its setpoint during that time window.
// It is -likely- that the heater has been enabled for more time than was necessary.
// - 0 : The tank never reached its temperature setpoint.
// The idle ratio is averaged over the historical window period. This was determined through trial and error and would likely need
// to be adjusted for different sized installations or usage patterns.
// Active Ratio - Over the same period, the proportion of time for which the heater was active.
// Both of these are used by the controller to determine when to recommend enabling the heater.
// The mean is valid only if sampling is regular over the period. Influx does not interpolate/resample
// to remove bias due to irregular sampling intervals
windowMean =
from(bucket: "meterData")
|> range(start: start, stop: now())
|> filter(fn: (r) => r["_field"] == "heaterIdle" or r["_field"] == "heaterEnabled")
|> mean()
idleRatio =
windowMean
|> tableFind(fn: (key) => key._field == "heaterIdle")
|> getColumn(column: "_value")
enabledRatio =
windowMean
|> tableFind(fn: (key) => key._field == "heaterEnabled")
|> getColumn(column: "_value")
// calculate instantaneous idle state for future runs.
// cast bools to int for ease of plotting
heaterState =
join(tables: {heaterEnabled: heaterEnabled, heaterActive: heaterActive}, on: ["_time"])
|> map(
fn: (r) =>
({r with
heaterIdle: int(v: if r.heaterEnabled and not r.heaterActive then true else false),
_measurement: "heaterState",
idleRatio: idleRatio[0],
enabledRatio: enabledRatio[0],
heaterEnabled: int(v: r.heaterEnabled),
heaterActive: int(v: r.heaterActive),
}),
)
|> group(columns: ["_measurement"], mode: "by")
heaterState
|> experimental.to(bucket: "meterData")
# DB query and contactor control is handled in home assistant. For a local influx instance, MQTT push would be preferable to piolling
sensor:
- platform: influxdb
scan_interval: 10
api_version: 2
host: <host>
organization: <org>
token: <token>
bucket: meterData
queries_flux:
- name: scheduled_heating_state
bucket: meterData
range_start: "-1h"
query: >
filter(fn: (r) => r["_measurement"] == "currentPeriodLoadState" and r["_field"] == "waterHeater")
|> last()
switch:
- platform: template
switches:
storage_heater:
friendly_name: "Water Heater"
value_template: "{{ is_state('sensor.digital_out_1', 'on') }}"
turn_on:
service: modbus.write_register
data:
address: 5249
unit: 1
value:
- 6003
- 0
- 1
hub: modbus-proxy
turn_off:
service: modbus.write_register
data:
address: 5249
unit: 1
value:
- 6002
- 0
- 1
hub: modbus-proxy
## Telegraf conf used to pull price and power usage data
[[inputs.http]]
interval = "1m"
## One or more URLs from which to read formatted metrics
urls = ["https://api.amber.com.au/v1/sites/<your site ID>/prices/current?next=256&previous=1&resolution=30"]
## HTTP method
method = "GET"
#Amber SDK auth
headers = {"Authorization" = "Bearer psk_<your API key>"}
#Amber pricing data JSON
data_format = "json"
name_override = "amber_pricing"
tagexclude = ["url", "host"]
tag_keys = ["type", "channelType", "spikeStatus", "duration"]
json_time_key = "startTime"
json_string_fields = ["perKwh", "renewables", "spotPerKwh"]
json_time_format = "2006-01-02T15:04:05Z07:00"
[[inputs.modbus]]
name = "PM5350"
interval = "1s"
timeout = "1s"
slave_id = 1
controller = "tcp://modbus-proxy:502"
holding_registers = [
{ name = "digital_inputs", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [8904]},
{ name = "digital_outputs", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [9666]},
{ name = "alarms_1s_1", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [11058]},
{ name = "alarms_1s_2", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [11059]},
{ name = "alarms_1s_3", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [11060]},
{ name = "alarms_unary", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [11068]},
{ name = "alarms_digital", byte_order = "AB", data_type = "UINT16", scale=1.0, address = [11069]},
{ name = "current_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [2999,3000]},
{ name = "current_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3001,3002]},
{ name = "current_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3003,3004]},
{ name = "current_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3005,3006]},
{ name = "voltage_A_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3019,3020]},
{ name = "voltage_B_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3021,3022]},
{ name = "voltage_C_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3023,3024]},
{ name = "voltage_A_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3027,3028]},
{ name = "voltage_B_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3029,3030]},
{ name = "voltage_C_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3031,3032]},
{ name = "active_power_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3053,3054]},
{ name = "active_power_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3055,3056]},
{ name = "active_power_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3057,3058]},
{ name = "reactive_power_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3061,3062]},
{ name = "reactive_power_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3063,3064]},
{ name = "reactive_power_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3065,3066]},
{ name = "apparent_power_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3069,3070]},
{ name = "apparent_power_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3071,3072]},
{ name = "apparent_power_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3073,3074]},
{ name = "power_factor_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3077,3078]},
{ name = "power_factor_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3079,3080]},
{ name = "power_factor_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3081,3082]},
{ name = "displacement_power_factor_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3085,3086]},
{ name = "displacement_power_factor_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3087,3088]},
{ name = "displacement_power_factor_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3089,3090]},
{ name = "frequency", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [3109,3110]},
{ name = "phase_rotation", byte_order = "AB", data_type = "INT16", scale=1.0, address = [3133]},
{ name = "active_energy_delivered", byte_order = "ABCDEFGH", data_type = "INT64", scale=1.0, address = [3203,3204,3205,3206]},
{ name = "active_energy_received", byte_order = "ABCDEFGH", data_type = "INT64", scale=1.0, address = [3207,3208,3209,3210]},
{ name = "THD_voltage_A_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21329,21330]},
{ name = "THD_voltage_B_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21331,21332]},
{ name = "THD_voltage_C_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21333,21334]},
{ name = "THD_voltage_A_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21321,21322]},
{ name = "THD_voltage_B_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21323,21324]},
{ name = "THD_voltage_C_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21325,21326]},
{ name = "THD_current_A", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21299,21300]},
{ name = "THD_current_B", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21301,21302]},
{ name = "THD_current_C", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21303,21304]},
{ name = "THD_current_N", byte_order = "ABCD", data_type = "FLOAT32-IEEE", scale=1.0, address = [21305,21306]},
]
[[outputs.influxdb_v2]]
urls = ["https://<your influx host>:8086"]
token = "<your token>"
organization = "<your org>"
bucket = "meterData"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment