Skip to content

Instantly share code, notes, and snippets.

@micahmelling
Last active August 26, 2021 04:16
Show Gist options
  • Save micahmelling/2718b5de46415e0c0f7831b38f7b9b6e to your computer and use it in GitHub Desktop.
Save micahmelling/2718b5de46415e0c0f7831b38f7b9b6e to your computer and use it in GitHub Desktop.
import shap
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import multiprocessing as mp
from functools import partial
from statistics import mean
from data.db import log_feature_importance_to_mysql
# Use the non-interactive Agg backend so plots can be rendered to files without a display.
plt.switch_backend('Agg')
def _run_shap_explainer(x_df, explainer, boosting_model, use_kernel, nsamples_kernel=500):
"""
Runs the SHAP explainer on a dataframe.
:param x_df: x dataframe
:param explainer: SHAP explainer object
:param boosting_model: Boolean of whether or not the explainer is for a boosting model
:param use_kernel: Boolean of whether or not to use Kernel SHAP, which mostly makes sense when we are using a
CalibratedClassifierCV
:param nsamples_kernel: number of samples to use when employing the kernel explainer
"""
if boosting_model:
if use_kernel:
return explainer.shap_values(x_df, nsamples=nsamples_kernel, check_additivity=False)
else:
return explainer.shap_values(x_df, check_additivity=False)
else:
if use_kernel:
return explainer.shap_values(x_df, nsamples=nsamples_kernel, check_additivity=False)[1]
else:
return explainer.shap_values(x_df, check_additivity=False)[1]
def _run_parallel_shap_explainer(x_df, explainer, boosting_model, use_kernel):
    """
    Splits x_df into evenly-sized partitions based on the number of CPUs available on the machine. Then, the SHAP
    explainer object is run in parallel on each subset of x_df. The results are then combined into a single object.

    :param x_df: x dataframe
    :param explainer: SHAP explainer object
    :param boosting_model: Boolean of whether or not the explainer is for a boosting model
    :param use_kernel: Boolean of whether or not to use Kernel SHAP, which mostly makes sense when we are using a
    CalibratedClassifierCV
    :returns: numpy array of the partition results concatenated in the original row order
    """
    n_rows = len(x_df)
    # Never create more partitions than rows; otherwise some workers would be handed empty dataframes.
    n_partitions = max(1, min(mp.cpu_count(), n_rows))
    # Split the row positions rather than the dataframe itself, so every partition is a plain positional selection
    # and the original row order is preserved when the results are concatenated.
    row_groups = np.array_split(np.arange(n_rows), n_partitions)
    partitions = [x_df.iloc[rows] for rows in row_groups]

    shap_fn = partial(_run_shap_explainer, explainer=explainer, boosting_model=boosting_model, use_kernel=use_kernel)
    with mp.Pool(processes=n_partitions) as pool:
        partition_results = pool.map(shap_fn, partitions)
    return np.concatenate(partition_results)
def _get_shap_expected_value(explainer, boosting_model):
"""
Extracts a SHAP Explainer's expected value.
:param explainer: SHAP explainer object
:param boosting_model: Boolean of whether or not the explainer is for a boosting model
:returns: int
"""
if boosting_model:
expected_value = explainer.expected_value[0]
else:
try:
expected_value = explainer.expected_value[1]
except IndexError:
expected_value = explainer.expected_value[0]
return expected_value
def _get_calibrated_base_model(calibrated_classifier):
    """
    Returns the fitted model wrapped by one entry of a CalibratedClassifierCV's calibrated_classifiers_ list.

    :param calibrated_classifier: element of model.calibrated_classifiers_
    :returns: the wrapped fitted model
    """
    # Newer scikit-learn releases expose the wrapped model as `estimator`; older ones used `base_estimator`.
    wrapped_model = getattr(calibrated_classifier, 'estimator', None)
    if wrapped_model is None:
        wrapped_model = calibrated_classifier.base_estimator
    return wrapped_model


def _save_raw_shap_outputs(shap_values, shap_expected_value, x_df, model_uid):
    """
    Saves the SHAP values and the SHAP expected value as csv files in the model's shap diagnostics directory.

    :param shap_values: numpy array of shap values produced for x_df
    :param shap_expected_value: the explainer's expected value
    :param x_df: x dataframe, which supplies the column names for the shap values
    :param model_uid: model uid
    """
    shap_dir = os.path.join('modeling', model_uid, 'diagnostics', 'shap')
    # Create the directory up front so a missing path does not discard the SHAP values that were just computed.
    os.makedirs(shap_dir, exist_ok=True)
    shap_df = pd.DataFrame(shap_values, columns=list(x_df))
    shap_df.to_csv(os.path.join(shap_dir, 'shap_values.csv'), index=False)
    expected_df = pd.DataFrame({'expected_value': [shap_expected_value]})
    expected_df.to_csv(os.path.join(shap_dir, 'shap_expected.csv'), index=False)


def _produce_raw_shap_values(model, model_uid, x_df, calibrated, boosting_model, use_kernel):
    """
    Produces the raw shap values for every observation in x_df. A dataframe of the shap values is saved locally
    as a csv. The shap expected value is extracted and saved locally in a csv.

    When the model is calibrated and Kernel SHAP is not used, a tree explainer is run on the model wrapped by each
    calibrated classifier and the shap values and expected values are averaged across them. When the model is not
    calibrated, a tree explainer is run on the model directly and use_kernel is ignored.

    :param model: fitted model
    :param model_uid: model uid
    :param x_df: x dataframe
    :param calibrated: boolean of whether or not the model is a CalibratedClassifierCV
    :param boosting_model: Boolean of whether or not the explainer is for a boosting model
    :param use_kernel: Boolean of whether or not to use Kernel SHAP, which mostly makes sense when we are using a
    CalibratedClassifierCV
    :returns: numpy array
    """
    if not calibrated:
        explainer = shap.TreeExplainer(model)
        shap_values = _run_parallel_shap_explainer(x_df, explainer, boosting_model, False)
        shap_expected_value = _get_shap_expected_value(explainer, boosting_model)
    elif use_kernel:
        # The first 50 rows of x_df serve as the background sample for Kernel SHAP.
        explainer = shap.KernelExplainer(model.predict_proba, x_df.iloc[:50, :])
        shap_values = _run_parallel_shap_explainer(x_df, explainer, boosting_model, True)
        shap_expected_value = _get_shap_expected_value(explainer, boosting_model)
    else:
        shap_values_list = []
        shap_expected_list = []
        for calibrated_classifier in model.calibrated_classifiers_:
            explainer = shap.TreeExplainer(_get_calibrated_base_model(calibrated_classifier))
            shap_values_list.append(_run_parallel_shap_explainer(x_df, explainer, boosting_model, False))
            shap_expected_list.append(_get_shap_expected_value(explainer, boosting_model))
        # Average element-wise across the calibrated classifiers.
        shap_values = np.array(shap_values_list).sum(axis=0) / len(shap_values_list)
        shap_expected_value = mean(shap_expected_list)

    _save_raw_shap_outputs(shap_values, shap_expected_value, x_df, model_uid)
    return shap_values
def _generate_shap_global_values(shap_values, x_df, model_uid, db_schema_name, db_conn, log_to_db):
"""
Extracts the global shape values for every feature ans saves the outcome as a dataframe locally. Amends the
dataframe so that it could be used in log_feature_importance_to_mysql().
:param shap_values: numpy array of shap values
:param x_df: x_df dataframe
:param model_uid: model uid
:param db_schema_name: database schema to log metrics to
:param log_to_db: Boolean of whether to log scores to the database
:param db_conn: database connection
:returns: pandas dataframe
"""
shap_values = np.abs(shap_values).mean(0)
df = pd.DataFrame(list(zip(x_df.columns, shap_values)), columns=['feature', 'shap_value'])
df.sort_values(by=['shap_value'], ascending=False, inplace=True)
df.to_csv(os.path.join('modeling', model_uid, 'diagnostics', 'shap', 'shap_global.csv'), index=False)
df.rename(columns={'shap_value': 'importance_score'}, inplace=True)
df['model_uid'] = model_uid
df['importance_metric'] = 'shap'
df = df[['model_uid', 'feature', 'importance_score', 'importance_metric']]
if log_to_db:
log_feature_importance_to_mysql(df, db_schema_name, db_conn)
def _generate_shap_plot(shap_values, x_df, model_uid, plot_type):
    """
    Draws a SHAP summary plot for x_df and writes it to the model's shap diagnostics directory as a png.

    :param shap_values: numpy array of shap values produced for x_df
    :param x_df: x dataframe
    :param model_uid: model uid
    :param plot_type: the type of plot we want to generate; generally, either dot or bar
    """
    output_path = os.path.join('modeling', model_uid, 'diagnostics', 'shap', f'shap_values_{plot_type}.png')
    # show=False so the plot is not displayed and can be written to disk instead.
    shap.summary_plot(shap_values, x_df, plot_type=plot_type, show=False)
    plt.savefig(output_path, bbox_inches='tight')
    # Clear the current figure so the next plot starts from a blank canvas.
    plt.clf()
def produce_shap_values_and_plots(model, x_df, model_uid, boosting_model, use_kernel, calibrated, db_schema_name,
                                  db_conn, log_to_db):
    """
    Entry point for the SHAP diagnostics: computes the SHAP values for x_df, saves the per-feature global values
    (optionally logging them to the database), and saves the dot and bar summary plots.

    :param model: model with predict method
    :param x_df: x dataframe
    :param model_uid: model uid
    :param boosting_model: Boolean of whether or not the explainer is for a boosting model
    :param use_kernel: Boolean of whether or not to use Kernel SHAP, which mostly makes sense when we are using a
    CalibratedClassifierCV
    :param calibrated: boolean of whether or not the model is a CalibratedClassifierCV
    :param db_schema_name: database schema to log metrics to
    :param db_conn: database connection
    :param log_to_db: Boolean of whether to log scores to the database
    """
    raw_values = _produce_raw_shap_values(model, model_uid, x_df, calibrated, boosting_model, use_kernel)
    _generate_shap_global_values(raw_values, x_df, model_uid, db_schema_name, db_conn, log_to_db)
    # One summary plot per supported plot type, in the same order as before.
    for plot_type in ('dot', 'bar'):
        _generate_shap_plot(raw_values, x_df, model_uid, plot_type)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment