Skip to content

Instantly share code, notes, and snippets.

@NetBUG
Created November 27, 2016 23:18
Show Gist options
  • Save NetBUG/ee34e4b27765fc509cd3605c2db3624e to your computer and use it in GitHub Desktop.
Save NetBUG/ee34e4b27765fc509cd3605c2db3624e to your computer and use it in GitHub Desktop.
Minimaliztic SARIMAX usage demo
import sys
from datetime import datetime
import statsmodels.api as sm
import statsmodels.tsa.stattools as st
import pandas as pd
import json
import pytz
import matplotlib.pyplot as plt
import numpy as np
import math
import csv
from dateutil.parser import parse
from scipy.optimize import brute
from sklearn.metrics import r2_score
#import TsUtils
DEBUG = True
PLOT = True
LOG_DIR = "../logs"
KEY_SEPARATOR = "~"
def computeR2(src, dest):
return r2_score(src, dest) * -1
class TsSarimaModel:
def __init__(self, name, seasonality, orderParams = None):
self.name = name
self.seasonality = seasonality
self.orderParams = orderParams
self.endoModel = None
self.endoGof = 0
self.endoMaxAbsResidual = 0
self.endoEstParams = None
self.endoEstState = None
self.endoStateCov = None
self.p_endo = self.d_endo = self.q_endo = self.s_endo = None
def _objfunc_SARIMA(self, seasonal_order, endog, dummy):
# TODO: fix seasonal_order issue
print(seasonal_order)
try:
fit = sm.tsa.SARIMAX(endog=endog, exog=None, order=seasonal_order[:-1], #seasonal_order=seasonal_order,
enforce_stationarity=False, enforce_invertibility=False).fit(disp=-1)
#-- retVal = fit.aic
retVal = computeR2(fit.fittedvalues,endog)
except:
print("Crashed on " + repr(seasonal_order) + "!")
return np.inf
return retVal
def _findBestFit_SARIMAX(self, df_ts, dummy):
retVal = None
if(self.orderParams is None):
# Order = (p,d,q,s)
grid = (slice(1, 3, 1), slice(1, 2, 1), slice(1, 2, 1), slice(self.seasonality, self.seasonality + 1, 1))
result = brute(self._objfunc_SARIMA, grid, args=(df_ts,dummy), finish=None, disp=False)
retVal= (int(result[0]), int(result[1]), int(result[2]), int(result[3]))
else:
retVal= (1,1,1,self.seasonality)
return retVal
def predict(self, nSteps, refit=False, resmooth=False, ts_series = None , significanceLevel=0.05):
endo_predict, refit_endo_predict = self.predictWithForecastObjs(
ts_series=ts_series, nSteps=nSteps, refit=refit, resmooth=resmooth)
endo_forecast_means = None
endo_forecast_confIntLowBand = None
endo_forecast_confIntHighBand = None
if(endo_predict is not None):
endo_forecast_means = endo_predict.predicted_mean
endo_forecast_confIntLowBand = endo_predict.conf_int(alpha=significanceLevel).iloc[:,0]
endo_forecast_confIntHighBand = endo_predict.conf_int(alpha=significanceLevel).iloc[:,1]
return (endo_forecast_means, endo_forecast_confIntLowBand, endo_forecast_confIntHighBand)
def predictWithForecastObjs(self, nSteps, refit=False, resmooth=False, ts_series = None):
endoForecasts = None
endoRefitForecasts = None
# If previous models are present
if(self.endoModel is not None):
endoForecasts = self.endoModel.get_forecast(steps=nSteps, dynamic=True)
# If we have to smooth or refit
if(refit or resmooth):
self.fitEndoModel(ts_series, refit=refit, smooth=resmooth)
endoRefitForecasts = self.endoModel.get_forecast(steps=nSteps, dynamic=True)
return (endoForecasts, endoRefitForecasts)
def fitEndoModel(self, ts_endo, refit = False, smooth=False):
try:
if(refit or smooth):
#-- print("FitEndo: Refitting: endo: %s" % (self.name))
## Refit or smooth the model
if(self.endoEstState is None or self.endoStateCov is None):
raise ValueError(
"ExogTsAnalyze: fitExoModel: Previous state should be preserved when calling fit method with refit=True or smooth=True")
# We have some state from prev model
rtEndoModel = sm.tsa.SARIMAX(ts_endo, order=(self.p_endo, self.d_endo, self.q_endo), exog=None,
seasonal_order=(self.p_endo, self.d_endo, self.q_endo, self.s_endo),
enforce_stationarity=False, enforce_invertibility=False)
rtEndoModel.initialize_known(self.endoEstState, self.endoStateCov)
if(refit):
self.endoModel = rtEndoModel.fit(start_params= self.endoEstParams, disp=0)
elif(smooth):
self.endoModel = rtEndoModel.smooth(self.endoEstParams)
#-- print("End: Refitting: endo: %s - %s" % (self.name, type(self.endoModel)))
else:
#-- print("FitEndo: Fitting: endo: %s" % (self.name))
# Create a new model
self.endoModel = sm.tsa.SARIMAX(ts_endo, order=(self.p_endo, self.d_endo, self.q_endo), exog=None,
seasonal_order=(self.p_endo, self.d_endo, self.q_endo, self.s_endo),
enforce_stationarity=False, enforce_invertibility=False).fit(disp=False)
self.endoGof = computeR2(self.endoModel.fittedvalues, ts_endo)
#self.endoMaxAbsResidual = TsUtils.computeMaxAbsResidual(self.endoModel.fittedvalues, ts_endo)
# Save the parameters and the predicted state values
self.endoEstParams = self.endoModel.params
self.endoEstState = self.endoModel.predicted_state[:, -1]
self.endoStateCov = self.endoModel.predicted_state_cov[:, :, -1]
except np.linalg.linalg.LinAlgError as err:
print("ERROR: EndoModel: Probably model did not converge: %s" % str(err))
self.endoModel = None
self.endoGof = 0
self.endoMaxAbsResidual = 0
self.endoEstParams = None
self.endoEstState = None
self.endoStateCov = None
def computeModel(self, ts_series, trainOnPart=True):
breakIt = int(len(ts_series) * 0.8)
if(trainOnPart):
df_ts_train = ts_series[0:breakIt]
else:
df_ts_train=ts_series
df_ts_test = ts_series[breakIt:]
print("Computing optimal parameters for SARIMA model...")
self.p_endo, self.d_endo, self.q_endo, self.s_endo = self._findBestFit_SARIMAX(df_ts_train, None)
print("Optimal parameters for SARIMA model: p=%d, d=%d, q=%d, s=%d" % (
self.p_endo, self.d_endo, self.q_endo, self.s_endo))
self.fitEndoModel(df_ts_train)
if (DEBUG):
## Debug
print("Endogenous Model")
print(self.endoModel.summary())
print("Endo R^2 = %f" % (self.endoGof))
if PLOT:
self.plot(df_ts_train, df_ts_test, forecast = False)
return self.endoGof
def plot(self, df_ts_train, df_ts_test, forecast = False):
if (PLOT):
pred_res_endo = self.endoModel.get_prediction(full_results=True)
pred_means_endo = pred_res_endo.predicted_mean
pred_cis_endo = pred_res_endo.conf_int(alpha=0.05)
#-- fig = plt.figure(figsize=(12, 8))
#-- ax1 = fig.add_subplot(211)
#-- fig = sm.graphics.tsa.plot_acf(ts_series.squeeze(), lags=40, ax=ax1)
#-- ax2 = fig.add_subplot(212)
#-- fig = sm.graphics.tsa.plot_pacf(df_ts_train, lags=40, ax=ax2)
#-- fig.suptitle('ACFs', fontsize=20)
#-- plt.draw()
# Plot from few days back
toInd = len(pred_means_endo)
fromInd = toInd - (self.seasonality * 7)
if (fromInd < 0):
fromInd = 0
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(1, 1, 1)
ax.plot(df_ts_train[fromInd:toInd], ':', color="blue", lw=2, label='Data')
ax.plot(df_ts_train[fromInd:toInd], 'o', alpha=0.5)
ax.plot(pred_means_endo[fromInd:toInd], lw=1, color="yellow", alpha=0.75,
label='SARIMA Predict (endo)')
ax.plot(df_ts_test, ':', lw=3, color="black", label='Data (test)')
if(forecast):
## Forecast for test_data
nforecasts = len(df_ts_test)
forecast_res_endo_predict, forecast_refit_res_endo_predict = self.predictWithForecastObjs(nforecasts, refit=False, resmooth=False, ts_series=None)
forecast_means_endo_predict = forecast_res_endo_predict.predicted_mean
forecast_cis_endo_predict = forecast_res_endo_predict.conf_int(alpha=0.05)
# Now predict each season
startIndex = 0
stages_forecast_means_endo_predict = None
stages_forecast_cis_endo_predict = None
while(startIndex < nforecasts):
endIndex = startIndex + self.seasonality
if(endIndex > len(df_ts_test)):
endIndex = len(df_ts_test)
ts = df_ts_test[startIndex:endIndex]
nSteps = endIndex - startIndex
per_forecast_res_endo_predict, per_forecast_refit_res_endo_predict = self.predictWithForecastObjs(nSteps, refit=True, resmooth=True,ts_series=ts, )
per_forecast_means_endo_predict = per_forecast_res_endo_predict.predicted_mean
per_forecast_cis_endo_predict = per_forecast_res_endo_predict.conf_int(alpha=0.05)
if(startIndex==0):
stages_forecast_means_endo_predict = per_forecast_means_endo_predict
stages_forecast_cis_endo_predict = per_forecast_cis_endo_predict
else:
stages_forecast_means_endo_predict = stages_forecast_means_endo_predict.append(per_forecast_means_endo_predict)
stages_forecast_cis_endo_predict = stages_forecast_cis_endo_predict.append(per_forecast_cis_endo_predict)
startIndex = endIndex
# One shot predict
ax.plot(forecast_means_endo_predict, lw=3, ls=":", color="red", alpha=0.5,
label='SARIMA Forecast (endo one-shot)')
ax.fill_between(forecast_means_endo_predict.index, forecast_cis_endo_predict.iloc[:, 0],
forecast_cis_endo_predict.iloc[:, 1], alpha=0.1, color="none", hatch="\\\\",
edgecolor="red", lw=1, label="95% CI Forecast (endo one-shot)")
# Seasonal refit and forecast
ax.plot(stages_forecast_means_endo_predict, lw=1, color="red", alpha=0.5,
label='SARIMA Forecast (endo smooth)')
ax.fill_between(stages_forecast_means_endo_predict.index, stages_forecast_cis_endo_predict.iloc[:, 0],
stages_forecast_cis_endo_predict.iloc[:, 1], alpha=0.1, color="red", lw=1,
label="95% CI Forecast (endo smooth)")
# Shrink current axis by 20%
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=8)
#-- ax.legend(loc='upper right', fontsize=8)
fig.suptitle('Fit: %s: Gof = %0.3f' % (
self.name, self.endoGof), fontsize=12)
plt.draw()
plt.savefig("%s/%s/%s.png" % (LOG_DIR, "", self.name.replace(KEY_SEPARATOR, "-")))
def getSummary(self):
return ("%s, %f, %d, %d, %d, %d" % (
self.name, self.endoGof, self.p_endo, self.d_endo, self.q_endo, self.s_endo))
csvpath = "../data/20160828_SBI_4_Mentor_Race_1_100HZ_01_RawData_31-10-2016_20-06.csv"
def parseEpochMs(theVar, timezoneStr=None, default=None):
try:
if(timezoneStr is None):
retVal = pd.datetime.fromtimestamp(theVar * 100)
else:
retVal = pd.datetime.fromtimestamp(theVar * 1000.0, tz=pytz.timezone(timezoneStr))
except ValueError:
retVal = default
return retVal
def createTs(timeInMsFromEpochList, timezoneStr = None):
retVal = [parseEpochMs(timeInMsFromEpoch, timezoneStr=timezoneStr, default=None) for timeInMsFromEpoch in timeInMsFromEpochList]
return(retVal)
def load_csv(cn="power output [kW]"):
df = pd.read_csv(csvpath, skiprows=5, sep=',', na_values='N/a', error_bad_lines=True)
print("Loaded " + str(len(df)) + " lines!")
print("Loaded " + str(len(df.values[0])) + " columns!")
plt.rcParams["figure.figsize"] = [a * 1.8 for a in plt.rcParams["figure.figsize"]]
timeValues = createTs(df['time [s]'])
df.reset_index()
df.set_index(pd.DatetimeIndex(timeValues), inplace=True)
#for cn in ['power output [kW]']:#, 'Throttle position [-]', 'Water temp [C]']: #df.columns.values:#
ts = pd.Series(df[cn].astype(np.float64)[230000:270000]).asfreq('1s')
return ts
if __name__ == "__main__":
cn = "power output [kW]"
df = load_csv(cn)
ds = df.values.shape[0]
mdl = TsSarimaModel(cn, 12)
print(type(df))
mdl.computeModel(df)
#mdl.fitEndoModel(df)
breakIt = int(len(df) * 0.8)
mdl.plot(df[0:breakIt], df[breakIt:], True)
plt.show()
#mdl.plot(df[:ds * 0.8], df[ds * 0.8:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment