Created
April 25, 2021 15:00
-
-
Save Eligijus112/a28967462cd025de0d7d61dac3cb566c to your computer and use it in GitHub Desktop.
Regression tree implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Quick value count calculator
from collections import Counter

# Array math
import numpy as np

# Data wrangling
import pandas as pd
class NodeRegression():
    """
    A node of a regression decision tree.

    Each node stores its slice of the training data, the node's mean
    prediction and MSE, and (after ``grow_tree``) links to its left and
    right child nodes. Splits minimise the pooled MSE of the children.
    """

    def __init__(
        self,
        Y: list,
        X: pd.DataFrame,
        min_samples_split=None,
        max_depth=None,
        depth=None,
        node_type=None,
        rule=None
    ):
        """
        Parameters
        ----------
        Y : list or array-like
            Target values for the observations in this node.
        X : pd.DataFrame
            Feature matrix for the observations in this node.
        min_samples_split : int, optional
            Minimum number of observations required to split (default 20).
        max_depth : int, optional
            Maximum tree depth (default 5).
        depth : int, optional
            Depth of this node in the tree (default 0, the root).
        node_type : str, optional
            'root', 'left_node' or 'right_node' (default 'root').
        rule : str, optional
            Human-readable split rule that produced this node.
        """
        # Store targets as an ndarray so the vectorised arithmetic below
        # works even when Y arrives as a plain Python list — which is
        # exactly what grow_tree passes to child nodes via .tolist().
        self.Y = np.asarray(Y, dtype=float)
        self.X = X

        # Hyper parameters — compare against None so explicit falsy values
        # (e.g. depth=0) are not silently replaced by the default.
        self.min_samples_split = min_samples_split if min_samples_split is not None else 20
        self.max_depth = max_depth if max_depth is not None else 5

        # Current depth of this node (root is 0)
        self.depth = depth if depth is not None else 0

        # All feature names available for splitting
        self.features = list(self.X.columns)

        # Node metadata
        self.node_type = node_type if node_type is not None else 'root'
        self.rule = rule if rule is not None else ""

        # Node statistics: mean prediction, residuals around it, and MSE
        self.ymean = np.mean(self.Y)
        self.residuals = self.Y - self.ymean
        self.mse = self.get_mse(self.Y, self.ymean)

        # Number of observations in the node
        self.n = len(self.Y)

        # Children are attached by grow_tree()
        self.left = None
        self.right = None

        # Best split found for this node (set by grow_tree)
        self.best_feature = None
        self.best_value = None

    @staticmethod
    def get_mse(ytrue, yhat) -> float:
        """
        Return the mean squared error between ``ytrue`` and ``yhat``.

        ``yhat`` may be a scalar (the node mean) or an array. An empty
        ``ytrue`` yields 0.0 instead of a ZeroDivisionError.
        """
        ytrue = np.asarray(ytrue, dtype=float)
        n = len(ytrue)
        if n == 0:
            return 0.0
        r = ytrue - yhat
        return float(np.sum(r ** 2) / n)

    @staticmethod
    def ma(x: np.ndarray, window: int) -> np.ndarray:
        """
        Return the moving average of ``x`` over the given ``window``.

        With window=2 over sorted unique feature values this yields the
        midpoints used as candidate split thresholds.
        """
        return np.convolve(x, np.ones(window), 'valid') / window

    def best_split(self) -> tuple:
        """
        Find the (feature, value) split that most reduces this node's MSE.

        Returns
        -------
        tuple
            ``(best_feature, best_value)``; ``(None, None)`` when no
            candidate split improves on the current node MSE.
        """
        df = self.X.copy()
        df['Y'] = self.Y
        # Drop incomplete rows once, outside the feature loop.
        clean = df.dropna()

        # A split must beat the current node's own MSE to be accepted.
        mse_base = self.mse
        best_feature = None
        best_value = None

        for feature in self.features:
            # Candidate thresholds: midpoints between adjacent sorted
            # unique values of the feature.
            thresholds = self.ma(np.sort(clean[feature].unique()), 2)
            for value in thresholds:
                # Same split convention as grow_tree: left is <=, right is >.
                left_y = clean.loc[clean[feature] <= value, 'Y'].values
                right_y = clean.loc[clean[feature] > value, 'Y'].values
                if left_y.size == 0 or right_y.size == 0:
                    continue

                # Pooled MSE of the two children around their own means.
                res = np.concatenate(
                    (left_y - left_y.mean(), right_y - right_y.mean()),
                    axis=None,
                )
                mse_split = np.sum(res ** 2) / len(res)

                if mse_split < mse_base:
                    best_feature = feature
                    best_value = value
                    # This split is the new bar to beat.
                    mse_base = mse_split

        return (best_feature, best_value)

    def grow_tree(self):
        """
        Recursively split this node, attaching child ``NodeRegression``
        instances, until ``max_depth`` or ``min_samples_split`` stops
        further growth or no split improves the MSE.
        """
        # Stop when the node is deep enough or too small to split.
        if (self.depth >= self.max_depth) or (self.n < self.min_samples_split):
            return

        best_feature, best_value = self.best_split()
        if best_feature is None:
            # No candidate split improves the MSE — leave this as a leaf.
            return

        # Record the winning split on this node.
        self.best_feature = best_feature
        self.best_value = best_value

        df = self.X.copy()
        df['Y'] = self.Y
        left_df = df[df[best_feature] <= best_value].copy()
        right_df = df[df[best_feature] > best_value].copy()

        self.left = NodeRegression(
            left_df['Y'].values.tolist(),
            left_df[self.features],
            depth=self.depth + 1,
            max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            node_type='left_node',
            rule=f"{best_feature} <= {round(best_value, 3)}"
        )
        self.left.grow_tree()

        self.right = NodeRegression(
            right_df['Y'].values.tolist(),
            right_df[self.features],
            depth=self.depth + 1,
            max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            node_type='right_node',
            rule=f"{best_feature} > {round(best_value, 3)}"
        )
        self.right.grow_tree()

    def print_info(self, width=4):
        """
        Print this node's split rule, MSE, observation count and mean
        prediction, indented proportionally to its depth.
        """
        # Indentation grows super-linearly with depth for readability.
        const = int(self.depth * width ** 1.5)
        spaces = "-" * const

        if self.node_type == 'root':
            print("Root")
        else:
            print(f"|{spaces} Split rule: {self.rule}")
        print(f"{' ' * const} | MSE of the node: {round(self.mse, 2)}")
        print(f"{' ' * const} | Count of observations in node: {self.n}")
        print(f"{' ' * const} | Prediction of node: {round(self.ymean, 3)}")

    def print_tree(self):
        """
        Print the subtree rooted at this node (pre-order traversal).
        """
        self.print_info()
        if self.left is not None:
            self.left.print_tree()
        if self.right is not None:
            self.right.print_tree()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment