Skip to content

Instantly share code, notes, and snippets.

@samcarlos
Created August 2, 2019 15:28
Show Gist options
  • Save samcarlos/861d37dc4f9b4b87679179374f96baa6 to your computer and use it in GitHub Desktop.
Save samcarlos/861d37dc4f9b4b87679179374f96baa6 to your computer and use it in GitHub Desktop.
import numpy as np
import pandas as pd
from keras import backend as K
from keras.layers import Input, Dense, Dropout
from keras.models import Model
from keras.wrappers.scikit_learn import KerasRegressor
from sklearn.metrics import make_scorer
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import label_binarize
from sklearn.model_selection import train_test_split
np.random.seed(2)
#Set up loss functions
def optim_loss(yTrue,yPred):
"""Returns Loss for Keras. This function essentially asks the question:
'what is the probability that a particular treatment yields the maximum?'.
It does this by multiplying a softmax probability to the response variable
of interest. If a response is missing then it is not included in loss.
Args:
yTrue (keras array): True Values
yPred (keras array): Predictions
Returns:
Function to minimize
"""
output = yTrue[:,:-1]* yPred
output = K.sum(output,axis = -1)*yTrue[:,-1]
return -K.mean(output)
def missing_mse_loss(yTrue,yPred):
"""Returns Loss for Keras. This is essentially a multi-output regression that
ignores missing data. It's similar in concept to matrix factorization with
missing data in that it only includes non missing values in loss function.
Args:
yTrue (keras array): True Values
yPred (keras array): Predictions
Returns:
Function to minimize
"""
output = yTrue[:,:-1]* yPred
output = (K.sum(output,axis = -1)-yTrue[:,-1])**2
return K.mean(output)
def expected_gains(yTrue,yPred):
"""Returns Expected Gains to gridsearch over
Args:
yTrue (array): True Values
yPred (array): Predictions
Returns:
Expected model gains compared with 25 cents
"""
is_max = np.argmax(yPred, axis = 1)
prop_model = np.where(yTrue[:,:-1].argmax(axis=1) == is_max)[0]
gains = np.array(yTrue[prop_model,-1]).mean()
return gains
def missing_mse_loss_np(yTrue,yPred):
"""Returns Loss for Keras. This is essentially a multi-output regression that
ignores missing data. It's similar in concept to matrix factorization with
missing data in that it only includes non missing values in loss function.
Args:
yTrue (keras array): True Values
yPred (keras array): Predictions
Returns:
Function to minimize
"""
output = yTrue[:,:-1]* yPred
output_1 = (np.sum(output,axis = -1)-yTrue[:,-1])**2
return np.mean(output_1)
missing_mse_loss_scorer = make_scorer(missing_mse_loss_np, greater_is_better = False)
expected_gains_scorer = make_scorer(expected_gains)
#Functions to build models
def hete_optim_action(input_shape, output_shape, num_nodes = 256, dropout = .5, num_layers = 1,
activation='relu'):
"""Returns ERUPT maximizing neural network
Args:
input_shape (int): number of features in training dataset
num_nodes (int): Number of nodes in layers
dropout (real): dropout percentage
num_layers (real): number of num_layer
activation (keras function): nonlinear activation function
Returns:
Keras Model
"""
input = Input(shape=(input_shape,))
x = Dense(num_nodes, activation=activation)(input)
x = Dropout(dropout)(x)
if(num_layers > 1):
for q in range(num_layers - 1):
x = Dense(num_nodes, activation=activation)(x)
x = Dropout(dropout)(x)
x = Dense(output_shape, activation = 'softmax')(x)
model = Model(input, x)
model.compile(optimizer='rmsprop', loss=optim_loss, metrics = [optim_loss])
return model
def hete_missing_mse(input_shape, output_shape, num_nodes = 256, dropout = .5, num_layers = 1,
activation='relu'):
"""Returns multi-output neural network
Args:
input_shape (int): number of features in training dataset
num_nodes (int): Number of nodes in layers
dropout (real): dropout percentage
num_layers (real): number of num_layer
activation (keras function): nonlinear activation function
Returns:
Keras Model
"""
input = Input(shape=(input_shape,))
x = Dense(num_nodes, activation=activation)(input)
x = Dropout(dropout)(x)
if(num_layers > 1):
for q in range(num_layers - 1):
x = Dense(num_nodes, activation=activation)(x)
x = Dropout(dropout)(x)
x = Dense(output_shape)(x)
model = Model(input, x)
model.compile(optimizer='rmsprop', loss=missing_mse_loss, metrics = [missing_mse_loss])
return model
def get_model(optim = False, param_grid = None,
scorer = expected_gains_scorer):
if param_grid is None:
param_grid = dict(num_nodes = [8,32,64], dropout = [.1,.25,.5,.75,.9],
activation = ['relu', 'tanh'], num_layers = [1,2], epochs = [10])
if optim:
model = KerasRegressor(build_fn=hete_optim_action,
input_shape=X.shape[1], output_shape = dummy_y_train.shape[1] - 1 ,
num_nodes=1, num_layers = 1, epochs=100, verbose = False)
else:
model = KerasRegressor(build_fn=hete_missing_mse,
input_shape=X.shape[1], output_shape = dummy_y_train.shape[1] - 1 ,
num_nodes=1, num_layers = 1, epochs=100, verbose = False)
grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=-1,
scoring = scorer , verbose = False, cv = 5)
return grid
#Generate Data
n_obs = 10000
X = np.concatenate([np.random.normal(0,1,n_obs).reshape(n_obs,1) for x in range(100)], axis = 1)
main_effects = np.random.normal(1,1,10)
#add hete effects to treatments 1 and 2
tmt_coefs_0 = main_effects
tmt_effects_hete = np.random.normal(.1,.05,10)
tmt_coefs_1 = tmt_effects_hete + main_effects
tmt_coefs_2 = -tmt_effects_hete + main_effects
coefs = [tmt_coefs_0, tmt_coefs_1, tmt_coefs_2]
null_coefs = [0 for x in range(90)]
coefs = [np.concatenate([x,null_coefs]).reshape(-1,1) for x in coefs ]
coefs = np.concatenate(coefs,axis=1)
tmts = np.random.choice([0,1,2], n_obs)
responses = np.matmul(X, coefs)
y = responses[np.arange(len(responses)), tmts]
y = y + np.random.normal(0,y.std()*10,n_obs)
y = y.reshape(-1,1)
y = (y-y.mean())/y.std()
t_dummy = label_binarize(tmts, classes = [0,1,2])
dummy_y_train = np.concatenate([t_dummy, y.reshape(-1,1)], axis = 1)
#build models and gridsearch
mse_model = get_model(scorer = missing_mse_loss_scorer)
optim_model = get_model(optim = True)
mse_model.fit(X, dummy_y_train)
optim_model.fit(X, dummy_y_train)
#create test data
x_test = np.concatenate([np.random.normal(0,1,n_obs).reshape(n_obs,1) for x in range(100)], axis = 1)
responses_test = np.matmul(x_test, coefs)
tmts_test = np.random.choice([0,1,2], n_obs)
y_test = responses_test[np.arange(len(responses_test)), tmts_test]
y_test = y_test + np.random.normal(0,y_test.std()*10,n_obs)
y_test = y_test.reshape(-1,1)
y_test = (y_test-y_test.mean())/y_test.std()
t_dummy = label_binarize(tmts, classes = [0,1,2])
#predict
preds_mse = mse_model.predict(x_test)
preds_optim = optim_model.predict(x_test)
argmax_optim = preds_optim.argmax(axis=1)
argmax_mse = preds_mse.argmax(axis=1)
best_choice = responses_test.argmax(axis=1)
choices = [argmax_optim, argmax_mse, best_choice]
#expected profits for each model
[responses_test[np.arange(len(responses_test)),x].mean() for x in choices]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment