# -*- coding: utf-8 -*-
"""
Created on Fri Oct 02 14:20:28 2018
@author: ADubey4
=========================================================================
Code to automate the data pre-processing and ML pipeline for the project
=========================================================================
Features provided:
1. Multiple data scaling options: MinMax, standard normalization and a custom-boundary scaler
   :ref:`Scaler` & :ref:`CustomBoundariesScalar`
2. Converting time-series data to i.i.d. features, to be used by linear models. For now,
   data-shifting and rolling-window features are provided for this.
   :ref:`MakeTimeFeatures`
3. Seven different regression models are provided to choose from when picking the best model
   :ref:`MLAlgoSet`
4. An ARMA model is fitted on the residuals, :ref:`common_ml_fns.auto_arima`
"""
from __future__ import division
import os
import sys
sys.path.append(r"D:\oneSTIM\GITFLOW\OST-Market-Intel\ml")
import re
import numpy as np
import pandas as pd
import pyramid as pm
from xgboost import XGBRegressor
import sklearn.linear_model as lm
from matplotlib import pyplot as plt
from sklearn.metrics import r2_score
from sklearn.feature_selection import *
from sklearn.model_selection import KFold
from sklearn.kernel_ridge import KernelRidge
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestRegressor
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin

# Silence all library warnings by monkey-patching warnings.warn
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

def print_results(*argv):
    """
    Function to pretty-print the output objects of the main pipeline, returned from :ref:`fit_best`
    Parameters
    ----------
    argv : list of dictionaries
        List of result objects to be printed
    """
    for arg in argv:
        print("--------------" + arg["reg_model_params"]["algo__model_name"] + "----------------")
        print(arg["reg_model_params"])
        for k, v in arg.items():
            if isinstance(v, (float, int)):
                print(k, v)

def plot_scaled(out_directory="", title="", colors="rgb", *argv):
    """
    Function to plot the scaled values of the provided DataFrames
    Parameters
    ----------
    colors : string/list, optional
        Colors for each of the datasets to be plotted
    argv : list
        List of DataFrames/Series to be plotted
    """
    scalar = MinMaxScaler()
    plt.figure()
    for i, arg in enumerate(argv):
        if not isinstance(arg, list):
            arg = [arg]
        color = colors[i % len(colors)]
        for j, df in enumerate(arg):
            df_color = [color[j] if isinstance(color, list) else color]
            label_name = None  # keep None for DataFrames; only Series carry a name
            if isinstance(df, pd.Series):
                label_name = df.name
                df = df.to_frame()
            if j == 0:
                scalar = scalar.fit(df)
            if len(df) > 0:
                plt.plot(df.index, scalar.transform(df), color=df_color[0], label=label_name)
    plt.legend(bbox_to_anchor=(1.1, 1.05)).draggable(True)
    plt.title(title)
    title = re.sub("[^a-zA-Z0-9$]+", " ", re.sub("/", "$", title))
    directory = out_directory + '/plots/'
    if not os.path.exists(directory):
        os.makedirs(directory)
    plt.savefig(directory + title)

class FeatureSelector(BaseEstimator, TransformerMixin):
    """
    Transformer to select production/productivity features from the DataFrame
    Parameters
    ----------
    feature_list : list of production/productivity features to be included
    input_col : default input feature
    """
    def __init__(self, feature_list=[], input_col="WTI_x"):
        self.feature_list = feature_list
        self.input_col = input_col

    def fit(self, X, Y=None):
        return self

    def transform(self, X):
        return X[[self.input_col] + self.feature_list]

# https://github.com/scikit-learn/scikit-learn/issues/5522
class CustomBoundariesScalar(BaseEstimator, TransformerMixin):
    """
    Transformer to scale the values of the provided DataFrame using custom
    (domain-chosen) boundary values per column
    Parameters
    ----------
    scaling_boundaries : dict
        Mapping from column name to its [min, max] scaling boundaries
    """
    def __init__(self, scaling_boundaries):
        self.scaling_boundaries = scaling_boundaries

    def fit(self, df, y=None):
        return self

    def transform(self, df):
        for i in df.columns:
            _min, _max = self.scaling_boundaries[i]
            df[i] = (df[i] - _min) / (_max - _min)
        return np.array(df, dtype="float")

    def inverse_transform(self, df):
        for i in df.columns:
            _min, _max = self.scaling_boundaries[i]
            df[i] = df[i] * (_max - _min) + _min
        return np.array(df, dtype="float")
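
# Illustrative sketch (not part of the original gist): CustomBoundariesScalar maps each
# column onto [0, 1] using fixed, domain-chosen boundaries rather than the observed data
# range, so out-of-sample values keep a consistent scale. The column names and bounds
# below are made up for the example.
# bounds = {'WTI_x': [20, 150], 'HH_Gas': [0, 20]}
# df = pd.DataFrame({'WTI_x': [20.0, 85.0, 150.0], 'HH_Gas': [0.0, 10.0, 20.0]})
# CustomBoundariesScalar(bounds).fit(df).transform(df.copy())
# # WTI_x: 85 -> (85 - 20) / (150 - 20) = 0.5; HH_Gas: 10 -> 10 / 20 = 0.5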
class Scaler(BaseEstimator, TransformerMixin):
    """
    Main data-normalization class that provides one of the scalers, to be used in the ML pipeline
    Parameters
    ----------
    scale_type : string ['StandardScalar', 'MinMax', 'Custom']
        Type of scaler to use
    scaling_boundaries : dict
        Scaling boundaries per column, used when the 'Custom' scaler is selected
    """
    # note: scaling_boundaries is a plain dict; GridSearchCV supplies candidate
    # dicts from a list such as get_default_grid's custom_boundary
    def __init__(self, scale_type='StandardScalar',
                 scaling_boundaries={'WTI_x': [20, 150],
                                     'HH_Gas': [0, 20],
                                     'Total Oil production': [1800000, 7200000],
                                     'Total Gas production': [27000000, 69000000],
                                     'percentage_gas_produced': [0, 1],
                                     'percentage_oil_produced': [0, 1]}):
        self.scale_type = scale_type
        self.scaling_boundaries = scaling_boundaries

    def fit(self, X, y=None, **fit_params):
        if self.scale_type == "MinMax":
            self.scalar = MinMaxScaler()
        elif self.scale_type == "StandardScalar":
            self.scalar = StandardScaler()
        elif self.scale_type == "Custom":
            self.scalar = CustomBoundariesScalar(self.scaling_boundaries)
        else:
            raise ValueError("Unknown scale_type: " + str(self.scale_type))
        self.scalar.fit(X)
        return self

    def transform(self, X, **transform_params):
        return pd.DataFrame(self.scalar.transform(X), columns=X.columns, index=X.index)

    def inverse_transform(self, X):
        return self.scalar.inverse_transform(X)

class MakeTimeFeatures(BaseEstimator, TransformerMixin):
    """
    Converts time-series data to i.i.d. features, to be used by linear models. For now,
    'shift' and 'rolling'-window features are provided, to capture the temporal dependencies
    Parameters
    ----------
    feature_type : string ["shift", "rolling"]
        Type of feature-generation method
    col_shift : dict
        Shift/rolling-window value for each feature (column) in the data
    """
    def __init__(self, feature_type="shift", col_shift={}):
        self.feature_type = feature_type
        self.col_shift = col_shift

    def fit(self, X, Y=None):
        return self

    def transform(self, X):
        df = pd.DataFrame(X)
        df.reset_index(drop=True, inplace=True)
        if self.feature_type == "shift":
            df_all = []
            for col in df.columns:
                df_all.append(df[col].shift(self.col_shift[col]))
        else:
            df_all = [df]
            for i, col in enumerate(df.columns):
                shift_val = np.abs(self.col_shift[col]) + 1
                if self.col_shift[col] >= 0:
                    df_w = df[col].rolling(window=shift_val, min_periods=shift_val)
                    df_all.append(pd.concat([df_w.mean(), df_w.std(ddof=0), df_w.min(), df_w.max()], axis=1))
                else:
                    # negative shift: roll over the reversed series to look forward in time
                    df_w = df[col][::-1].rolling(window=shift_val, min_periods=shift_val)
                    df_all.append(pd.concat([df_w.mean()[::-1], df_w.std(ddof=0)[::-1], df_w.min()[::-1], df_w.max()[::-1]], axis=1))
        return pd.concat(df_all, axis=1).values  # .as_matrix() is deprecated in newer pandas
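
# Illustrative sketch (assumed example, not from the original gist): with
# feature_type="shift" each column is replaced by its value col_shift[col] steps
# earlier (or later, for a negative shift); with feature_type="rolling" the raw
# columns are kept and window mean/std/min/max columns are appended.
# x = pd.DataFrame({'WTI_x': [1.0, 2.0, 3.0, 4.0]})
# MakeTimeFeatures("shift", {'WTI_x': 1}).fit(x).transform(x)
# # -> [[nan], [1.0], [2.0], [3.0]]; row t now holds WTI_x at t-1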
def get_default_grid(feature_list=[[]], input_col=['WTI_x'],
                     scalars=["MinMax", "StandardScalar", "Custom"],
                     custom_boundary=[{'WTI_x': [20, 150], 'HH_Gas': [0, 20]}],
                     windows_techniques=["rolling", "shift"],
                     col_shift_window_dict=[{'WTI_x': 4, 'HH_Gas': 4}]):
    """
    Function to fetch the initial (default) grid of all possibilities the model should try
    before picking the best model.
    Parameters
    ----------
    feature_list : list of list
        Possible feature subsets to be tried
    input_col : list of string
        Possible default input columns
    scalars : list of string
        Possible scaler types to be tried ("MinMax", "StandardScalar", "Custom")
    custom_boundary : list of dict
        Possible sets of scaling boundaries per feature, in case the Custom scaler is used
    windows_techniques : list of string
        Possible shift/rolling-window techniques to be tried
    col_shift_window_dict : list of dict
        Possible sets of shift/rolling-window values corresponding to each feature in the data
    """
    default_parameter_space = [
        # {
        #     'featureselector__feature_list': feature_list,
        #     'featureselector__input_col': input_col,
        #     'normalize__scale_type': scalars,
        #     'normalize__scaling_boundaries': custom_boundary,
        #     'timefeatures__feature_type': windows_techniques,
        #     'timefeatures__col_shift': col_shift_window_dict,
        #     'algo__model_name': ['xgb'],
        #     'algo__min_child_weight': [1, 2],
        #     'algo__gamma': [0, 0.3, 0.5, 0.7],
        #     'algo__subsample': [1],
        #     'algo__colsample_bytree': [0.5, 1.0],
        #     'algo__max_depth': [3, 6]
        # },
        # {
        #     'featureselector__feature_list': feature_list,
        #     'featureselector__input_col': input_col,
        #     'normalize__scale_type': scalars,
        #     'normalize__scaling_boundaries': custom_boundary,
        #     'timefeatures__feature_type': windows_techniques,
        #     'timefeatures__col_shift': col_shift_window_dict,
        #     'algo__model_name': ['rf'],
        #     'algo__max_depth': [2, 5],
        # },
        {
            'featureselector__feature_list': feature_list,
            'featureselector__input_col': input_col,
            'normalize__scale_type': scalars,
            'normalize__scaling_boundaries': custom_boundary,
            'timefeatures__col_shift': col_shift_window_dict,
            'timefeatures__feature_type': windows_techniques,
            'algo__model_name': ['lr', 'Lasso'],
            'algo__alpha': [0, 0.01, 0.1, 0.5, 1.0, 1.5, 2.0, 2.5]
        },
        {
            'featureselector__feature_list': feature_list,
            'featureselector__input_col': input_col,
            'normalize__scale_type': scalars,
            'normalize__scaling_boundaries': custom_boundary,
            'timefeatures__col_shift': col_shift_window_dict,
            'timefeatures__feature_type': windows_techniques,
            'algo__model_name': ["kr"],
            'algo__alpha': [1.0, 1.5, 2.0]
        },
        {
            'featureselector__feature_list': feature_list,
            'featureselector__input_col': input_col,
            'normalize__scale_type': scalars,
            'normalize__scaling_boundaries': custom_boundary,
            'timefeatures__feature_type': windows_techniques,
            'timefeatures__col_shift': col_shift_window_dict,
            'algo__model_name': ['ElasticNet'],
            'algo__alpha': [0.1, 0.5, 1.0, 2.0],
            'algo__elastic_ratio': np.arange(0.0, 1.0, 0.1),
        },
        {
            'featureselector__feature_list': feature_list,
            'featureselector__input_col': input_col,
            'normalize__scale_type': scalars,
            'normalize__scaling_boundaries': custom_boundary,
            'timefeatures__feature_type': windows_techniques,
            'timefeatures__col_shift': col_shift_window_dict,
            'algo__model_name': ['omp'],
            'algo__n_nonzero_coefs': [1],
        },
        # {
        #     'featureselector__feature_list': feature_list,
        #     'featureselector__input_col': input_col,
        #     'normalize__scale_type': scalars,
        #     'normalize__scaling_boundaries': custom_boundary,
        #     'timefeatures__feature_type': windows_techniques,
        #     'timefeatures__col_shift': col_shift_window_dict,
        #     'algo__model_name': ['br'],
        #     # 'algo__alpha_1': [2, 5],
        # },
    ]
    return default_parameter_space
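
# Note: the 'featureselector__', 'normalize__', 'timefeatures__' and 'algo__' prefixes
# follow sklearn's '<step name>__<param>' convention for addressing pipeline parameters,
# so this grid assumes a pipeline whose steps carry exactly those names, e.g. (a minimal
# sketch, with Pipeline imported from sklearn.pipeline):
# pipeline = Pipeline([('featureselector', FeatureSelector()),
#                      ('normalize', Scaler()),
#                      ('timefeatures', MakeTimeFeatures()),
#                      ('algo', MLAlgoSet())])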
class MLAlgoSet(BaseEstimator, RegressorMixin):
    """
    Class to select a regression algorithm in the pipeline from
    the set of supported/selected regression algorithms.
    Parameters
    ----------
    model_name : string
        Model-name short form
    alpha : float
        alpha parameter for LR, KR, Lasso & ElasticNet
    kernel : string
        Kernel for Kernel Ridge
    max_depth : int
        max_depth for the RandomForest and XGBoost regressors
    elastic_ratio : float
        l1_ratio for ElasticNet
    alpha_1, alpha_2, lambda_1, lambda_2 : float
        Respective values for the BayesianRidge model
    n_nonzero_coefs : int
        n_nonzero_coefs for the Orthogonal Matching Pursuit model
    min_child_weight, gamma, subsample, colsample_bytree : float
        Respective values for the XGBoost regressor
    """
    def __init__(self, model_name="lr", alpha=0,
                 kernel="linear", max_depth=2,
                 elastic_ratio=1.0, n_nonzero_coefs=1,
                 alpha_1=1e-06, alpha_2=1e-06,
                 lambda_1=1e-06, lambda_2=1e-06, min_child_weight=4,
                 gamma=0.3, subsample=0.6,
                 colsample_bytree=0.6):
        self.model_name = model_name
        self.alpha = alpha
        self.kernel = kernel
        self.max_depth = max_depth
        self.elastic_ratio = elastic_ratio
        self.n_nonzero_coefs = n_nonzero_coefs
        self.alpha_1 = alpha_1
        self.alpha_2 = alpha_2
        self.lambda_1 = lambda_1
        self.lambda_2 = lambda_2
        # every __init__ param must be stored for BaseEstimator.get_params/clone to work
        self.min_child_weight = min_child_weight
        self.gamma = gamma
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree

    def fit(self, X, Y=None):
        # drop rows with NaNs introduced by the shift/rolling features
        indx = ~np.isnan(X).any(axis=1)
        X = X[indx]
        Y = Y[indx]
        if self.model_name == 'kr':
            self._classifier = KernelRidge(alpha=self.alpha)
        elif self.model_name == 'xgb':
            self._classifier = XGBRegressor(nthread=-1,
                                            max_depth=self.max_depth,
                                            min_child_weight=self.min_child_weight,
                                            gamma=self.gamma, subsample=self.subsample,
                                            colsample_bytree=self.colsample_bytree)
        elif self.model_name == 'lr':
            self._classifier = lm.Ridge(alpha=self.alpha)
        elif self.model_name == 'Lasso':
            self._classifier = lm.Lasso(alpha=self.alpha)
        elif self.model_name == 'rf':
            self._classifier = RandomForestRegressor(n_estimators=10,
                                                     max_depth=self.max_depth)
        elif self.model_name == 'ElasticNet':
            self._classifier = lm.ElasticNet(alpha=self.alpha, l1_ratio=self.elastic_ratio)
        elif self.model_name == 'omp':
            self._classifier = lm.OrthogonalMatchingPursuit(n_nonzero_coefs=self.n_nonzero_coefs)
        elif self.model_name == 'br':
            self._classifier = lm.BayesianRidge(n_iter=3000,
                                                alpha_1=self.alpha_1,
                                                alpha_2=self.alpha_2,
                                                lambda_1=self.lambda_1,
                                                lambda_2=self.lambda_2)
        else:
            raise ValueError("Classifier not supported: " + str(self.model_name))
        self._classifier.fit(X, Y)
        return self

    def predict(self, X, y=None):
        # rows with NaN features cannot be predicted; pin their predictions to 0
        nan_indexes = np.isnan(X).any(axis=1)
        X[nan_indexes] = 0
        Y = self._classifier.predict(X)
        Y[nan_indexes] = 0
        return Y
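
# Illustrative sketch (assumed example): MLAlgoSet lets GridSearchCV treat the choice of
# regressor itself as a hyper-parameter ('algo__model_name' in the grids above).
# m = MLAlgoSet(model_name='Lasso', alpha=0.1)
# m.fit(np.array([[1.0], [2.0], [3.0]]), np.array([1.0, 2.0, 3.0]))
# m.predict(np.array([[4.0]]))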
## Score is the opposite of error,
## https://stackoverflow.com/questions/32401493/how-to-create-customize-your-own-scorer-function-in-scikit-learn
def overall_score(actual, prediction):
    return r2_score(actual, prediction)

def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
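
# Worked example (illustrative): y_true = [100, 200], y_pred = [110, 180]
# -> mean(|100-110|/100, |200-180|/200) * 100 = mean(0.10, 0.10) * 100 = 10.0
# Note the division by y_true: zeros in the actuals would make MAPE undefined.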
def get_arima_pq(res_df, max_rel_len=18, min_rel_val=0.2):
    # count significant PACF/ACF lags (|value| > min_rel_val) within the first max_rel_len lags
    p = sum(np.argwhere(abs(pm.pacf(res_df)) > min_rel_val).flatten() <= max_rel_len)
    q = sum(np.argwhere(abs(pm.acf(res_df)) > min_rel_val).flatten() <= max_rel_len)
    return p, q
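
# Heuristic as implemented above: p counts the PACF lags (within the first max_rel_len)
# whose absolute value exceeds min_rel_val, and q does the same for the ACF; e.g. if
# |pacf| > 0.2 only at lags 0, 1 and 3, then p = 3. These bounds are then handed to
# pm.auto_arima as max_p/max_q rather than being used directly.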
def auto_arima(df):
    # get max p (AR, from PACF) & q (MA, from ACF) bounds, then do a stepwise
    # search over SARIMAX(p,d,q)x(P,D,Q,m)
    p, q = get_arima_pq(df)
    stepwise_model = pm.auto_arima(df, start_p=0, start_q=0,
                                   max_p=p, max_q=q, m=12,
                                   start_P=0, start_Q=0, seasonal=True,
                                   trace=True, d=0, max_order=30,
                                   error_action='ignore', n_jobs=-1,
                                   suppress_warnings=True, stepwise=True, maxiter=200)
    return stepwise_model

def check_ts(res_df):
    pm.plot_acf(res_df)   # MA, q
    pm.plot_pacf(res_df)  # AR, p
    result = seasonal_decompose(res_df, model='additive')
    result.plot()
    plt.title("additive")
    plt.show()
    try:
        result = seasonal_decompose(res_df, model='multiplicative')
    except ValueError:
        # multiplicative decomposition fails on non-positive data, so shift it above zero
        print("-ve or zero(s) encountered in the data")
        min_val = res_df.values.min()
        res_modify_df = res_df - min_val + 1e-4
        result = seasonal_decompose(res_modify_df, model='multiplicative')
    result.plot()
    plt.title("multiplicative")
    plt.show()

def fit_best(df_X, df_Y, input_col, pipeline, algo_grid, split_ratio=0.80, cv_fold=3):
    """
    Function to run the grid search over all the provided possibilities and return the
    results for the best-fitted model and pre-processing options
    Parameters
    ----------
    df_X, df_Y : pandas DataFrames
        Input and output DataFrames
    input_col : string
        Name of the default input column
    pipeline : sklearn pipeline
        Pipeline defining the order of methods to be tried
    algo_grid : list of dict
        Possible set of algorithms and pre-processing values to be tried, :ref:`get_default_grid`
    split_ratio : float
        Ratio of the train/test split
    cv_fold : int
        Number of cross-validation folds to be created
    Returns
    ----------
    Dict with the below keys
    R2_train, MAPE_train, R2_test, MAPE_test : float
        Respective scores
    ts_model : pyramid.auto_arima
        Best-fitted ARMA model on the residuals
    reg_model : sklearn pipeline
        Best-fitted regression model
    reg_model_params : dict
        Parameter values of the best-fitted model
    train_pred
        Predicted values for the train data
    test_pred
        Predicted values for the test data, without ARMA correction
    test_pred_corrected
        Predicted values for the test data, with ARMA correction
    """
    results = {}
    split_index = int(len(df_X) * split_ratio)
    x_train, y_train = df_X[:split_index], df_Y[:split_index]
    x_test, y_test = df_X[split_index:], df_Y[split_index:]
    cv_method = KFold(n_splits=cv_fold, shuffle=False)
    search = GridSearchCV(pipeline, algo_grid, n_jobs=-1, cv=cv_method, scoring="r2", iid=True, verbose=1)
    search.fit(df_X[:split_index].copy(), df_Y[:split_index].copy())
    print('Best model:\n', search.best_params_)
    model = search.best_estimator_
    selected_feature_list = search.best_params_["featureselector__feature_list"]
    shift_roll_values_list = []
    for col in selected_feature_list:
        shift_roll_values_list.append(search.best_params_["timefeatures__col_shift"][col])
    shift_roll_values_list.append(search.best_params_["timefeatures__col_shift"][input_col])
    shift_roll_values_list.append(0)
    min_roll_shift = np.min(shift_roll_values_list)
    max_roll_shift = np.max(shift_roll_values_list)
    # check if both min and max rolling windows are negative
    if max_roll_shift <= 0: max_roll_shift = None
    # check if both min and max rolling windows are positive
    if min_roll_shift >= 0: min_roll_shift = None
    predicted_train = model.predict(df_X[:split_index - int(min_roll_shift or 0)].copy())
    # check that the shift/rolling window creates the expected number of NaNs at the
    # beginning and end of the input data
    assert (not min_roll_shift or np.all(predicted_train[min_roll_shift:] == 0)), "error"
    assert (not max_roll_shift or np.all(predicted_train[:max_roll_shift] == 0)), "error"
    predicted_train = predicted_train[max_roll_shift:min_roll_shift]
    if split_ratio == 1:
        predicted_test = []
    else:
        predicted_test = model.predict(df_X[split_index - int(max_roll_shift or 0):].copy())
        # assert (not max_roll_shift or np.all(predicted_test[:max_roll_shift] == 0)), "error"
        predicted_test = predicted_test[max_roll_shift:min_roll_shift]
    ######################################################################
    results["R2_train"] = r2_score(y_train[max_roll_shift:min_roll_shift], predicted_train)
    results["MAPE_train"] = mean_absolute_percentage_error(y_train[max_roll_shift:min_roll_shift], predicted_train)
    if len(predicted_test) > 0:
        results["R2_test"] = r2_score(y_test[:min_roll_shift], predicted_test)
        results["MAPE_test"] = mean_absolute_percentage_error(y_test[:min_roll_shift], predicted_test)
    else:
        results["R2_test"] = 'No test data'
        results["MAPE_test"] = 'No test data'
    #######################################################################
    # fit an ARMA model on the training residuals only when the index is a regular monthly series
    date_range = pd.date_range(start=x_train.index[0], end=x_train.index[-1], freq='MS')
    if len(date_range) == len(x_train.index):
        res_train_df = pd.DataFrame((y_train[max_roll_shift:min_roll_shift] - predicted_train))
        res_train_df.index = y_train.index[max_roll_shift:min_roll_shift]
        res_train_df.columns = ["residue"]
        ts_model = auto_arima(res_train_df.copy())
        # ts_model.fit(res_train_df)
        if len(predicted_test) == 0:
            predicted_test_corrected = predicted_test
        else:
            ts_corrections = ts_model.predict(len(predicted_test))
            predicted_test_corrected = predicted_test + ts_corrections
        results["ts_model"] = ts_model
    else:
        predicted_test_corrected = predicted_test
        results["ts_model"] = np.nan
    results["test_pred_corrected"] = pd.DataFrame(predicted_test_corrected, index=y_test.index[:min_roll_shift])
    results["test_pred"] = pd.DataFrame(predicted_test, index=y_test.index[:min_roll_shift])
    results["train_pred"] = pd.DataFrame(predicted_train, index=y_train.index[max_roll_shift:min_roll_shift])
    results["reg_model_params"] = search.best_params_
    results["reg_model"] = model
    return results

# if __name__ == "__main__":
#     # assumes a project-level read_data helper and sklearn's Pipeline
#     # (from sklearn.pipeline import Pipeline); note that the grid from
#     # get_default_grid addresses a 'featureselector' step, so it is included here
#     df = read_data(r"../data",
#                    "dataset_monthly.xlsx", ["WTI_x", "HH_Gas"], "RigCount", basin_name="all")
#
#     X_df = df[["WTI_x", "HH_Gas"]]
#     Y_df = df["RigCount"]
#
#     pipeline = Pipeline([
#         ('featureselector', FeatureSelector(feature_list=[], input_col="WTI_x")),
#         ('normalize', Scaler()),
#         ('timefeatures', MakeTimeFeatures("rolling", {"WTI_x": 3, "HH_Gas": 3})),
#         ('algo', MLAlgoSet())
#     ])
#     algo_grid = get_default_grid()
#     results = fit_best(X_df, Y_df, "WTI_x", pipeline, algo_grid)