Created
November 27, 2016 23:18
-
-
Save NetBUG/ee34e4b27765fc509cd3605c2db3624e to your computer and use it in GitHub Desktop.
Minimaliztic SARIMAX usage demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from datetime import datetime | |
import statsmodels.api as sm | |
import statsmodels.tsa.stattools as st | |
import pandas as pd | |
import json | |
import pytz | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import math | |
import csv | |
from dateutil.parser import parse | |
from scipy.optimize import brute | |
from sklearn.metrics import r2_score | |
#import TsUtils | |
DEBUG = True | |
PLOT = True | |
LOG_DIR = "../logs" | |
KEY_SEPARATOR = "~" | |
def computeR2(src, dest): | |
return r2_score(src, dest) * -1 | |
class TsSarimaModel: | |
def __init__(self, name, seasonality, orderParams = None): | |
self.name = name | |
self.seasonality = seasonality | |
self.orderParams = orderParams | |
self.endoModel = None | |
self.endoGof = 0 | |
self.endoMaxAbsResidual = 0 | |
self.endoEstParams = None | |
self.endoEstState = None | |
self.endoStateCov = None | |
self.p_endo = self.d_endo = self.q_endo = self.s_endo = None | |
def _objfunc_SARIMA(self, seasonal_order, endog, dummy): | |
# TODO: fix seasonal_order issue | |
print(seasonal_order) | |
try: | |
fit = sm.tsa.SARIMAX(endog=endog, exog=None, order=seasonal_order[:-1], #seasonal_order=seasonal_order, | |
enforce_stationarity=False, enforce_invertibility=False).fit(disp=-1) | |
#-- retVal = fit.aic | |
retVal = computeR2(fit.fittedvalues,endog) | |
except: | |
print("Crashed on " + repr(seasonal_order) + "!") | |
return np.inf | |
return retVal | |
def _findBestFit_SARIMAX(self, df_ts, dummy): | |
retVal = None | |
if(self.orderParams is None): | |
# Order = (p,d,q,s) | |
grid = (slice(1, 3, 1), slice(1, 2, 1), slice(1, 2, 1), slice(self.seasonality, self.seasonality + 1, 1)) | |
result = brute(self._objfunc_SARIMA, grid, args=(df_ts,dummy), finish=None, disp=False) | |
retVal= (int(result[0]), int(result[1]), int(result[2]), int(result[3])) | |
else: | |
retVal= (1,1,1,self.seasonality) | |
return retVal | |
def predict(self, nSteps, refit=False, resmooth=False, ts_series = None , significanceLevel=0.05): | |
endo_predict, refit_endo_predict = self.predictWithForecastObjs( | |
ts_series=ts_series, nSteps=nSteps, refit=refit, resmooth=resmooth) | |
endo_forecast_means = None | |
endo_forecast_confIntLowBand = None | |
endo_forecast_confIntHighBand = None | |
if(endo_predict is not None): | |
endo_forecast_means = endo_predict.predicted_mean | |
endo_forecast_confIntLowBand = endo_predict.conf_int(alpha=significanceLevel).iloc[:,0] | |
endo_forecast_confIntHighBand = endo_predict.conf_int(alpha=significanceLevel).iloc[:,1] | |
return (endo_forecast_means, endo_forecast_confIntLowBand, endo_forecast_confIntHighBand) | |
def predictWithForecastObjs(self, nSteps, refit=False, resmooth=False, ts_series = None): | |
endoForecasts = None | |
endoRefitForecasts = None | |
# If previous models are present | |
if(self.endoModel is not None): | |
endoForecasts = self.endoModel.get_forecast(steps=nSteps, dynamic=True) | |
# If we have to smooth or refit | |
if(refit or resmooth): | |
self.fitEndoModel(ts_series, refit=refit, smooth=resmooth) | |
endoRefitForecasts = self.endoModel.get_forecast(steps=nSteps, dynamic=True) | |
return (endoForecasts, endoRefitForecasts) | |
def fitEndoModel(self, ts_endo, refit = False, smooth=False): | |
try: | |
if(refit or smooth): | |
#-- print("FitEndo: Refitting: endo: %s" % (self.name)) | |
## Refit or smooth the model | |
if(self.endoEstState is None or self.endoStateCov is None): | |
raise ValueError( | |
"ExogTsAnalyze: fitExoModel: Previous state should be preserved when calling fit method with refit=True or smooth=True") | |
# We have some state from prev model | |
rtEndoModel = sm.tsa.SARIMAX(ts_endo, order=(self.p_endo, self.d_endo, self.q_endo), exog=None, | |
seasonal_order=(self.p_endo, self.d_endo, self.q_endo, self.s_endo), | |
enforce_stationarity=False, enforce_invertibility=False) | |
rtEndoModel.initialize_known(self.endoEstState, self.endoStateCov) | |
if(refit): | |
self.endoModel = rtEndoModel.fit(start_params= self.endoEstParams, disp=0) | |
elif(smooth): | |
self.endoModel = rtEndoModel.smooth(self.endoEstParams) | |
#-- print("End: Refitting: endo: %s - %s" % (self.name, type(self.endoModel))) | |
else: | |
#-- print("FitEndo: Fitting: endo: %s" % (self.name)) | |
# Create a new model | |
self.endoModel = sm.tsa.SARIMAX(ts_endo, order=(self.p_endo, self.d_endo, self.q_endo), exog=None, | |
seasonal_order=(self.p_endo, self.d_endo, self.q_endo, self.s_endo), | |
enforce_stationarity=False, enforce_invertibility=False).fit(disp=False) | |
self.endoGof = computeR2(self.endoModel.fittedvalues, ts_endo) | |
#self.endoMaxAbsResidual = TsUtils.computeMaxAbsResidual(self.endoModel.fittedvalues, ts_endo) | |
# Save the parameters and the predicted state values | |
self.endoEstParams = self.endoModel.params | |
self.endoEstState = self.endoModel.predicted_state[:, -1] | |
self.endoStateCov = self.endoModel.predicted_state_cov[:, :, -1] | |
except np.linalg.linalg.LinAlgError as err: | |
print("ERROR: EndoModel: Probably model did not converge: %s" % str(err)) | |
self.endoModel = None | |
self.endoGof = 0 | |
self.endoMaxAbsResidual = 0 | |
self.endoEstParams = None | |
self.endoEstState = None | |
self.endoStateCov = None | |
def computeModel(self, ts_series, trainOnPart=True): | |
breakIt = int(len(ts_series) * 0.8) | |
if(trainOnPart): | |
df_ts_train = ts_series[0:breakIt] | |
else: | |
df_ts_train=ts_series | |
df_ts_test = ts_series[breakIt:] | |
print("Computing optimal parameters for SARIMA model...") | |
self.p_endo, self.d_endo, self.q_endo, self.s_endo = self._findBestFit_SARIMAX(df_ts_train, None) | |
print("Optimal parameters for SARIMA model: p=%d, d=%d, q=%d, s=%d" % ( | |
self.p_endo, self.d_endo, self.q_endo, self.s_endo)) | |
self.fitEndoModel(df_ts_train) | |
if (DEBUG): | |
## Debug | |
print("Endogenous Model") | |
print(self.endoModel.summary()) | |
print("Endo R^2 = %f" % (self.endoGof)) | |
if PLOT: | |
self.plot(df_ts_train, df_ts_test, forecast = False) | |
return self.endoGof | |
def plot(self, df_ts_train, df_ts_test, forecast = False): | |
if (PLOT): | |
pred_res_endo = self.endoModel.get_prediction(full_results=True) | |
pred_means_endo = pred_res_endo.predicted_mean | |
pred_cis_endo = pred_res_endo.conf_int(alpha=0.05) | |
#-- fig = plt.figure(figsize=(12, 8)) | |
#-- ax1 = fig.add_subplot(211) | |
#-- fig = sm.graphics.tsa.plot_acf(ts_series.squeeze(), lags=40, ax=ax1) | |
#-- ax2 = fig.add_subplot(212) | |
#-- fig = sm.graphics.tsa.plot_pacf(df_ts_train, lags=40, ax=ax2) | |
#-- fig.suptitle('ACFs', fontsize=20) | |
#-- plt.draw() | |
# Plot from few days back | |
toInd = len(pred_means_endo) | |
fromInd = toInd - (self.seasonality * 7) | |
if (fromInd < 0): | |
fromInd = 0 | |
fig = plt.figure(figsize=(12, 8)) | |
ax = fig.add_subplot(1, 1, 1) | |
ax.plot(df_ts_train[fromInd:toInd], ':', color="blue", lw=2, label='Data') | |
ax.plot(df_ts_train[fromInd:toInd], 'o', alpha=0.5) | |
ax.plot(pred_means_endo[fromInd:toInd], lw=1, color="yellow", alpha=0.75, | |
label='SARIMA Predict (endo)') | |
ax.plot(df_ts_test, ':', lw=3, color="black", label='Data (test)') | |
if(forecast): | |
## Forecast for test_data | |
nforecasts = len(df_ts_test) | |
forecast_res_endo_predict, forecast_refit_res_endo_predict = self.predictWithForecastObjs(nforecasts, refit=False, resmooth=False, ts_series=None) | |
forecast_means_endo_predict = forecast_res_endo_predict.predicted_mean | |
forecast_cis_endo_predict = forecast_res_endo_predict.conf_int(alpha=0.05) | |
# Now predict each season | |
startIndex = 0 | |
stages_forecast_means_endo_predict = None | |
stages_forecast_cis_endo_predict = None | |
while(startIndex < nforecasts): | |
endIndex = startIndex + self.seasonality | |
if(endIndex > len(df_ts_test)): | |
endIndex = len(df_ts_test) | |
ts = df_ts_test[startIndex:endIndex] | |
nSteps = endIndex - startIndex | |
per_forecast_res_endo_predict, per_forecast_refit_res_endo_predict = self.predictWithForecastObjs(nSteps, refit=True, resmooth=True,ts_series=ts, ) | |
per_forecast_means_endo_predict = per_forecast_res_endo_predict.predicted_mean | |
per_forecast_cis_endo_predict = per_forecast_res_endo_predict.conf_int(alpha=0.05) | |
if(startIndex==0): | |
stages_forecast_means_endo_predict = per_forecast_means_endo_predict | |
stages_forecast_cis_endo_predict = per_forecast_cis_endo_predict | |
else: | |
stages_forecast_means_endo_predict = stages_forecast_means_endo_predict.append(per_forecast_means_endo_predict) | |
stages_forecast_cis_endo_predict = stages_forecast_cis_endo_predict.append(per_forecast_cis_endo_predict) | |
startIndex = endIndex | |
# One shot predict | |
ax.plot(forecast_means_endo_predict, lw=3, ls=":", color="red", alpha=0.5, | |
label='SARIMA Forecast (endo one-shot)') | |
ax.fill_between(forecast_means_endo_predict.index, forecast_cis_endo_predict.iloc[:, 0], | |
forecast_cis_endo_predict.iloc[:, 1], alpha=0.1, color="none", hatch="\\\\", | |
edgecolor="red", lw=1, label="95% CI Forecast (endo one-shot)") | |
# Seasonal refit and forecast | |
ax.plot(stages_forecast_means_endo_predict, lw=1, color="red", alpha=0.5, | |
label='SARIMA Forecast (endo smooth)') | |
ax.fill_between(stages_forecast_means_endo_predict.index, stages_forecast_cis_endo_predict.iloc[:, 0], | |
stages_forecast_cis_endo_predict.iloc[:, 1], alpha=0.1, color="red", lw=1, | |
label="95% CI Forecast (endo smooth)") | |
# Shrink current axis by 20% | |
box = ax.get_position() | |
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height]) | |
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=8) | |
#-- ax.legend(loc='upper right', fontsize=8) | |
fig.suptitle('Fit: %s: Gof = %0.3f' % ( | |
self.name, self.endoGof), fontsize=12) | |
plt.draw() | |
plt.savefig("%s/%s/%s.png" % (LOG_DIR, "", self.name.replace(KEY_SEPARATOR, "-"))) | |
def getSummary(self): | |
return ("%s, %f, %d, %d, %d, %d" % ( | |
self.name, self.endoGof, self.p_endo, self.d_endo, self.q_endo, self.s_endo)) | |
csvpath = "../data/20160828_SBI_4_Mentor_Race_1_100HZ_01_RawData_31-10-2016_20-06.csv" | |
def parseEpochMs(theVar, timezoneStr=None, default=None): | |
try: | |
if(timezoneStr is None): | |
retVal = pd.datetime.fromtimestamp(theVar * 100) | |
else: | |
retVal = pd.datetime.fromtimestamp(theVar * 1000.0, tz=pytz.timezone(timezoneStr)) | |
except ValueError: | |
retVal = default | |
return retVal | |
def createTs(timeInMsFromEpochList, timezoneStr = None): | |
retVal = [parseEpochMs(timeInMsFromEpoch, timezoneStr=timezoneStr, default=None) for timeInMsFromEpoch in timeInMsFromEpochList] | |
return(retVal) | |
def load_csv(cn="power output [kW]"): | |
df = pd.read_csv(csvpath, skiprows=5, sep=',', na_values='N/a', error_bad_lines=True) | |
print("Loaded " + str(len(df)) + " lines!") | |
print("Loaded " + str(len(df.values[0])) + " columns!") | |
plt.rcParams["figure.figsize"] = [a * 1.8 for a in plt.rcParams["figure.figsize"]] | |
timeValues = createTs(df['time [s]']) | |
df.reset_index() | |
df.set_index(pd.DatetimeIndex(timeValues), inplace=True) | |
#for cn in ['power output [kW]']:#, 'Throttle position [-]', 'Water temp [C]']: #df.columns.values:# | |
ts = pd.Series(df[cn].astype(np.float64)[230000:270000]).asfreq('1s') | |
return ts | |
if __name__ == "__main__": | |
cn = "power output [kW]" | |
df = load_csv(cn) | |
ds = df.values.shape[0] | |
mdl = TsSarimaModel(cn, 12) | |
print(type(df)) | |
mdl.computeModel(df) | |
#mdl.fitEndoModel(df) | |
breakIt = int(len(df) * 0.8) | |
mdl.plot(df[0:breakIt], df[breakIt:], True) | |
plt.show() | |
#mdl.plot(df[:ds * 0.8], df[ds * 0.8:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment