Mean Based Clustering
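# Overview (my summary of the script below, not part of the original gist): the
# code grows a regression tree that always splits a chosen feature at its
# training mean, keeps a split only when the feature's K-fold error reduction
# beats a random-noise baseline column, and then reuses the fitted tree's leaf
# ids as cluster labels for the rows of states.csv.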
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from graphviz import Source
df = pd.read_csv("states.csv").set_index('States')
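# The rest of the script references X and y, but the gist never defines them; a
# minimal assumption (mine, not the author's): treat the first column of
# states.csv as the target and the remaining columns as features. y is kept as
# a one-column DataFrame so the pd.Series handling in tree_to_dot still applies.
y = df.iloc[:, [0]]
X = df.iloc[:, 1:]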
# Function to calculate feature improvements
def calculate_feature_improvements(X, y, n_splits=5):
    # If there are fewer samples than n_splits, reduce n_splits to the number of samples
    n_splits = min(n_splits, len(X))
    # If there's only one or no sample left, no further splitting is possible
    if n_splits <= 1:
        return {}
    kf = KFold(n_splits=n_splits)
    feature_errors = {feature: [] for feature in X.columns}
    feature_errors['random'] = []  # Add the random baseline feature
    for train_index, test_index in kf.split(X):
        labels_train = y.iloc[train_index].copy()
        features_train = X.iloc[train_index].copy()
        features_train['random'] = np.random.rand(len(features_train))
        overall_mean = labels_train.mean()
        overall_error = mean_squared_error(labels_train, [overall_mean] * len(labels_train))
        for feature in features_train.columns:
            mean_value = features_train[feature].mean()
            above_mean = features_train[feature] >= mean_value
            below_mean = features_train[feature] < mean_value
            # Ensure there are samples above and below the mean before calculating error
            if above_mean.any():
                error_above = mean_squared_error(labels_train[above_mean], [labels_train[above_mean].mean()] * sum(above_mean))
            else:
                error_above = 0
            if below_mean.any():
                error_below = mean_squared_error(labels_train[below_mean], [labels_train[below_mean].mean()] * sum(below_mean))
            else:
                error_below = 0
            total_error = error_above + error_below
            improvement = overall_error - total_error
            feature_errors[feature].append(improvement)
    average_improvements = {feature: np.mean(improvements) for feature, improvements in feature_errors.items() if improvements}
    average_improvement_random = average_improvements.pop('random', 0)
    improvements_over_random = {feature: improvement - average_improvement_random for feature, improvement in average_improvements.items()}
    return improvements_over_random
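# A minimal sanity check for the function above (toy data of my own, not from
# the gist; results vary run to run because of the random baseline column):
# toy_X = pd.DataFrame({'a': [1, 2, 3, 4, 5, 6], 'b': [6, 5, 4, 3, 2, 1]})
# toy_y = pd.DataFrame({'target': [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
# print(calculate_feature_improvements(toy_X, toy_y, n_splits=3))
# # -> {'a': ..., 'b': ...}: each feature's mean error reduction from a
# #    mean-based split, minus the reduction achieved by the random column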
# Function to perform dataset split based on a feature
def split_dataset(X, y, feature):
    mean_value = X[feature].mean()
    above_mean = X[X[feature] >= mean_value]
    below_mean = X[X[feature] < mean_value]
    labels_above_mean = y.loc[above_mean.index]
    labels_below_mean = y.loc[below_mean.index]
    return (above_mean, labels_above_mean), (below_mean, labels_below_mean)
# build_tree propagates n, mean, and sdev up the tree to each decision node
def make_leaf(y, leaf_id):
    # Terminal node: the prediction plus summary statistics
    return {
        'prediction': y.mean() if len(y) > 0 else float('nan'),  # Avoid division by zero
        'leaf_id': leaf_id,
        'n': len(y),
        'sdev': y.std(ddof=0) if len(y) > 1 else float('nan'),  # Population standard deviation
        'mean': y.mean() if len(y) > 0 else float('nan')
    }

def build_tree(X, y, depth=1, max_depth=None, leaf_id=0):
    # leaf_id serves as a unique identifier for each node position
    if (max_depth is not None and depth > max_depth) or len(X) == 0:
        return make_leaf(y, leaf_id)
    # Calculate feature improvements for the current subset
    feature_improvements = calculate_feature_improvements(X, y)
    # Stop if no feature provides an improvement over random
    if not feature_improvements or max(feature_improvements.values()) <= 0:
        return make_leaf(y, leaf_id)
    # Select the best feature based on improvements_over_random
    best_feature = max(feature_improvements, key=feature_improvements.get)
    # Perform the split
    (above_mean, labels_above_mean), (below_mean, labels_below_mean) = split_dataset(X, y, best_feature)
    # If a split does not result in child nodes, return the prediction
    if above_mean.empty or below_mean.empty:
        return make_leaf(y, leaf_id)
    # Recursively build the subtrees with heap-style child ids
    left_subtree = build_tree(below_mean, labels_below_mean, depth + 1, max_depth, leaf_id=leaf_id * 2 + 1)
    right_subtree = build_tree(above_mean, labels_above_mean, depth + 1, max_depth, leaf_id=leaf_id * 2 + 2)
    # Return the decision node and its subtrees
    return {
        'feature': best_feature,
        'split_value': X[best_feature].mean(),
        'n': len(y),
        'sdev': y.std(ddof=0) if len(y) > 1 else float('nan'),
        'mean': y.mean() if len(y) > 0 else float('nan'),
        'left': left_subtree,
        'right': right_subtree
    }
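# Note on leaf_id: children of node i get ids 2*i + 1 and 2*i + 2 (heap-style
# numbering), so every tree position, and hence every leaf, receives a distinct
# id; e.g. the root's children are 1 and 2, its grandchildren 3 through 6.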
def tree_to_dot(tree, dot_string=None, counter=None):
    # Avoid a mutable default argument so repeated calls start counting from node 0
    if counter is None:
        counter = [0]
    if dot_string is None:
        dot_string = ['digraph decision_tree {']
        dot_string.append(' node [shape=box];')
    node_id = counter[0]
    if 'prediction' in tree:
        # Leaf node
        # Ensure the prediction is a scalar value for formatting
        prediction = tree['prediction']
        if isinstance(prediction, pd.Series):
            prediction = prediction.iloc[0]
        sdev = tree['sdev']
        if isinstance(sdev, pd.Series):
            sdev = sdev.iloc[0]
        n = tree['n']
        if isinstance(n, pd.Series):
            n = n.iloc[0]
        # Emit a literal \n escape so DOT renders the line breaks in the label
        dot_string.append(f' {node_id} [label="Prediction: {prediction:.2f}\\nStDev: {sdev:.2f}\\nn: {n}"];')
    else:
        # Decision node; predict() routes x[feature] < split_value to the left,
        # so label with a strict inequality to match
        dot_string.append(f' {node_id} [label="{tree["feature"]} < {tree["split_value"]:.2f}?"];')
        # Left subtree (below mean)
        counter[0] += 1
        left_child_id = counter[0]
        dot_string.append(f' {node_id} -> {left_child_id} [label="True"];')
        tree_to_dot(tree['left'], dot_string, counter)
        # Right subtree (above mean)
        counter[0] += 1
        right_child_id = counter[0]
        dot_string.append(f' {node_id} -> {right_child_id} [label="False"];')
        tree_to_dot(tree['right'], dot_string, counter)
    if node_id == 0:
        dot_string.append('}')
        return '\n'.join(dot_string)
    return dot_string
# Function to traverse the tree and return the prediction for a given row of features
def predict(tree, x):
    # If we've reached a leaf node, return the prediction and leaf_id
    if 'prediction' in tree:
        return tree['prediction'], tree['leaf_id']
    # Decide whether to follow the left or right subtree based on the split feature's value
    if x[tree['feature']] < tree['split_value']:
        return predict(tree['left'], x)
    else:
        return predict(tree['right'], x)

# Function to apply the prediction function to each row in a dataframe
def predict_dataset(X, tree):
    predictions, leaf_ids = zip(*X.apply(lambda x: predict(tree, x), axis=1))
    return predictions, leaf_ids
# Build the decision tree on the full data set
max_depth = 10  # Example max_depth
decision_tree = build_tree(X, y, max_depth=max_depth)
# Get a prediction and a leaf id for every row
predictions, leaf_ids = predict_dataset(X, decision_tree)
# Mean Squared Error of the tree on the data it was built from
mse = mean_squared_error(y, predictions)
print(mse)
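# Note that the MSE above is in-sample: the tree is scored on its own training
# data. A hedged out-of-sample sketch, assuming the data set has enough rows
# for five folds (my addition, uncomment to run):
# oos_errors = []
# for train_idx, test_idx in KFold(n_splits=5, shuffle=True, random_state=0).split(X):
#     fold_tree = build_tree(X.iloc[train_idx], y.iloc[train_idx], max_depth=max_depth)
#     fold_preds, _ = predict_dataset(X.iloc[test_idx], fold_tree)
#     oos_errors.append(mean_squared_error(y.iloc[test_idx], fold_preds))
# print(np.mean(oos_errors))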
dot_description = tree_to_dot(decision_tree)
print(dot_description)
# Convert the DOT description to a Source object
source = Source(dot_description)
# Render the source object to a file
source.render('decision_tree', format='png', cleanup=True)
# Display the rendered graph (this will open it in your default image viewer)
source.view()
# Attach each row's leaf id as a cluster label
labeled_df = pd.concat([y, X, pd.DataFrame(leaf_ids, columns=['label'], index=X.index)], axis=1)
print(labeled_df.groupby('label').size())  # cluster sizes
average_by_label = labeled_df.groupby('label').mean()[y.columns[0]]  # y.columns[0] is the target column name
sorted_labels = average_by_label.sort_values(ascending=False)  # Sort by average in descending order
plt.plot(sorted_labels.values)
for g in sorted_labels.index:
    subset = labeled_df[labeled_df['label'] == g]
    print(subset.index)
    print(subset.describe())
    #print(len(subset),)
#plt.show()
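# Optional axis labels for the sorted-means plot (my addition; uncomment these
# along with plt.show() above to display the figure):
# plt.xlabel('cluster rank')
# plt.ylabel(y.columns[0])
# plt.title('Mean target value by leaf cluster')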