Skip to content

Instantly share code, notes, and snippets.

@alastairparagas
Created April 19, 2020 00:52
Show Gist options
  • Save alastairparagas/16e9206c68864ed7785d0e571f33dc69 to your computer and use it in GitHub Desktop.
Decision Tree
import numpy as np
class DTLearner(object):
    """
    Decision-tree regression learner.

    Splits on the feature with the highest absolute correlation to the
    target, using the feature's median as the split value.  Leaves store
    the mean of the remaining target values.  The fitted tree is a NumPy
    structured array in pre-order, one record per node:
    (Factor, SplitVal, Left, Right), where Factor == -1 marks a leaf
    (SplitVal then holds the leaf value) and Left/Right are offsets,
    relative to the node, of the child subtrees.
    """

    def __init__(self, leaf_size=None, verbose=False):
        # Aggregate into a leaf once a node holds <= leaf_size samples;
        # None is treated as 1 (fully grown tree) at training time.
        self.leaf_size = leaf_size
        # When True, print progress while building the tree.
        self.verbose = verbose
        # Structured array built by addEvidence(); None until trained.
        self.tree = None

    def get_best_feature_index(self, Xtrain, Ytrain, exclude=None):
        """
        Returns the column index in Xtrain of the best feature
        vector or None if there isn't one.

        "Best" means highest absolute correlation with Ytrain.  A feature
        whose median split would put every sample on one side (which would
        recurse forever in __build_tree) is skipped by retrying with that
        feature excluded.
        """
        # Avoid the mutable-default-argument pitfall.
        if exclude is None:
            exclude = []
        # Discover the best feature to "split" on.
        feature_set = Xtrain.transpose()
        correlation_matrix = np.absolute(
            np.corrcoef(feature_set, Ytrain)
        )
        feature_vectors_count = feature_set.shape[0]
        # Ytrain occupies the last row/column of the correlation matrix.
        target_vector_index = feature_vectors_count
        # If there is only 1 feature, then it's automatically the
        # best feature.
        if feature_vectors_count == 1:
            return 0
        correlation_tuplets = sorted(
            [
                (correlation_matrix[i][target_vector_index], i)
                for i in range(feature_vectors_count)
                if i not in exclude
            ],
            key=lambda tuplet: tuplet[0],
            reverse=True
        )
        if len(correlation_tuplets) == 0:
            return None
        best_feature_score, best_feature_index = correlation_tuplets[0]
        split_val = np.median(Xtrain[:, best_feature_index])
        left_subtree_existence = Xtrain[:, best_feature_index] <= split_val
        # Edge case: all values of the best feature <= split_val would
        # leave the right subtree empty - avoid infinite recursion.
        if np.all(left_subtree_existence):
            if feature_vectors_count < 2:
                return None
            # Find next best possible feature vector.
            return self.get_best_feature_index(
                Xtrain, Ytrain, exclude=exclude + [best_feature_index]
            )
        return best_feature_index

    def __build_tree(self, Xtrain, Ytrain, leaf_size=None):
        """
        Recursively build a decision tree.

        Returns a structured (Factor, SplitVal, Left, Right) array in
        pre-order.  Raises ValueError when leaf_size is neither None nor
        a positive int.
        """
        # Make sure inputs are valid.
        if leaf_size is not None and not isinstance(leaf_size, int):
            raise ValueError("leaf_size parameter is incorrect. Must be None or >= 1")
        if isinstance(leaf_size, int) and leaf_size < 1:
            raise ValueError("leaf_size parameter is incorrect. Must be None or >= 1")
        # 'Factor' is object-typed so it can hold either a feature index
        # or the -1 leaf sentinel.  (Plain `object`: np.object was
        # removed in NumPy 1.24.)
        record_types = np.dtype([
            ('Factor', object), ('SplitVal', np.float64),
            ('Left', np.int32), ('Right', np.int32)
        ])
        # Base cases to prevent that call stack explosion.
        if Xtrain.shape[0] == 1:
            if self.verbose:
                print("Leaf: 1 training sample")
            return np.array([(-1, Ytrain[0], -1, -1)], dtype=record_types)
        if np.all(Ytrain == Ytrain[0], axis=0):
            if self.verbose:
                print("Leaf: Ytrain has same values for everything")
            return np.array([(-1, Ytrain[0], -1, -1)], dtype=record_types)
        # Guard against None: `None >= int` is a TypeError on Python 3.
        if leaf_size is not None and leaf_size >= Ytrain.shape[0]:
            if self.verbose:
                print("Leaf: leaf_size forces us to aggregate")
            return np.array([(-1, np.mean(Ytrain), -1, -1)], dtype=record_types)
        if Xtrain.shape[1] == 1:
            if self.verbose:
                print("Leaf: 1 column left")
            # Aggregate ALL remaining targets (np.mean(Ytrain[0]) only
            # looked at the first sample).
            return np.array([(-1, np.mean(Ytrain), -1, -1)], dtype=record_types)
        # Find best feature vector to use for split.
        best_feature_index = self.get_best_feature_index(Xtrain, Ytrain)
        if best_feature_index is None:
            if self.verbose:
                print("Leaf: Cannot find a supreme feature vector")
            return np.array([(-1, np.mean(Ytrain), -1, -1)], dtype=record_types)
        split_val = np.median(Xtrain[:, best_feature_index])
        left_subtree_existence = Xtrain[:, best_feature_index] <= split_val
        if self.verbose:
            print("Splitting on feature {index} with val: {val}".format(
                index=best_feature_index,
                val=split_val
            ))
        left_subtree = self.__build_tree(
            Xtrain[left_subtree_existence],
            Ytrain[left_subtree_existence],
            leaf_size
        )
        right_subtree = self.__build_tree(
            Xtrain[~left_subtree_existence],
            Ytrain[~left_subtree_existence],
            leaf_size
        )
        # Left child always starts right after the root; the right child
        # starts after the whole left subtree.
        root = np.array(
            [(best_feature_index, split_val, 1, left_subtree.shape[0] + 1)],
            dtype=record_types
        )
        return np.append(
            np.append(root, left_subtree), right_subtree
        )

    def __single_query(self, test_vector, start=0):
        """Walk the tree from `start` and return the leaf value for one sample."""
        feature_index = self.tree[start][0]
        split_val = self.tree[start][1]
        left_findex = self.tree[start][2]
        right_findex = self.tree[start][3]
        # Starting at a leaf - end and return leaf value.
        if feature_index == -1:
            return split_val
        if test_vector[feature_index] <= split_val:
            return self.__single_query(
                test_vector, start=start + left_findex
            )
        return self.__single_query(
            test_vector, start=start + right_findex
        )

    def addEvidence(self, Xtrain, Ytrain):
        """Train on Xtrain (samples x features) and Ytrain (targets)."""
        self.tree = self.__build_tree(
            Xtrain, Ytrain, 1 if self.leaf_size is None else self.leaf_size
        )

    def query(self, Xtest):
        """Return a 1-D array of predictions, one per row of Xtest."""
        return np.array([
            self.__single_query(test_vector)
            for test_vector in Xtest
        ])

    def author(self):
        return 'aparagas3'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment