This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
from sklearn.linear_model import LinearRegression | |
ALL_INDICES = [[ 1, 0], | |
[-1, 0], | |
[ 0, 1], | |
[ 0, -1], | |
[ 1, 1], | |
[ 1, -1], | |
[-1, 1], | |
[-1, -1]] | |
def get_neigbor_indices(i, j, world_shape): | |
# funkce která spočítá indexy všech sousedů daný buňky | |
# např. buňka 0, 0 nemá souseda i - 1, j - 1 protože jsou out of bounds | |
neighbor_indices = [] | |
for idx in ALL_INDICES: | |
neighbor_i = i + idx[0] | |
neighbor_j = j + idx[1] | |
# sousedi nesměj ležet mimo hranice tenzoru | |
if 0 <= neighbor_i < world_shape[0] and 0 <= neighbor_j < world_shape[1]: | |
neighbor_indices.append([neighbor_i, neighbor_j]) | |
return neighbor_indices | |
def get_features_target(data): | |
# opět předpokládáme, že data jsou 24 x M x N numpy.array | |
# do listu features budeme ukládát špočítaný ... features :) | |
features = [] | |
# stějne tak target | |
target = [] | |
# pro každou hodinu projdeme všechny buňky | |
for h in range(data.shape[0]): | |
for i in range(data.shape[1]): | |
for j in range(data.shape[2]): | |
# spočítáme si indexy a vezmeme data ze sousedních buňek | |
neighbor_indices = get_neigbor_indices(i, j, data.shape[1:]) | |
neighbor_data = np.take(data[h, :, :], neighbor_indices) | |
features.append({'current_value': data[h, i, j], # současná hodnota buňky | |
'mean_diff': np.mean(data[h, i, j] - neighbor_data), # průměrnej rozdíl buňky a sousedů | |
'mean': np.mean(neighbor_data), # průměr sousedů | |
'std': np.std(neighbor_data), # std sousedů | |
'h': h}) | |
target.append(data[(h + 1) % 24, i, j]]) # do targetu přidáme stav z příští hodiny | |
# features transforumejeme do pandas dataframu, target do numpy array | |
return pd.DataFrame(features), np.array(target) | |
# tady se vezmou features a target a natrénuje se model | |
# tohle jen nejzákladnější verze, měl by se tam ještě přidat scaling a PCA (principal component analysis) | |
# a možná řešit hodinu jako kategorickou proměnou | |
# před trénováním by se měla vyhodit začáteční hodina + validační set | |
# model by se měl validovat např. pomocí MSE (mean squared error) | |
class Model: | |
def __init__(self): | |
self._m = LinearRegression() | |
def fit(features, target): | |
self._m.fit(features, target): | |
def predict(features): | |
return self._m.predict(features) | |
# Před použitím třídy Simulation je potřeba vyvtořit instanci třídy Model a natrénovat ji | |
# na features a target z funkce get_features_target, | |
# natrénovanou instanci potom předat instanci týhle třídy přes argument model | |
class Simulation: | |
def __init__(self, initial_state, model, n_steps=23): | |
# data modelu by měly být uložený v 3D tenzoru (ideální je použít numpy.array) | |
# pokud je "svět" velkej např. 20x20 buněk a chceme modelovat 24 kroků | |
# tak tenzor bude mít tvar 24x20x20 - je to vlastně 24 matic naskládanejch za sebou | |
# hodnota jedný buňky je počet alertů (časem se to může nějak normalizovat) | |
self._data = np.empty(shape=(n_steps + 1,) + initial_state.shape) | |
# jako první matici se uloží výchozí stav, ze kterýho se modelují další kroky | |
# ostatní matice budou zatím prázdný | |
self._data[0, :, :] = initial_state | |
# tady se uloží natrénovanej model (předchozí třída model) | |
self._model = model | |
def _update_cell(self, step, i, j): | |
# tohle je funkce která bude obsahovat pravidla | |
# tady musí zopakovat postup z get_features_target | |
# pomocí funkce get_neigbor_indices a np.take se spočítaj hodnoty sousedů | |
# z nich se vytvoří našich 5 features a ze kterých model vypočítá hodnotu buňky v dalším kroku | |
# pozor - když budeme simulovat víc než 23 kroků je potřeba přes modulo přepočítat krok na hodinu v rozsahu 0-23 | |
self._data[step + 1, i, j] = self.model.predict(features) | |
def _move_one_step(self): | |
# wrapper kterej spočítá novou hodnotu pro každou buňku i, j | |
for i in range(self._data[0].shape[0]): | |
for j in range(self._data[0].shape[1]): | |
self._update(self._current_step, i, j) | |
def run(self): | |
# hlavní metoda, která proběhne celou simulaci | |
for i in range(n_steps): | |
self._current_step = i | |
self._move_one_step() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment