Skip to content

Instantly share code, notes, and snippets.

@lrodorigo
Last active September 11, 2021 17:18
Show Gist options
  • Save lrodorigo/875667fade5410c3869203896e8a7214 to your computer and use it in GitHub Desktop.
Save lrodorigo/875667fade5410c3869203896e8a7214 to your computer and use it in GitHub Desktop.
import pickle
from abc import abstractmethod
import numpy as np
from matplotlib import pyplot as mp
from scipy.linalg import block_diag
from scipy.optimize import linprog
PRICE_BUY = 0.20
PRICE_SELL = 0.07
PRICE_INCENTIVATION = 0.11
SEED = 213
GAMMA = PRICE_SELL / PRICE_INCENTIVATION
rng = np.random.default_rng(SEED)
def gaussian(x, mu, sig):
x = x % 24
# return np.array([1.0, 0.])
return np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.)))
# return 1/3.0*np.concatenate([3*np.ones(int(N/2)), np.zeros(int(N/2))])
with open("./consumption_data.pkl", "rb") as f:
c_i = pickle.load(f) / 1000.0 # get_fitted_consumption_profile(i)
# c_i = np.array([0, 1])
N = len(c_i) # int(SIMULATED_HOURS * 60 / SAMPLING_MIN) + 1
N = 288 * 3
c_i = c_i[:N]
# c_i = np.concatenate([np.zeros(int(N/2)), 3*np.ones(int(N/2))])
SIMULATED_HOURS = N * 5 / 60
SAMPLING_MIN = 5.0
TO_KWH = SAMPLING_MIN / 60.0
class Storage:
def __init__(self):
self._state = 3.
self.CAPACITY = 10
@property
def state(self):
return self._state
def charge(self, value: float):
if value > 0:
if self._state >= self.CAPACITY:
return
self._state += value ## ERROREEEEEEEEEEE !!! Lo faccio due volte
else:
if self._state <= 0:
return
self._state += value
class AbstractStorageController:
def __init__(self, storage: Storage):
self.battery = storage
@abstractmethod
def control(self, index : int, p_i: float, c_i: float, **kwargs) -> float:
return 0
class LinProgStorageController(AbstractStorageController):
def build_constraints(self):
block = np.array([[1, 0]])
# Vincolo 1
self.Aub = block_diag(*([block] * N)) # matrice diagonale a blocchi
self.bub = np.array(self.c)
# Vincolo 2
block = np.array([[1, -1]])
self.Aub = np.vstack([self.Aub, block_diag(*([block] * N))])
self.bub = np.concatenate([self.bub, self.p])
# Vincolo 3
block = np.array([[0, -1]])
self.Aub = np.vstack([self.Aub, block_diag(*([block] * N))])
self.bub = np.concatenate([self.bub, self.p])
# Vincolo 4
temp_matrix = np.zeros([N, 2 * N])
for j in range(1, N + 1):
temp_matrix[j - 1, :] = np.concatenate([np.array([0, -1] * j), np.zeros(2 * N - 2 * j)])
self.bub = np.concatenate([self.bub, np.array([self.battery.CAPACITY - self.battery.state])])
self.Aub = np.vstack([self.Aub, temp_matrix])
# Vincolo 5
temp_matrix = np.zeros([N, 2 * N])
for k in range(1, N + 1):
temp_matrix[k - 1, :] = np.concatenate([np.array([0, 1] * k), np.zeros(2 * N - 2 * k)])
self.bub = np.concatenate([self.bub, np.array([self.battery.state])])
self.Aub = np.vstack([self.Aub, temp_matrix])
def __init__(self, storage: Storage, p, c):
super().__init__(storage)
self.p = p * TO_KWH
self.c = c * TO_KWH
self.costs = np.tile(np.array([-1, -GAMMA]), N) # vettore 2*N
self.Aub = None
self.bub = None
self.predicted_b = None
self.build_constraints()
self.run()
def run(self):
res = linprog(self.costs, self.Aub, self.bub, bounds=[(None, None)] * N * 2)
if res.success:
out = res.x[1::2]
self.predicted_b = out
mp.figure()
mp.plot(self.predicted_b)
mp.plot(self.battery.state - np.cumsum(out))
mp.grid()
mp.figure()
mp.title("Profilo di consumo e produzione")
mp.plot(i, c_i)
mp.plot(i, p_i)
mp.legend(["Consumo", "Produzione"])
mp.show()
else:
print(res.message)
exit(1)
def control(self, index: int, p_i: float, c_i: float, **kwargs) -> float:
return self.predicted_b[index]
class SimpleStorageController(AbstractStorageController):
def control(self, p, c):
if p > c:
available = p - c
if self.battery.state >= self.battery.CAPACITY:
return 0.0
charged = min(self.battery.CAPACITY - self.battery.state, available * TO_KWH)
self.battery.charge(charged)
return -charged / TO_KWH
else:
remaining = c - p
if self.battery.state < 0:
return 0.0
discharged = min(remaining * TO_KWH, self.battery.state)
self.battery.charge(-discharged)
return discharged / TO_KWH
def get_fitted_consumption_profile(i):
points = [
(0, 0.3),
(3, 0.18),
(6, 0.18),
(8, 0.4),
(9, 0.35),
(11, 0.30),
(14, 0.36),
(17, 0.33),
(18, 0.42),
(21, 0.6),
(23, 0.42),
(24, 0.3),
]
x = np.array([p[0] for p in points])
y = np.array([p[1] for p in points])
poly = np.poly1d(np.polyfit(x, y, len(points) - 4))
return poly(i)
battery = Storage()
i = np.linspace(0, SIMULATED_HOURS, N)
p_i = gaussian(i, 13, 2)
b_i = np.zeros(N)
B_i = np.zeros(N)
p_i_no_battery = np.copy(p_i)
c_i_no_battery = np.copy(c_i)
battery_controller = LinProgStorageController(battery, p_i, c_i)
for j in range(N):
control = battery_controller.control(j, p_i, c_i)
battery.charge(-control)
b_i[j] = control
B_i[j] = battery.state
p_i[j] += control
outcome_i = PRICE_BUY * c_i
income_i = p_i * PRICE_SELL + np.minimum(p_i, c_i * TO_KWH) * PRICE_INCENTIVATION
income_no_battery_i = p_i_no_battery * TO_KWH * PRICE_SELL + np.minimum(p_i_no_battery,
c_i_no_battery) * PRICE_INCENTIVATION * TO_KWH
net_income = income_i - outcome_i
mp.figure()
mp.title("Profilo di consumo")
mp.plot(i, c_i)
mp.figure()
mp.title("Profilo di Produzione")
mp.plot(i, p_i_no_battery)
mp.figure()
mp.title("Errore di autoconsumo")
mp.plot(i, abs(p_i - c_i))
mp.figure()
mp.title("Introiti €")
mp.plot(i, np.cumsum(income_i))
mp.plot(i, np.cumsum(income_no_battery_i))
mp.legend(["con batteria", "senza batteria"])
mp.grid()
mp.figure()
mp.title("Batteria")
mp.plot(i, b_i)
mp.plot(i, B_i)
# mp.plot(i, delta_i)
mp.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment