Skip to content

Instantly share code, notes, and snippets.

@thomasmooon
Created January 15, 2021 08:48
Show Gist options
  • Save thomasmooon/4cedc82f1cd9d3db41e3f86dea8971e0 to your computer and use it in GitHub Desktop.
Save thomasmooon/4cedc82f1cd9d3db41e3f86dea8971e0 to your computer and use it in GitHub Desktop.
AI-DAS Tamara
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 24 12:57:55 2020
@author: tlinden
elastic net penalized cox regression
with (dummy coding) and standardization
stacked in a sklearn pipeline
tuned with bayesian hyperparamter optimization
within a nested cross validation
memos:
CoxnetSurvivalAnalysis score is harrell's c-index
"""
skf__random_state = 1
skf__n_splits = 5
optuna__experiment = 0
optuna__n_trials = 10
optuna__n_jobs = 1
optuna__objective = "cHarrell" # choices: ibs, cHarrell (default), cUno
mlflow__run_name = "en_cox__stratCv__OptunaSearchCV__load_whas500"
import mlflow
import numpy as np
import optuna
import pandas as pd
from optuna.integration import OptunaSearchCV
from optuna.distributions import LogUniformDistribution
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.compose import ColumnTransformer
from sksurv.linear_model import CoxnetSurvivalAnalysis as CoxNet
from sksurv.metrics import concordance_index_censored
from sksurv.metrics import concordance_index_ipcw
from sksurv.metrics import integrated_brier_score as ibs
from os.path import join
import pickle
import time
import tempfile
# =============================================================================
# scoring functions
# =============================================================================
def score_charrell(estimator, X, y, train_index, test_index) -> float:
ytest_pred = estimator.predict(X.iloc[test_index,:])
score = concordance_index_censored(y['event'][test_index], y['time'][test_index], ytest_pred)
return score[0]
def score_cuno(estimator, X, y, train_index, test_index) -> float:
ytest_pred = estimator.predict(X.iloc[test_index,:])
tmax_train = np.max(y[train_index]['time'])
tmax_val = np.max(y[test_index]['time'])
tmax = np.min([tmax_train, tmax_val])
score = concordance_index_ipcw(y[train_index], y[test_index], ytest_pred, tau = tmax)
return score[0]
def score_ibs(estimator, X, y, train_index, test_index) -> float:
"""
wraps the ibs[1]. unique event times in `y` and `ypred` can be different.
the ibs can only be computed for joint times. hence only joint times are considered.
the last time point of the joint times is truncated, as required from the method.
[1] from sksurv.metrics import integrated_brier_score as ibs
Arguments
---------
y: structured np.array
ypred: np.array
individuals survival probabilities, shape = (nsamp, timepoints)
eg output of predict_survival_function() of a Cox model
train_index: list or np.array
test_index: list or np.array
"""
ytest_pred_survmat = estimator.predict_survival_function(X.iloc[test_index,:])
ytrain = y[train_index]
ytest = y[test_index]
# consider only joint times
ytrain_times = [_[1] for _ in ytrain]
ytest_times = [_[1] for _ in ytest]
ypred_times = ytest_pred_survmat[0].x
tjoint = np.intersect1d(ytest_times, ypred_times)
tjoint = np.intersect1d(tjoint, ytrain_times)
# filter mask from joint times
fmask = [t in tjoint for t in ytest_times]
## apply filter on ytest ground truth
ytest_filtered = ytest[fmask]
# apply filter on ytest predictions
tjoint_belowmax = tjoint[:-1]
ytest_pred = np.asarray([[fn(t) for t in tjoint_belowmax] for fn in ytest_pred_survmat])
ytest_pred_filtered = ytest_pred[fmask,:]
# ibs
score = ibs(ytrain, ytest_filtered, ytest_pred_filtered, tjoint_belowmax)
return score
# =============================================================================
# example data
# =============================================================================
from sksurv.datasets import load_whas500
X, y = load_whas500()
X = X.astype(float)
# transform days to months
y = pd.DataFrame(y)
y.lenfol = (y.lenfol/30).round(0)
y = [tuple(y.iloc[i,:]) for i in range(y.shape[0])]
y = np.array(y, np.dtype([("event","bool"),("time","float")]))
# from sksurv.datasets import load_gbsg2
# X, y = load_gbsg2()
# # transform days to months
# y = pd.DataFrame(y)
# y["time"] = (y["time"]/365).round(0)
# y = [tuple(y.iloc[i,:]) for i in range(y.shape[0])]
# y = np.array(y, np.dtype([("event","bool"),("time","float")]))
# =============================================================================
# define model
# =============================================================================
# column transformer
cat_cols = X.select_dtypes("category").columns
num_cols = X.select_dtypes("number").columns
cox_model_preprocessor = ColumnTransformer([
("OneHotEncoder",OneHotEncoder(sparse=False),cat_cols),
("StandardScaler",StandardScaler(), num_cols),
])
# pipeline
estimator = Pipeline([
("preprocessor", cox_model_preprocessor),
("en_cox", CoxNet(fit_baseline_model=True))
])
# hyperparameters
param_distributions = {
"en_cox__l1_ratio" : LogUniformDistribution(1e-3, 1e-0)
}
# =============================================================================
# nested cross-validation
# =============================================================================
current_outer = 0
skf_outer = StratifiedKFold(n_splits = skf__n_splits)
ygroup = [_[0] for _ in y]
with mlflow.start_run(run_name = mlflow__run_name, experiment_id = optuna__experiment) as run_parent:
for train_index, test_index in skf_outer.split(X, ygroup):
with mlflow.start_run(
run_name = mlflow__run_name,
experiment_id = optuna__experiment,
nested=True) as run_child:
current_outer +=1
print(f"\n {'='*20} current outer fold = {current_outer} {'='*20} \n")
time.sleep(0.5) # pause ensures correct output order
# inner cross validation loop: stratified cross validation indices
skf_inner = StratifiedKFold(n_splits = skf__n_splits)
ygroup_inner = [_[0] for _ in y[train_index]]
sfk_inner_iter = []
for train_index_inner, val_index_inner in skf_inner.split(X.loc[train_index], ygroup_inner):
sfk_inner_iter.append((train_index_inner, val_index_inner))
# study = optuna.create_study()
inner_search = OptunaSearchCV(
estimator = estimator,
# study=study,
cv = sfk_inner_iter,
param_distributions = param_distributions,
n_trials = optuna__n_trials,
n_jobs = optuna__n_jobs,
return_train_score=True
)
inner_search.fit(X.loc[train_index], y[train_index])
# =============================================================================
# log params
# =============================================================================
params = inner_search.study_.best_trial.params # hyperparams best model
params["optuna__objective"] = optuna__objective
params["outer_fold"] = current_outer
mlflow.log_params(params)
# =============================================================================
# log metrics
# =============================================================================
# inner folds
metrics_inner = inner_search.study_.best_trial.user_attrs
metrics_inner = {k.replace("test","val"):v for k,v in metrics_inner.items()}
metrics = metrics_inner
# outer fold
metrics_outer = {
"test_score" : inner_search.score(X.loc[test_index], y[test_index]),
"test_cHarrell" : score_charrell(inner_search.best_estimator_, X, y, train_index, test_index),
"test_cUno" : score_cuno(inner_search.best_estimator_, X, y, train_index, test_index),
"test_ibs" : score_ibs(inner_search.best_estimator_, X, y, train_index, test_index),
}
# log
metrics.update(metrics_outer)
mlflow.log_metrics(metrics)
# =============================================================================
# log artifacts
# =============================================================================
with tempfile.TemporaryDirectory() as tmpdir:
# inner_search with best model
with open(join(tmpdir,f"OptunaSearchCV_split{current_outer}.pkl"),"wb") as file:
pickle.dump(inner_search, file)
# optuna trials dataframe
trials_dataframe = inner_search.study_.trials_dataframe()
trials_dataframe.to_csv(join(tmpdir,"optuna__trials_dataframe.csv"), sep = "\t")
trials_dataframe.to_json(join(tmpdir,"optuna__trials_dataframe.csv.json"), orient = "split")
# optuna plots
optuna_plot1 = optuna.visualization.plot_optimization_history(inner_search.study_)
optuna_plot2 = optuna.visualization.plot_edf(inner_search.study_)
optuna_plot3 = optuna.visualization.plot_param_importances(inner_search.study_)
optuna_plot4 = optuna.visualization.plot_slice(inner_search.study_)
optuna_plot1.write_html(join(tmpdir,"optuna__plot_optimization_history.html"))
optuna_plot2.write_html(join(tmpdir,"optuna__plot_edf.html"))
optuna_plot3.write_html(join(tmpdir,"optuna__plot_param_importances.html"))
optuna_plot4.write_html(join(tmpdir,"optuna__plot_slice.html"))
mlflow.log_artifacts(tmpdir)
# -*- coding: utf-8 -*-
"""
Created on Fri Dec 18 11:32:25 2020
@author: tlinden
"""
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
# MLP for Pima Indians Dataset with grid search via sklearn
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense
from keras.wrappers.scikit_learn import KerasRegressor
from sklearn.model_selection import GridSearchCV
import numpy as np
# Function to create model, required for KerasClassifier
def create_model(optimizer='rmsprop', init='glorot_uniform'):
# create model
model = Sequential()
model.add(Dense(6, input_dim=13, kernel_initializer=init, activation='tanh'))
model.add(Dense(3, kernel_initializer=init, activation='tanh'))
model.add(Dense(1, kernel_initializer=init))
# Compile model
model.compile(loss='mse', optimizer=optimizer, metrics=['mse'])
return model
# fix random seed for reproducibility
seed = 7
np.random.seed(seed)
# load pima indians dataset
from sklearn.datasets import load_boston
X, Y = load_boston(return_X_y=True)
print(X.shape)
# create model
# grid search epochs, batch size and optimizer
optimizers = ['nadam']
init = ['orthogonal']
epochs = [500]
batches = [32]
# =============================================================================
# without early stopping
# =============================================================================
param_grid = dict(model__optimizer=optimizers,
model__epochs=epochs,
model__batch_size=batches,
model__init=init)
model = KerasRegressor(build_fn=create_model, verbose=0)
pipe = Pipeline([
("transformer",StandardScaler()),
("model",model)
])
grid = GridSearchCV(estimator=pipe, param_grid=param_grid, n_jobs = 5,)
grid_result = grid.fit(X, Y)
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
print("%f (%f) with: %r" % (mean, stdev, param))
# grid_result.predict(X[:10])
# =============================================================================
# with early stopping
# =============================================================================
class KerasRegressor_EarlyStop(KerasRegressor):
"""integrate early stopping callback """
def fit(self, x, y, **kwargs):
nfolds = 5
split = 1/nfolds
new_defaults = {"validation_split" : split}
kwargs.update(new_defaults)
callback = tf.keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True,
)
return super().fit(x, y, **kwargs, callbacks = [callback])
def early_stopping_cv_split_iter(x, n_splits):
"""
When using early stopping with the 'validation_split' parameter, 'The
validation data is selected from the last samples in the x and y data provided, before shuffling.'
according to https://keras.io/api/models/model_training_apis/.
Therefore the validation data needs to be attached to the training data
if early stopping needs to be used in combination with a cross-validation procedure.
The cross-validation procedure uses the `val_idx` samples to evaluate the performance
on the held-out subset and the `train_idx` samples to fit the model.
But: Due to the routine presribed in the paragraph before, the fraction used to control
the early stopping during the model fitting process, the last samples from the x and y
are used. Eventually, this requires to append the x and y passed to the .fit procedure
with the held-out subset to adapt to this routine.
"""
# TODO(consider stratified cv-split for classification / time-to-event)
cv = KFold(n_splits = n_splits)
cv_iter = []
for train_idx, val_idx in cv.split(x):
# train_idx_earlystop = [train_idx, val_idx]
train_idx_earlystop = np.concatenate((train_idx, val_idx))
cv_iter.append((train_idx_earlystop, val_idx))
return cv_iter
model = KerasRegressor_EarlyStop(build_fn=create_model, verbose=0)
pipe = Pipeline([
("transformer",StandardScaler()),
("model",model),
])
grid = GridSearchCV(
estimator=pipe,
param_grid=param_grid,
cv = early_stopping_cv_split_iter(X,5),
n_jobs = 5,
)
grid_result = grid.fit(X, Y)
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
print("%f (%f) with: %r" % (mean, stdev, param))
grid_result.predict(X[:10])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment