Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save quantra-go-algo/7c1ab7ae5a89c47ac6ef6f6e32a5fd2a to your computer and use it in GitHub Desktop.
Save quantra-go-algo/7c1ab7ae5a89c47ac6ef6f6e32a5fd2a to your computer and use it in GitHub Desktop.
def boruta_shap_algorithm(X, y, trials=20, workers=2, significance_level=0.05, seed=2024):
    """Select input features with a Boruta-style test on SHAP importances.

    For each trial, every column of ``X`` is paired with a row-shuffled
    "shadow" copy, an ``XGBRFClassifier`` is fitted on the first 80% of the
    rows, and mean absolute SHAP values are computed on the remaining 20%.
    A feature scores a hit in a trial when its importance is strictly
    greater than the largest importance among the shadow features. Hit
    counts are then passed to ``get_tail_items`` / ``choose_features``
    (defined elsewhere in this file) to split features into "green" and
    "blue" groups.

    Args:
        X: pandas DataFrame of input features. Column names must support
            ``column + '_shuffle'``.
        y: pandas Series of labels with the same number of rows as ``X``.
            It is label-encoded internally.
        trials: Number of shuffle/fit/explain rounds.
        workers: Maximum number of trials run concurrently in threads.
            Values below 1 are treated as 1.
        significance_level: Forwarded to ``get_tail_items`` together with
            the Binomial(trials, 0.5) probability mass function.
        seed: Base seed. Trial ``t`` shuffles with seed ``seed + t`` and
            every classifier uses ``random_state=seed``.

    Returns:
        The green features if there are any; otherwise the blue features
        if there are any; otherwise the list of all column names of ``X``.

    Raises:
        AssertionError: If ``X`` and ``y`` have different numbers of rows.
    """
    # Kept from the original implementation so that code running after this
    # call still sees a seeded global generator. The trials themselves no
    # longer touch the global generator (see _trial_hits).
    np.random.seed(seed)

    # Raised explicitly instead of using `assert`, which is stripped when
    # Python runs with -O. Exception type and message are unchanged.
    if X.shape[0] != y.shape[0]:
        raise AssertionError("X and y dimensions don't coincide")

    n_features = len(X.columns)
    # Number of trials in which each real feature beat every shadow feature
    features_hits = {feature: 0 for feature in X.columns}
    # Column names for the shadow (shuffled) copies
    shuffled_col_names = [str(column + '_shuffle') for column in X.columns]

    # Ordered 80/20 split: the first rows train, the last rows are explained
    split_at = int(0.8 * len(X))
    X_train, X_test = X.iloc[:split_at], X.iloc[split_at:]

    # Encode the labels as integers for the classifier, keeping y's index
    le = LabelEncoder()
    y = pd.Series(data=le.fit_transform(y), index=y.index, name='y')
    y_train = y.iloc[:split_at]

    def _trial_hits(trial):
        """Run one trial and return the real features that beat all shadows."""
        # A private legacy generator yields the same stream that
        # np.random.seed(seed + trial) would, but is not shared between
        # threads, so the shuffles are reproducible for any `workers`.
        rng = np.random.RandomState(seed + trial)

        # Shadow features: every column independently permuted
        X_shuffle_train = X_train.apply(rng.permutation)
        X_shuffle_train.columns = shuffled_col_names
        X_shuffle_test = X_test.apply(rng.permutation)
        X_shuffle_test.columns = shuffled_col_names

        # Real features first, shadow features second
        X_boruta_train = pd.concat([X_train, X_shuffle_train], axis=1)
        X_boruta_test = pd.concat([X_test, X_shuffle_test], axis=1)

        # A fresh estimator per trial: fitting one shared instance from
        # several threads lets one trial explain another trial's model.
        # NOTE(review): 'gpu_hist' requires a GPU build of XGBoost and is
        # deprecated in recent releases - confirm against the installed version.
        classifier = XGBRFClassifier(n_estimators=100, subsample=1, colsample_bynode=1,
                                     tree_method='gpu_hist', random_state=seed)
        model = classifier.fit(X_boruta_train, y_train)

        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_boruta_test)

        # Mean absolute SHAP value per column.
        # NOTE(review): assumes shap_values is (n_samples, n_features), i.e.
        # a binary target - confirm for multi-class labels.
        importance = pd.Series(np.abs(shap_values).mean(0), index=X_boruta_test.columns)
        real_importance = importance.iloc[:n_features]
        best_shadow = importance.iloc[n_features:].max()

        beats_shadows = (real_importance > best_shadow).to_numpy()
        return real_importance.index[beats_shadows].tolist()

    # One pool bounded by `workers` replaces the manual batching. Hits are
    # tallied here, in the calling thread, so the counter is never updated
    # concurrently.
    pool_size = max(1, min(int(workers), trials))
    with futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
        for hit_features in executor.map(_trial_hits, range(trials)):
            for feature in hit_features:
                features_hits[feature] += 1

    # Probability mass function of Binomial(trials, 0.5) over 0..trials hits
    pmf = [sp.stats.binom.pmf(x, trials, .5) for x in range(trials + 1)]
    # Hit-count threshold derived from the pmf and the significance level
    thresh = get_tail_items(pmf, significance_level)
    # green are the accepted features, blue are the tentative features
    green, blue = choose_features(features_hits, trials, thresh)

    # Prefer accepted features, then tentative ones, then fall back to all
    if len(green) != 0:
        return green
    if len(blue) != 0:
        return blue
    return X.columns.tolist()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment