Skip to content

Instantly share code, notes, and snippets.

Created December 12, 2018 11:15
Show Gist options
  • Save AbhishekAshokDubey/67cb1cece14889295ca7b25cf89f5e7d to your computer and use it in GitHub Desktop.
Save AbhishekAshokDubey/67cb1cece14889295ca7b25cf89f5e7d to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
Created on Fri Oct 02 14:20:28 2018
@author: ADubey4
Code to automate the data pre-processing and ML pipeline for the project
Features provided:
1. Mutiple data sacling options: MinMax, Standard Normalization and Custom bounday scalar
:ref:`Scaler` & :ref:`CustomBoundariesScalar`
2. Converting Time series data to iid, be used by linear models. For now,
data shifting and rolling window features are provided for the same.
3. 7 different linear regression models for provided to choose from, for the best model
4. We are fitting the ARMA model for the residues, :ref:`common_ml_fns.auto_arima`
from __future__ import division
import os
import sys
import re
import numpy as np
import pandas as pd
import pyramid as pm
from xgboost import XGBRegressor
import sklearn.linear_model as lm
from matplotlib import pyplot as plt
from sklearn.metrics import r2_score
from sklearn.feature_selection import *
from sklearn.model_selection import KFold
from sklearn.kernel_ridge import KernelRidge
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestRegressor
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin
def warn(*args, **kwargs):
import warnings
warnings.warn = warn
def print_results(*argv):
Functiont to pretty print the output object of the main pipeline, returned from :ref:`fit_best`
argv : List of Dictionaries
List of results object to be printed
for arg in argv:
for k,v in arg.items():
if isinstance(v,float) or isinstance(v,int):
print(k, v)
def plot_scaled(out_directory = "",title="", colors="rgb", *argv):
Function to plot the scaled values of the provided DataFrames
colors : string/ list, optional
Colors for each of the data to be ploted
argv : List
List of DataFrames/ Series to be plotted
scalar = MinMaxScaler()
for i, arg in enumerate(argv):
if type(arg) != list:
arg = [arg]
for j, df in enumerate(arg):
df_color = [color[j] if type(color) == list else color]
if type(df) == pd.core.series.Series:
label_name =
df = df.to_frame()
if j == 0:
scalar =
if len(df)>0:
plt.plot(df.index, scalar.transform(df), color=df_color[0], label = label_name)
plt.legend(bbox_to_anchor=(1.1, 1.05)).draggable(True)
title = re.sub("[^a-zA-Z0-9$]+"," ",re.sub("/", "$", title))
directory = out_directory+ '/plots/'
if not os.path.exists(directory):
class FeatureSelector(BaseEstimator, TransformerMixin):
Function to select production/productivity features from the dataframe
feature_list : list of production/productivity features to be included
input_col : default input feature
def __init__(self, feature_list = [],input_col="WTI_x"):
self.feature_list = feature_list
self.input_col = input_col
def fit(self, X, Y=None):
return self
def transform(self, X):
df = X
df = df[[self.input_col] + self.feature_list]
return df
class CustomBoundariesScalar(BaseEstimator, TransformerMixin):
Function to scale values of the provided DataFrames with custom values for the boundaries
scaling_boundaries : Dictionary of column scales
Colors for each of the data to be ploted
argv : List
List of DataFrames/ Series to be plotted
def __init__(self, scaling_boundaries):
self.scaling_boundaries = scaling_boundaries
def transform(self, df):
for i in df.columns:
x = [x for x in self.scaling_boundaries.keys() if x == i][0]
_min, _max = self.scaling_boundaries[x]
df[i] = (df[i] - _min)/ (_max - _min)
df = np.array(df, dtype="float")
return df
def fit(self, df, y=None):
return self
def inverse_transform(self, df):
for i in df.columns:
x = [x for x in self.scaling_boundaries.keys() if x == i][0]
_min, _max = self.scaling_boundaries[x]
df[i] = df[i] * (_max - _min) + _min
df = np.array(df, dtype="float")
return df
class Scaler(BaseEstimator, TransformerMixin):
Main data Normalization class to provide one of the scalers, to be used in the ML pipeline
scale_type : string ['StandardScalar', "MinMax", "Custom"]
Type of scalar to use
scaling_boundaries : Dict
Scaling boundaries in case Custom scalar is used
def __init__(self, scale_type='StandardScalar',
scaling_boundaries=[{'WTI_x' :[20,150],
'HH_Gas' :[0,20],
'Total Oil production' :[1800000,7200000],
'Total Gas production' :[27000000,69000000],
'percentage_gas_produced' :[0,1],
'percentage_oil_produced' :[0,1]} ]):
self.scale_type = scale_type
self.scaling_boundaries = scaling_boundaries
def fit(self, X, y=None, **fit_params):
if self.scale_type == "MinMax":
self.scalar = MinMaxScaler()
elif self.scale_type == "StandardScalar":
self.scalar = StandardScaler()
elif self.scale_type == "Custom":
self.scalar = CustomBoundariesScalar(self.scaling_boundaries)
return self
def transform(self,X, **transform_params):
df = pd.DataFrame(self.scalar.transform(X), columns=X.columns, index= X.index)
return df
def inverse_transform(self,X):
return self.scalar.inverse_transform(X)
class MakeTimeFeatures(BaseEstimator, TransformerMixin):
Converting Time series data to iid, be used by linear models. For now, data 'shifting'
and 'rolling' window features are provided, to capture the temporal data dependancies
feature_type : string ["shift", "rolling"]
Type of feature generation method
col_shift : List
Shift/ rolling window values corresponding to each of the features in the data
def __init__(self, feature_type = "shift", col_shift = []):
self.feature_type = feature_type
self.col_shift = col_shift
def fit(self, X, Y=None):
return self
def transform(self, X):
df = pd.DataFrame(X)
if self.feature_type == "shift":
df_all = []
for col in df.columns:
df_all = [df]
for i, col in enumerate(df.columns):
shift_val = np.abs(self.col_shift[col]) + 1
if self.col_shift[col] >= 0:
df_w = df[col].rolling(window=shift_val, min_periods=shift_val)
df_all.append(pd.concat([df_w.mean(), df_w.std(ddof=0), df_w.min(), df_w.max()], axis=1))
df_w = df[col][::-1].rolling(window=shift_val, min_periods=shift_val)
df_all.append(pd.concat([df_w.mean()[::-1], df_w.std(ddof=0)[::-1], df_w.min()[::-1], df_w.max()[::-1]], axis=1))
return pd.concat(df_all, axis=1).as_matrix()
def get_default_grid(feature_list = [[]],input_col=['WTI_x'],scalars = ["MinMax", "StandardScalar", "Custom"],
custom_boundary =[{'WTI_x':[20,150],'HH_Gas':[0,20]}],
windows_techniques = ["rolling", "shift"],
col_shift_window_dict =[{'WTI_x':4,'HH_Gas':4}],
Function to fetch the initial (default) grid of all possibilities, the model should try,
before picking the best model.
scalars : string ["shift", "rolling"]
Possible type of feature generation method to be tried
custom_boundary : List of Dict
Possible set of Scaling boundaries for each feature in case Custom scalar is used
windows_techniques : List of string
Possible Shift/ rolling window techniques to be tried
col_shift_window_list : List of List
Possible set of Shift/ rolling window values corresponding to each of the features in the data,
to be tried
default_parameter_space = [
# {
# 'featureselector__feature_list' : feature_list,
# 'featureselector__input_col' : input_col,
# 'normalize__scale_type': scalars,
# 'normalize__scaling_boundaries': custom_boundary,
# 'timefeatures__feature_type': windows_techniques,
# 'timefeatures__col_shift': col_shift_window_dict,
# 'algo__model_name': ['xgb'],
# 'algo__min_child_weight':[1,2],
# 'algo__gamma':[0, 0.3, 0.5, 0.7],
# 'algo__subsample':[1],
# 'algo__colsample_bytree':[0.5, 1.0],
# 'algo__max_depth': [3, 6]
# },
# {
# 'featureselector__feature_list' : feature_list,
# 'featureselector__input_col' : input_col,
# 'normalize__scale_type': scalars,
# 'normalize__scaling_boundaries': custom_boundary,
# 'timefeatures__feature_type': windows_techniques,
# 'timefeatures__col_shift': col_shift_window_dict,
# 'algo__model_name': ['rf'],
# 'algo__max_depth': [2,5],
# },
'featureselector__feature_list' : feature_list,
'featureselector__input_col' : input_col,
'normalize__scale_type': scalars,
'normalize__scaling_boundaries': custom_boundary,
'timefeatures__col_shift': col_shift_window_dict,
'timefeatures__feature_type': windows_techniques,
'algo__model_name': ['lr', 'Lasso'],
'algo__alpha': [0, 0.01, 0.1, 0.5,1.0,1.5, 2.0, 2.5]
'featureselector__feature_list' : feature_list,
'featureselector__input_col' : input_col,
'normalize__scale_type': scalars,
'normalize__scaling_boundaries': custom_boundary,
'timefeatures__col_shift': col_shift_window_dict,
'timefeatures__feature_type': windows_techniques,
'algo__model_name': ["kr"],
'algo__alpha': [1.0,1.5,2.0]
'featureselector__feature_list' : feature_list,
'featureselector__input_col' : input_col,
'normalize__scale_type': scalars,
'normalize__scaling_boundaries': custom_boundary,
'timefeatures__feature_type': windows_techniques,
'timefeatures__col_shift': col_shift_window_dict,
'algo__model_name': ['ElasticNet'],
'algo__alpha': [0.1, 0.5, 1.0, 2.0],
'algo__elastic_ratio': np.arange(0.0, 1.0, 0.1),
'featureselector__feature_list' : feature_list,
'featureselector__input_col' : input_col,
'normalize__scale_type': scalars,
'normalize__scaling_boundaries': custom_boundary,
'timefeatures__feature_type': windows_techniques,
'timefeatures__col_shift': col_shift_window_dict,
'algo__model_name': ['omp'],
'algo__n_nonzero_coefs': [1],
# {
# 'featureselector__feature_list' : feature_list,
# 'featureselector__input_col' : input_col,
# 'normalize__scale_type': scalars,
# 'normalize__scaling_boundaries': custom_boundary,
# 'timefeatures__feature_type': windows_techniques,
# 'timefeatures__col_shift': col_shift_window_dict,
# 'algo__model_name': ['br'],
# #'algo__alpha_1': [2,5],
# },
return default_parameter_space
class MLAlgoSet(BaseEstimator, RegressorMixin):
The class for to select an regression algorithm in the pipeline from
all the possible set of Regression algorithms supported/ selected.
model_name : string
model name short form
alpha : float
alpha parameter for LR, KR, Lasso & ElasticNet
kernel : string
Kernel for Kernel Ridge
max_depth : int
max_depth for RandomForest Regressor
elastic_ratio : float
l1_ratio for ElasticNet
alpha_1, alpha_2, lambda_1, lambda_2 : float
Respective values for BayesianRidge model
n_nonzero_coefs : int
n_nonzero_coefs for Orthogonal Matching Pursuit model
def __init__(self, model_name ="lr", alpha=0,
kernel="linear", max_depth=2,
elastic_ratio=1.0, n_nonzero_coefs = 1,
alpha_1=1e-06, alpha_2=1e-06,
lambda_1=1e-06, lambda_2=1e-06,min_child_weight=4,
gamma = 0.3, subsample= 0.6,
colsample_bytree = 0.6):
:param model_name: string - algorithm name
self.model_name = model_name
self.alpha = alpha
self.kernel = kernel
self.max_depth = max_depth
self.elastic_ratio = elastic_ratio
self.n_nonzero_coefs = n_nonzero_coefs
self.alpha_1 = alpha_1
self.alpha_2 = alpha_2
self.lambda_1 = lambda_1
self.lambda_2 = lambda_2
# self.min_child_weight = min_child_weight
# self.gamma = gamma
# self.subsample = subsample
# self.colsample_bytree = colsample_bytree
def fit(self, X, Y=None):
indx = ~np.isnan(X).any(axis=1)
X = X[indx]
Y = Y[indx]
if self.model_name == 'kr':
self._classifier = KernelRidge(alpha=self.alpha)
elif self.model_name == 'xgb':
self._classifier= XGBRegressor(nthread=-1
,max_depth = self.max_depth,min_child_weight=self.min_child_weight,
gamma=self.gamma,subsample =self.subsample,
colsample_bytree = self.colsample_bytree
elif self.model_name == 'lr':
self._classifier= lm.Ridge(alpha=self.alpha)
elif self.model_name == 'Lasso':
self._classifier = lm.Lasso(alpha =self.alpha)
elif self.model_name == 'rf':
self._classifier = RandomForestRegressor(n_estimators=10,
elif self.model_name == 'ElasticNet':
self._classifier = lm.ElasticNet(alpha =self.alpha, l1_ratio = self.elastic_ratio)
elif self.model_name == 'omp':
self._classifier = lm.OrthogonalMatchingPursuit(n_nonzero_coefs = self.n_nonzero_coefs)
elif self.model_name == 'br':
self._classifier = lm.BayesianRidge(n_iter=3000,
alpha_1 = self.alpha_1,
alpha_2 = self.alpha_2,
lambda_1 = self.lambda_1,
lambda_2 = self.lambda_2)
raise ValueError('Classifier Not Supported'), Y)
return self
def predict(self, X, y=None):
nan_indexes = np.isnan(X).any(axis=1)
X[nan_indexes] = 0
Y = self._classifier.predict(X)
Y[nan_indexes] = 0
return Y
## Score is opposite of Error,
def overall_score(actual,prediction):
score = r2_score(actual, prediction)
return score
def mean_absolute_percentage_error(y_true, y_pred):
y_true, y_pred = np.array(y_true), np.array(y_pred)
return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
def get_arima_pq(res_df, max_rel_len = 18, min_rel_val = 0.2):
p = sum(np.argwhere(abs(pm.pacf(res_df)) > min_rel_val).flatten() <= max_rel_len)
q = sum(np.argwhere(abs(pm.acf(res_df)) > min_rel_val).flatten()<=max_rel_len)
return p,q
def auto_arima(df):
# Done, check: get max p & q from PACF & ACF values
# MA, q
# AR, p
# n_fits = 20
p,q = get_arima_pq(df)
stepwise_model = pm.auto_arima(df, start_p=0, start_q=0,
max_p=p, max_q=q, m=12,
start_P=0, start_Q=0, seasonal=True,
trace=True, d = 0, max_order= 30,
error_action='ignore', n_jobs =-1,
suppress_warnings=True, stepwise = True, maxiter=200)
return stepwise_model
def check_ts(res_df):
pm.plot_acf(res_df) # MA, q
pm.plot_pacf(res_df) # AR, p
result = seasonal_decompose(res_df, model='additive')
result = seasonal_decompose(res_df, model='multiplicative')
print("-ve or zero(s) encountered in the data")
min_val = res_df.values.min()
res_modify_df = res_df - min_val + 1e-4
result = seasonal_decompose(res_modify_df, model='multiplicative')
def fit_best(df_X,df_Y,input_col, pipeline, algo_grid, split_ratio = 0.80, cv_fold = 3):
The function to run the grid search over all the set of possible provided and return the results,
for the best fitted model and different pre-processing possibilities
df_X, df_Y : Pandas DataFrames
Input and output DataFrames
pipeline : sklearn pipeline
pipeline for the order of methods to be tried
algo_grid: List of Dict
Possible set of algorithms, pre-processing values to be tried :ref:'get_default_grid'
split_ratio: float
ratio of train/ test split
cv_fold: int
number of cross-validation set to be created
Dict with below keys
R2_train, MAPE_train, R2_test, MAPE_test: int
Respective scores
ts_model: pyramid.auto_arima
Best Fitted ARMA model on residue
reg_model: sklearn.linear_model
Best Fitted regression model
reg_model_params: Dict
Dict of parameter values of best fitted models
predicted values for the train data
predicted values for the test data, without ARMA correction
predicted values for the test data, with ARMA correction
results = {}
split_index = int(len(df_X)*split_ratio)
x_train, y_train = df_X[:split_index], df_Y[:split_index]
x_test, y_test = df_X[split_index:], df_Y[split_index:]
cv_method = KFold(n_splits=cv_fold, shuffle=False)
search = GridSearchCV(pipeline , algo_grid, n_jobs=-1, cv=cv_method, scoring = "r2", iid=True,verbose=1)[:split_index].copy(), df_Y[:split_index].copy())
print('Best model:\n', search.best_params_)
model = search.best_estimator_
selected_feature_list = search.best_params_["featureselector__feature_list"]
shift_roll_values_list = []
for col in selected_feature_list:
min_roll_shift = np.min(shift_roll_values_list)
max_roll_shift = np.max(shift_roll_values_list)
#check if both min and max rolling windows are negative
if max_roll_shift <= 0: max_roll_shift = None
#check if both min and max rolling windows are positive
if min_roll_shift >= 0: min_roll_shift = None
predicted_train = model.predict(df_X[:split_index-int(min_roll_shift or 0)].copy())
#Check if shift/rolling window creates expected number of nan at the beginning and end of input dat
assert (not(min_roll_shift) or np.all(predicted_train[min_roll_shift:] == 0)), "error"
assert (not(max_roll_shift) or np.all(predicted_train[:max_roll_shift] == 0)), "error"
predicted_train = predicted_train[max_roll_shift:min_roll_shift]
if split_ratio==1:
predicted_test =[]
predicted_test = model.predict(df_X[split_index-int(max_roll_shift or 0):].copy())
# assert (not(max_roll_shift) or np.all(predicted_test[:max_roll_shift] == 0)), "error"
predicted_test = predicted_test[max_roll_shift:min_roll_shift]
results["R2_train"] = r2_score(y_train[max_roll_shift:min_roll_shift],predicted_train)
results["MAPE_train"] = mean_absolute_percentage_error(y_train[max_roll_shift:min_roll_shift],predicted_train)
if len(predicted_test)>0:
results["R2_test"] = r2_score(y_test[:min_roll_shift],predicted_test)
results["MAPE_test"] = mean_absolute_percentage_error(y_test[:min_roll_shift],predicted_test)
results["R2_test"] = 'No test data'
results["MAPE_test"] = 'No test data'
date_range = pd.date_range(start=x_train.index[0], end=x_train.index[-1], freq='MS')
if len(date_range) == len(x_train.index):
res_train_df = pd.DataFrame((y_train[max_roll_shift:min_roll_shift] - predicted_train))
res_train_df.index = y_train.index[max_roll_shift:min_roll_shift]
res_train_df.columns = ["residue"]
ts_model = auto_arima(res_train_df.copy())
if len(predicted_test)==0:
predicted_test_corrected = predicted_test
ts_corrections = ts_model.predict(len(predicted_test))
predicted_test_corrected = predicted_test+ts_corrections
results["ts_model"] = ts_model
predicted_test_corrected = predicted_test
results["ts_model"] = np.nan
results["test_pred_corrected"] = pd.DataFrame(predicted_test_corrected, index=y_test.index[:min_roll_shift])
results["test_pred"] = pd.DataFrame(predicted_test, index=y_test.index[:min_roll_shift])
results["train_pred"] = pd.DataFrame(predicted_train, index=y_train.index[max_roll_shift:min_roll_shift])
results["reg_model_params"] = search.best_params_
results["reg_model"] = model
return results
#if __name__ == "__main__":
# df = read_data(r"../data",
# "dataset_monthly.xlsx", ["WTI_x", "HH_Gas"] , "RigCount", basin_name = "all")
# X_df = df[["WTI_x", "HH_Gas"]]
# Y_df = df["RigCount"]
# pipeline = Pipeline([
# ('normalize', Scaler()),
# ("timefeatures", MakeTimeFeatures("rolling", [3,3])),
# ('algo', MLAlgoSet())
# ])
# algo_grid = get_default_grid()
# results = fit_best(X_df,Y_df, pipeline, algo_grid)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment