Created
June 24, 2024 21:41
-
-
Save quantra-go-algo/7c1ab7ae5a89c47ac6ef6f6e32a5fd2a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def boruta_shap_algorithm(X, y, trials=20, workers=2, significance_level=0.05, seed=2024):
    """Select input features with a Boruta-style test based on SHAP importances.

    For each trial, every column of ``X`` is paired with a randomly permuted
    copy of itself (a "shuffled" feature).  A random-forest classifier is
    fitted on the first 80% of the rows using both the real and the shuffled
    columns, and mean absolute SHAP values are computed on the remaining 20%.
    A real feature scores a "hit" in a trial when its importance is strictly
    greater than the largest importance among the shuffled features.

    Trials run on a thread pool.  Each trial owns its random generator and
    its classifier instance, and the hit counts are tallied in the calling
    thread, so the trials share no mutable state.

    Args:
        X: pandas DataFrame of input features, one row per sample.
        y: pandas Series of class labels with as many rows as ``X``.
        trials: Number of shuffle/fit/explain rounds to run.
        workers: Maximum number of trials running concurrently.
        significance_level: Passed to ``get_tail_items`` together with the
            Binomial(trials, 0.5) probability mass function to obtain the
            hit-count threshold.
        seed: Base random seed; trial ``t`` uses ``seed + t`` for its shuffles.

    Returns:
        The accepted ("green") features from ``choose_features`` when there
        are any, otherwise the tentative ("blue") ones, otherwise the list of
        all column labels of ``X``.

    Raises:
        AssertionError: If ``X`` and ``y`` have a different number of rows.
        ValueError: If the SHAP output does not reduce to exactly one
            importance value per column.
    """
    # Seed the global NumPy generator, as callers may rely on this side effect
    np.random.seed(seed)
    # Assert that the number of samples of both data match
    assert X.shape[0] == y.shape[0], "X and y dimensions don't coincide"

    # Number of real (non-shuffled) features
    n_features = len(X.columns)
    # Names for the shuffled copies; str() first so non-string labels also work
    shuffled_col_names = [str(column) + '_shuffle' for column in X.columns]

    # Chronological 80/20 split of the input features
    split_at = int(0.8 * len(X))
    X_train, X_test = X.iloc[:split_at], X.iloc[split_at:]

    # Encode the labels as integer classes, keeping the original index
    label_encoded = LabelEncoder().fit_transform(y)
    y = pd.Series(data=label_encoded, index=y.index, name='y')
    # Only the training labels are needed: the test rows are used for SHAP only
    y_train = y.iloc[:int(0.8 * len(y))]

    def shuffle_columns(frame, rng):
        """Return a copy of ``frame`` with every column independently permuted."""
        # Index through a fresh permutation so the source data is never mutated
        shuffled = frame.apply(lambda col: col.to_numpy()[rng.permutation(len(col))])
        shuffled.columns = shuffled_col_names
        return shuffled

    def trial_hits(trial):
        """Run one trial and return a boolean hit flag for each real feature."""
        # A private generator per trial keeps the shuffles reproducible no
        # matter how the threads interleave (the global seed is shared state)
        rng = np.random.default_rng(seed + trial)
        # Real features followed by their shuffled copies, for train and test
        X_boruta_train = pd.concat([X_train, shuffle_columns(X_train, rng)], axis=1)
        X_boruta_test = pd.concat([X_test, shuffle_columns(X_test, rng)], axis=1)
        # A fresh classifier per trial: fitting one shared instance from
        # several threads would let the trials overwrite each other's model.
        # NOTE(review): 'gpu_hist' presumably needs a GPU-enabled XGBoost
        # build and is kept unchanged from the original - confirm.
        classifier = XGBRFClassifier(n_estimators=100, subsample=1, colsample_bynode=1,
                                     tree_method='gpu_hist', random_state=seed)
        model = classifier.fit(X_boruta_train, y_train)
        # Mean absolute SHAP value per column, measured on the test rows
        shap_values = shap.TreeExplainer(model).shap_values(X_boruta_test)
        importance = np.asarray(np.abs(shap_values).mean(0))
        if importance.shape != (2 * n_features,):
            raise ValueError(
                "Expected one SHAP importance per column, got shape "
                f"{importance.shape}"
            )
        # The first n_features entries are the real features, the rest the
        # shuffled ones; a hit means beating the best shuffled feature
        return importance[:n_features] > importance[n_features:].max()

    # Number of hits for each feature, updated only from this thread
    features_hits = {feature: 0 for feature in X.columns}
    # One pool caps the concurrency at `workers` for all the trials
    with futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for hits in executor.map(trial_hits, range(trials)):
            for feature, hit in zip(X.columns, hits):
                features_hits[feature] += int(hit)

    # Probability mass function of Binomial(trials, 0.5) over 0..trials hits
    pmf = [sp.stats.binom.pmf(x, trials, .5) for x in range(trials + 1)]
    # Threshold on the number of hits used to classify the features
    thresh = get_tail_items(pmf, significance_level)
    # green are the accepted features, blue are the tentative features
    green, blue = choose_features(features_hits, trials, thresh)

    # Prefer the accepted features, then the tentative ones, then everything
    if len(green) != 0:
        return green
    if len(blue) != 0:
        return blue
    return X.columns.tolist()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment