Last active
September 11, 2021 17:18
-
-
Save lrodorigo/875667fade5410c3869203896e8a7214 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
from abc import abstractmethod | |
import numpy as np | |
from matplotlib import pyplot as mp | |
from scipy.linalg import block_diag | |
from scipy.optimize import linprog | |
PRICE_BUY = 0.20 | |
PRICE_SELL = 0.07 | |
PRICE_INCENTIVATION = 0.11 | |
SEED = 213 | |
GAMMA = PRICE_SELL / PRICE_INCENTIVATION | |
rng = np.random.default_rng(SEED) | |
def gaussian(x, mu, sig): | |
x = x % 24 | |
# return np.array([1.0, 0.]) | |
return np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.))) | |
# return 1/3.0*np.concatenate([3*np.ones(int(N/2)), np.zeros(int(N/2))]) | |
with open("./consumption_data.pkl", "rb") as f: | |
c_i = pickle.load(f) / 1000.0 # get_fitted_consumption_profile(i) | |
# c_i = np.array([0, 1]) | |
N = len(c_i) # int(SIMULATED_HOURS * 60 / SAMPLING_MIN) + 1 | |
N = 288 * 3 | |
c_i = c_i[:N] | |
# c_i = np.concatenate([np.zeros(int(N/2)), 3*np.ones(int(N/2))]) | |
SIMULATED_HOURS = N * 5 / 60 | |
SAMPLING_MIN = 5.0 | |
TO_KWH = SAMPLING_MIN / 60.0 | |
class Storage: | |
def __init__(self): | |
self._state = 3. | |
self.CAPACITY = 10 | |
@property | |
def state(self): | |
return self._state | |
def charge(self, value: float): | |
if value > 0: | |
if self._state >= self.CAPACITY: | |
return | |
self._state += value ## ERROREEEEEEEEEEE !!! Lo faccio due volte | |
else: | |
if self._state <= 0: | |
return | |
self._state += value | |
class AbstractStorageController: | |
def __init__(self, storage: Storage): | |
self.battery = storage | |
@abstractmethod | |
def control(self, index : int, p_i: float, c_i: float, **kwargs) -> float: | |
return 0 | |
class LinProgStorageController(AbstractStorageController): | |
def build_constraints(self): | |
block = np.array([[1, 0]]) | |
# Vincolo 1 | |
self.Aub = block_diag(*([block] * N)) # matrice diagonale a blocchi | |
self.bub = np.array(self.c) | |
# Vincolo 2 | |
block = np.array([[1, -1]]) | |
self.Aub = np.vstack([self.Aub, block_diag(*([block] * N))]) | |
self.bub = np.concatenate([self.bub, self.p]) | |
# Vincolo 3 | |
block = np.array([[0, -1]]) | |
self.Aub = np.vstack([self.Aub, block_diag(*([block] * N))]) | |
self.bub = np.concatenate([self.bub, self.p]) | |
# Vincolo 4 | |
temp_matrix = np.zeros([N, 2 * N]) | |
for j in range(1, N + 1): | |
temp_matrix[j - 1, :] = np.concatenate([np.array([0, -1] * j), np.zeros(2 * N - 2 * j)]) | |
self.bub = np.concatenate([self.bub, np.array([self.battery.CAPACITY - self.battery.state])]) | |
self.Aub = np.vstack([self.Aub, temp_matrix]) | |
# Vincolo 5 | |
temp_matrix = np.zeros([N, 2 * N]) | |
for k in range(1, N + 1): | |
temp_matrix[k - 1, :] = np.concatenate([np.array([0, 1] * k), np.zeros(2 * N - 2 * k)]) | |
self.bub = np.concatenate([self.bub, np.array([self.battery.state])]) | |
self.Aub = np.vstack([self.Aub, temp_matrix]) | |
def __init__(self, storage: Storage, p, c): | |
super().__init__(storage) | |
self.p = p * TO_KWH | |
self.c = c * TO_KWH | |
self.costs = np.tile(np.array([-1, -GAMMA]), N) # vettore 2*N | |
self.Aub = None | |
self.bub = None | |
self.predicted_b = None | |
self.build_constraints() | |
self.run() | |
def run(self): | |
res = linprog(self.costs, self.Aub, self.bub, bounds=[(None, None)] * N * 2) | |
if res.success: | |
out = res.x[1::2] | |
self.predicted_b = out | |
mp.figure() | |
mp.plot(self.predicted_b) | |
mp.plot(self.battery.state - np.cumsum(out)) | |
mp.grid() | |
mp.figure() | |
mp.title("Profilo di consumo e produzione") | |
mp.plot(i, c_i) | |
mp.plot(i, p_i) | |
mp.legend(["Consumo", "Produzione"]) | |
mp.show() | |
else: | |
print(res.message) | |
exit(1) | |
def control(self, index: int, p_i: float, c_i: float, **kwargs) -> float: | |
return self.predicted_b[index] | |
class SimpleStorageController(AbstractStorageController): | |
def control(self, p, c): | |
if p > c: | |
available = p - c | |
if self.battery.state >= self.battery.CAPACITY: | |
return 0.0 | |
charged = min(self.battery.CAPACITY - self.battery.state, available * TO_KWH) | |
self.battery.charge(charged) | |
return -charged / TO_KWH | |
else: | |
remaining = c - p | |
if self.battery.state < 0: | |
return 0.0 | |
discharged = min(remaining * TO_KWH, self.battery.state) | |
self.battery.charge(-discharged) | |
return discharged / TO_KWH | |
def get_fitted_consumption_profile(i): | |
points = [ | |
(0, 0.3), | |
(3, 0.18), | |
(6, 0.18), | |
(8, 0.4), | |
(9, 0.35), | |
(11, 0.30), | |
(14, 0.36), | |
(17, 0.33), | |
(18, 0.42), | |
(21, 0.6), | |
(23, 0.42), | |
(24, 0.3), | |
] | |
x = np.array([p[0] for p in points]) | |
y = np.array([p[1] for p in points]) | |
poly = np.poly1d(np.polyfit(x, y, len(points) - 4)) | |
return poly(i) | |
battery = Storage() | |
i = np.linspace(0, SIMULATED_HOURS, N) | |
p_i = gaussian(i, 13, 2) | |
b_i = np.zeros(N) | |
B_i = np.zeros(N) | |
p_i_no_battery = np.copy(p_i) | |
c_i_no_battery = np.copy(c_i) | |
battery_controller = LinProgStorageController(battery, p_i, c_i) | |
for j in range(N): | |
control = battery_controller.control(j, p_i, c_i) | |
battery.charge(-control) | |
b_i[j] = control | |
B_i[j] = battery.state | |
p_i[j] += control | |
outcome_i = PRICE_BUY * c_i | |
income_i = p_i * PRICE_SELL + np.minimum(p_i, c_i * TO_KWH) * PRICE_INCENTIVATION | |
income_no_battery_i = p_i_no_battery * TO_KWH * PRICE_SELL + np.minimum(p_i_no_battery, | |
c_i_no_battery) * PRICE_INCENTIVATION * TO_KWH | |
net_income = income_i - outcome_i | |
mp.figure() | |
mp.title("Profilo di consumo") | |
mp.plot(i, c_i) | |
mp.figure() | |
mp.title("Profilo di Produzione") | |
mp.plot(i, p_i_no_battery) | |
mp.figure() | |
mp.title("Errore di autoconsumo") | |
mp.plot(i, abs(p_i - c_i)) | |
mp.figure() | |
mp.title("Introiti €") | |
mp.plot(i, np.cumsum(income_i)) | |
mp.plot(i, np.cumsum(income_no_battery_i)) | |
mp.legend(["con batteria", "senza batteria"]) | |
mp.grid() | |
mp.figure() | |
mp.title("Batteria") | |
mp.plot(i, b_i) | |
mp.plot(i, B_i) | |
# mp.plot(i, delta_i) | |
mp.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment