Skip to content

Instantly share code, notes, and snippets.

@jancervenka
Last active Nov 4, 2018
Embed
What would you like to do?
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
ALL_INDICES = [[ 1, 0],
[-1, 0],
[ 0, 1],
[ 0, -1],
[ 1, 1],
[ 1, -1],
[-1, 1],
[-1, -1]]
def get_neigbor_indices(i, j, world_shape):
# funkce která spočítá indexy všech sousedů daný buňky
# např. buňka 0, 0 nemá souseda i - 1, j - 1 protože jsou out of bounds
neighbor_indices = []
for idx in ALL_INDICES:
neighbor_i = i + idx[0]
neighbor_j = j + idx[1]
# sousedi nesměj ležet mimo hranice tenzoru
if 0 <= neighbor_i < world_shape[0] and 0 <= neighbor_j < world_shape[1]:
neighbor_indices.append([neighbor_i, neighbor_j])
return neighbor_indices
def get_features_target(data):
# opět předpokládáme, že data jsou 24 x M x N numpy.array
# do listu features budeme ukládát špočítaný ... features :)
features = []
# stějne tak target
target = []
# pro každou hodinu projdeme všechny buňky
for h in range(data.shape[0]):
for i in range(data.shape[1]):
for j in range(data.shape[2]):
# spočítáme si indexy a vezmeme data ze sousedních buňek
neighbor_indices = get_neigbor_indices(i, j, data.shape[1:])
neighbor_data = np.take(data[h, :, :], neighbor_indices)
features.append({'current_value': data[h, i, j], # současná hodnota buňky
'mean_diff': np.mean(data[h, i, j] - neighbor_data), # průměrnej rozdíl buňky a sousedů
'mean': np.mean(neighbor_data), # průměr sousedů
'std': np.std(neighbor_data), # std sousedů
'h': h})
target.append(data[(h + 1) % 24, i, j]]) # do targetu přidáme stav z příští hodiny
# features transforumejeme do pandas dataframu, target do numpy array
return pd.DataFrame(features), np.array(target)
# tady se vezmou features a target a natrénuje se model
# tohle jen nejzákladnější verze, měl by se tam ještě přidat scaling a PCA (principal component analysis)
# a možná řešit hodinu jako kategorickou proměnou
# před trénováním by se měla vyhodit začáteční hodina + validační set
# model by se měl validovat např. pomocí MSE (mean squared error)
class Model:
def __init__(self):
self._m = LinearRegression()
def fit(features, target):
self._m.fit(features, target):
def predict(features):
return self._m.predict(features)
# Před použitím třídy Simulation je potřeba vyvtořit instanci třídy Model a natrénovat ji
# na features a target z funkce get_features_target,
# natrénovanou instanci potom předat instanci týhle třídy přes argument model
class Simulation:
def __init__(self, initial_state, model, n_steps=23):
# data modelu by měly být uložený v 3D tenzoru (ideální je použít numpy.array)
# pokud je "svět" velkej např. 20x20 buněk a chceme modelovat 24 kroků
# tak tenzor bude mít tvar 24x20x20 - je to vlastně 24 matic naskládanejch za sebou
# hodnota jedný buňky je počet alertů (časem se to může nějak normalizovat)
self._data = np.empty(shape=(n_steps + 1,) + initial_state.shape)
# jako první matici se uloží výchozí stav, ze kterýho se modelují další kroky
# ostatní matice budou zatím prázdný
self._data[0, :, :] = initial_state
# tady se uloží natrénovanej model (předchozí třída model)
self._model = model
def _update_cell(self, step, i, j):
# tohle je funkce která bude obsahovat pravidla
# tady musí zopakovat postup z get_features_target
# pomocí funkce get_neigbor_indices a np.take se spočítaj hodnoty sousedů
# z nich se vytvoří našich 5 features a ze kterých model vypočítá hodnotu buňky v dalším kroku
# pozor - když budeme simulovat víc než 23 kroků je potřeba přes modulo přepočítat krok na hodinu v rozsahu 0-23
self._data[step + 1, i, j] = self.model.predict(features)
def _move_one_step(self):
# wrapper kterej spočítá novou hodnotu pro každou buňku i, j
for i in range(self._data[0].shape[0]):
for j in range(self._data[0].shape[1]):
self._update(self._current_step, i, j)
def run(self):
# hlavní metoda, která proběhne celou simulaci
for i in range(n_steps):
self._current_step = i
self._move_one_step()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment