Created
August 2, 2019 15:28
-
-
Save samcarlos/861d37dc4f9b4b87679179374f96baa6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
from keras import backend as K | |
from keras.layers import Input, Dense, Dropout | |
from keras.models import Model | |
from keras.wrappers.scikit_learn import KerasRegressor | |
from sklearn.metrics import make_scorer | |
from sklearn.model_selection import GridSearchCV | |
from sklearn.preprocessing import label_binarize | |
from sklearn.model_selection import train_test_split | |
np.random.seed(2) | |
#Set up loss functions | |
def optim_loss(yTrue,yPred): | |
"""Returns Loss for Keras. This function essentially asks the question: | |
'what is the probability that a particular treatment yields the maximum?'. | |
It does this by multiplying a softmax probability to the response variable | |
of interest. If a response is missing then it is not included in loss. | |
Args: | |
yTrue (keras array): True Values | |
yPred (keras array): Predictions | |
Returns: | |
Function to minimize | |
""" | |
output = yTrue[:,:-1]* yPred | |
output = K.sum(output,axis = -1)*yTrue[:,-1] | |
return -K.mean(output) | |
def missing_mse_loss(yTrue,yPred): | |
"""Returns Loss for Keras. This is essentially a multi-output regression that | |
ignores missing data. It's similar in concept to matrix factorization with | |
missing data in that it only includes non missing values in loss function. | |
Args: | |
yTrue (keras array): True Values | |
yPred (keras array): Predictions | |
Returns: | |
Function to minimize | |
""" | |
output = yTrue[:,:-1]* yPred | |
output = (K.sum(output,axis = -1)-yTrue[:,-1])**2 | |
return K.mean(output) | |
def expected_gains(yTrue,yPred): | |
"""Returns Expected Gains to gridsearch over | |
Args: | |
yTrue (array): True Values | |
yPred (array): Predictions | |
Returns: | |
Expected model gains compared with 25 cents | |
""" | |
is_max = np.argmax(yPred, axis = 1) | |
prop_model = np.where(yTrue[:,:-1].argmax(axis=1) == is_max)[0] | |
gains = np.array(yTrue[prop_model,-1]).mean() | |
return gains | |
def missing_mse_loss_np(yTrue,yPred): | |
"""Returns Loss for Keras. This is essentially a multi-output regression that | |
ignores missing data. It's similar in concept to matrix factorization with | |
missing data in that it only includes non missing values in loss function. | |
Args: | |
yTrue (keras array): True Values | |
yPred (keras array): Predictions | |
Returns: | |
Function to minimize | |
""" | |
output = yTrue[:,:-1]* yPred | |
output_1 = (np.sum(output,axis = -1)-yTrue[:,-1])**2 | |
return np.mean(output_1) | |
missing_mse_loss_scorer = make_scorer(missing_mse_loss_np, greater_is_better = False) | |
expected_gains_scorer = make_scorer(expected_gains) | |
#Functions to build models | |
def hete_optim_action(input_shape, output_shape, num_nodes = 256, dropout = .5, num_layers = 1, | |
activation='relu'): | |
"""Returns ERUPT maximizing neural network | |
Args: | |
input_shape (int): number of features in training dataset | |
num_nodes (int): Number of nodes in layers | |
dropout (real): dropout percentage | |
num_layers (real): number of num_layer | |
activation (keras function): nonlinear activation function | |
Returns: | |
Keras Model | |
""" | |
input = Input(shape=(input_shape,)) | |
x = Dense(num_nodes, activation=activation)(input) | |
x = Dropout(dropout)(x) | |
if(num_layers > 1): | |
for q in range(num_layers - 1): | |
x = Dense(num_nodes, activation=activation)(x) | |
x = Dropout(dropout)(x) | |
x = Dense(output_shape, activation = 'softmax')(x) | |
model = Model(input, x) | |
model.compile(optimizer='rmsprop', loss=optim_loss, metrics = [optim_loss]) | |
return model | |
def hete_missing_mse(input_shape, output_shape, num_nodes = 256, dropout = .5, num_layers = 1, | |
activation='relu'): | |
"""Returns multi-output neural network | |
Args: | |
input_shape (int): number of features in training dataset | |
num_nodes (int): Number of nodes in layers | |
dropout (real): dropout percentage | |
num_layers (real): number of num_layer | |
activation (keras function): nonlinear activation function | |
Returns: | |
Keras Model | |
""" | |
input = Input(shape=(input_shape,)) | |
x = Dense(num_nodes, activation=activation)(input) | |
x = Dropout(dropout)(x) | |
if(num_layers > 1): | |
for q in range(num_layers - 1): | |
x = Dense(num_nodes, activation=activation)(x) | |
x = Dropout(dropout)(x) | |
x = Dense(output_shape)(x) | |
model = Model(input, x) | |
model.compile(optimizer='rmsprop', loss=missing_mse_loss, metrics = [missing_mse_loss]) | |
return model | |
def get_model(optim = False, param_grid = None, | |
scorer = expected_gains_scorer): | |
if param_grid is None: | |
param_grid = dict(num_nodes = [8,32,64], dropout = [.1,.25,.5,.75,.9], | |
activation = ['relu', 'tanh'], num_layers = [1,2], epochs = [10]) | |
if optim: | |
model = KerasRegressor(build_fn=hete_optim_action, | |
input_shape=X.shape[1], output_shape = dummy_y_train.shape[1] - 1 , | |
num_nodes=1, num_layers = 1, epochs=100, verbose = False) | |
else: | |
model = KerasRegressor(build_fn=hete_missing_mse, | |
input_shape=X.shape[1], output_shape = dummy_y_train.shape[1] - 1 , | |
num_nodes=1, num_layers = 1, epochs=100, verbose = False) | |
grid = GridSearchCV(estimator=model, param_grid=param_grid, n_jobs=-1, | |
scoring = scorer , verbose = False, cv = 5) | |
return grid | |
#Generate Data | |
n_obs = 10000 | |
X = np.concatenate([np.random.normal(0,1,n_obs).reshape(n_obs,1) for x in range(100)], axis = 1) | |
main_effects = np.random.normal(1,1,10) | |
#add hete effects to treatments 1 and 2 | |
tmt_coefs_0 = main_effects | |
tmt_effects_hete = np.random.normal(.1,.05,10) | |
tmt_coefs_1 = tmt_effects_hete + main_effects | |
tmt_coefs_2 = -tmt_effects_hete + main_effects | |
coefs = [tmt_coefs_0, tmt_coefs_1, tmt_coefs_2] | |
null_coefs = [0 for x in range(90)] | |
coefs = [np.concatenate([x,null_coefs]).reshape(-1,1) for x in coefs ] | |
coefs = np.concatenate(coefs,axis=1) | |
tmts = np.random.choice([0,1,2], n_obs) | |
responses = np.matmul(X, coefs) | |
y = responses[np.arange(len(responses)), tmts] | |
y = y + np.random.normal(0,y.std()*10,n_obs) | |
y = y.reshape(-1,1) | |
y = (y-y.mean())/y.std() | |
t_dummy = label_binarize(tmts, classes = [0,1,2]) | |
dummy_y_train = np.concatenate([t_dummy, y.reshape(-1,1)], axis = 1) | |
#build models and gridsearch | |
mse_model = get_model(scorer = missing_mse_loss_scorer) | |
optim_model = get_model(optim = True) | |
mse_model.fit(X, dummy_y_train) | |
optim_model.fit(X, dummy_y_train) | |
#create test data | |
x_test = np.concatenate([np.random.normal(0,1,n_obs).reshape(n_obs,1) for x in range(100)], axis = 1) | |
responses_test = np.matmul(x_test, coefs) | |
tmts_test = np.random.choice([0,1,2], n_obs) | |
y_test = responses_test[np.arange(len(responses_test)), tmts_test] | |
y_test = y_test + np.random.normal(0,y_test.std()*10,n_obs) | |
y_test = y_test.reshape(-1,1) | |
y_test = (y_test-y_test.mean())/y_test.std() | |
t_dummy = label_binarize(tmts, classes = [0,1,2]) | |
#predict | |
preds_mse = mse_model.predict(x_test) | |
preds_optim = optim_model.predict(x_test) | |
argmax_optim = preds_optim.argmax(axis=1) | |
argmax_mse = preds_mse.argmax(axis=1) | |
best_choice = responses_test.argmax(axis=1) | |
choices = [argmax_optim, argmax_mse, best_choice] | |
#expected profits for each model | |
[responses_test[np.arange(len(responses_test)),x].mean() for x in choices] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment