Skip to content

Instantly share code, notes, and snippets.

@karlosos
Last active April 10, 2019 20:17
Show Gist options
  • Save karlosos/8258a2847015406b4a4019a8ee483ac2 to your computer and use it in GitHub Desktop.
Save karlosos/8258a2847015406b4a4019a8ee483ac2 to your computer and use it in GitHub Desktop.
CART - stan na 10.04
"""
CART
"""
from sklearn import datasets
import time
from sklearn.model_selection import train_test_split
from classifiers import DecisionTree
import numpy as np
from misc import * # funkcje do picklowania
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
def pca(data, components=None, variance_sum_ratio=None):
"""
Perform Principal Components Analysis
:param components: Zmniejszanie wymiarowości. Liczba pierwszych komponentów jakie
mają byc brane pod uwagę. Obcina macierz V i wektor L do tej liczby.
:param variance_sum_ratio: Zmniejszanie wymiarowości. Liczba w zakresie od 0 do 1, która określa
ile składowych ma być grane pod uwagę. Dobiera taką najmniejszą liczbę początkowych składowych głównych,
które objaśniają zadaną część całkowitej wariancji.
"""
cov = np.cov(data, rowvar=False) # przyklady ulozone wierszami
# L - wartości wlasne - lambdy - wariancje względem kierunków
# V - wektory własne - ułożone kolumnami
start_time = time.time()
l, v = np.linalg.eig(cov)
L = np.real(l)
V = np.real(v)
ordering = np.argsort(-L)
L = L[ordering]
V = V[:, ordering]
# Redukcja wymiarowości
if variance_sum_ratio is not None:
L, V, _ = slice_variance_sum_ratio(L, V, variance_sum_ratio)
elif components is not None:
L = L[:components]
V = V[:, :components]
elapsed_time = time.time() - start_time
print("time eig:", elapsed_time)
return L, V
def slice_variance_sum_ratio(L, V, variance_sum_ratio):
"""
Zmniejszanie wymiarowości. Dobiera taką najmniejszą liczbę początkowych składowych głównych,
które objaśniają zadaną część całkowitej wariancji. Zwraca zredukowane L, V oraz indeks i do którego
zostało wykonane przycięcie.
:return: zredukowane L, V oraz indeks do którego zostało wykonane przycięcie
"""
for i in range(len(L)):
if np.sum(L[:i])/np.sum(L) >= variance_sum_ratio:
L = L[:i]
V = V[:, :i]
return L, V, i
def load_pca_or_generate(X_train):
"""
Jeżeli istnieją zapiklowane dane PCA to je wczytaj. W innym przypadku wykonaj PCA na podanych danych.
:return: wartości własne i macierze własne
"""
file_exists = os.path.isfile('./data/olivetti_pca.pik')
if file_exists:
L, V = unpickle_all('data/olivetti_pca.pik')
return L, V
else:
L, V = pca(X_train, components=None, variance_sum_ratio=0.95)
pickle_all([L, V], 'data/olivetti_pca.pik')
return L, V
def show_some_images(images, indexes=None, as_grid=True, title=None, subtitles=None):
"""
Wyświetla wiele obrazków jednocześnie.
:param images: dane z obrazkami. Może to być macierz o trzech wymiarach. images[i, j, k], gdzie i to numer zdjęcia,
j, k - współrzędne pikselu, lub o dwóch wymiarach images[i, j], gdzie i to numer zdjęcia a j
:param indexes: indeksy które chcemy wyświetlać
:param as_grid: czy chcemy wyświetlać w siatce - True, lub w linii poziomej.
"""
if indexes is None:
indexes = range(len(images))
shape = images[0].shape
if len(shape) == 1: # obrazki spłaszczone - reprezentowane w postaci wektora, przyjmujemy że obrazki są kwadratowe
img_side = int(np.sqrt(shape))
images = images.reshape(images.shape[0], img_side, img_side)
fig = plt.figure()
plt.gray()
if title is not None:
fig.canvas.set_window_title(title)
grid = int(np.ceil(np.sqrt(len(indexes))))
for i, index in enumerate(indexes):
if as_grid:
plt.subplot(grid, grid, i+1)
else:
plt.subplot(1, len(indexes), i+1)
plt.xticks([])
plt.yticks([])
plt.imshow(images[index])
if subtitles is not None:
plt.title(subtitles[index])
plt.show()
def main():
np.set_printoptions(threshold=np.inf, precision=1)
olivetti = datasets.fetch_olivetti_faces()
glasses = np.genfromtxt('olivetti_glasses.txt', delimiter=',').astype(int)
# tworzymy wektor z labelkami, czy dane zdjęcie przedstawia okularnika
y_glasses = np.zeros(olivetti.data.shape[0])
y_glasses = y_glasses.astype(int)
y_glasses[glasses] = 1
# ile osób ma okulary w zbiorze danych
# print(np.where(y_glasses == 1)[0].size / float(olivetti.data.shape[0]))
# Wybraliśmy, że będziemy uczyć klasyfikator po okularach.
y = y_glasses
# y = y.target
# show_some_images(olivetti.images, glasses, title="Okularnicy")
X_train, X_test, y_train, y_test = train_test_split(olivetti.data, y, test_size=0.2,
stratify=y, random_state=0)
L, V = load_pca_or_generate(X_train)
##
# Classificatione experiments
##
n = 30
X_train_pca = X_train.dot(V[:, :n])
X_test_pca = X_test.dot(V[:, :n])
data_all = olivetti.data.dot(V[:, :n])
dt = DecisionTree(impurity="impurity_entropy", pruning=True)
dt.fit(X_train_pca, y_train)
print(dt.tree_)
print(dt.tree_.shape)
predictions = dt.predict(X_test_pca[:10, :])
print(predictions)
print("Wynik klasyfikacji dla zbioru uczącego:", dt.score(X_train_pca, y_train))
print("Wynik klasyfikacji dla zbioru testowego:", dt.score(X_test_pca, y_test))
print("Wynik klasyfikacji dla zbioru testowego (custom):", np.sum(y_test == dt.predict(X_test_pca)) / y_test.size)
# show_some_images(V.T, indexes=[6, 3, 7])
# show_some_images(X_test[:10, :], subtitles=predictions)
##
# Testy dla głębokości
##
# max_depth = int(np.max(dt.tree_[:, DecisionTree.COL_DEPTH]))
# errors_train = np.zeros(max_depth + 1)
# errors_test = np.zeros(max_depth + 1)
# for d in range(max_depth + 1):
# dt = DecisionTree(impurity="impurity_entropy", max_depth=d)
# dt.fit(X_train_pca, y_train)
# print('depth: ', d, 'shape:', dt.tree_.shape)
# errors_train[d] = 1 - dt.score(X_train_pca, y_train)
# errors_test[d] = 1 - dt.score(X_test_pca, y_test)
#
# np.set_printoptions(threshold=np.inf, precision=5)
# best_depth = np.argmin(errors_test)
# print('BEST DEPTH:', str(best_depth), " WITH TEST ACCURACY:", 1 - errors_test[best_depth])
# print('ERRORS TEST: ', errors_test)
# print('ERRORS TRAIN: ', errors_train)
#
# plt.figure()
# plt.plot(errors_train, color='black', marker='o')
# plt.plot(errors_test, color='red', marker='o')
# plt.show()
##
# Testy dla sample
##
# min_node_vals = np.arange(0.10, 0, -0.01)
# errors_train = np.zeros(min_node_vals.size)
# errors_test = np.zeros(min_node_vals.size)
# for i, min_node_examples in enumerate(min_node_vals):
# dt = DecisionTree(impurity="impurity_entropy", min_node_examples=min_node_examples)
# dt.fit(X_train_pca, y_train)
# print('min node examples: ', min_node_examples)
# errors_train[i] = 1 - dt.score(X_train_pca, y_train)
# errors_test[i] = 1 - dt.score(X_test_pca, y_test)
#
# np.set_printoptions(threshold=np.inf, precision=5)
# best_depth = np.argmin(errors_test)
# print('BEST DEPTH:', str(best_depth), " WITH TEST ACCURACY:", 1 - errors_test[best_depth])
# print('ERRORS TEST: ', errors_test)
# print('ERRORS TRAIN: ', errors_train)
#
# plt.figure()
# plt.plot(errors_train, color='black', marker='o')
# plt.plot(errors_test, color='red', marker='o')
# plt.show()
if __name__ == '__main__':
main()
from sklearn.base import BaseEstimator, ClassifierMixin
import numpy as np
class DecisionTree(BaseEstimator, ClassifierMixin):
"""
Drzewo decyzyjne CART.
"""
COL_PARENT = 0
COL_SPLIT_FEATURE = 1
COL_SPLIT_VALUE = 2
COL_CHILD_LEFT = 3
COL_CHILD_RIGHT = 4
COL_Y = 5
COL_DEPTH = 6
def __init__(self, impurity='impurity_entropy', max_depth = None, min_node_examples=0.01, pruning=False):
self.tree_ = None
self.class_labels_ = None
self.impurity_ = getattr(self, impurity)
self.max_depth_ = max_depth
self.min_node_examples_ = min_node_examples
self.pruning_ = pruning
self.penalty_ = 0.01
def best_split(self, X, y, indexes):
"""
Znajdź najlepsze rozcięcie dla węzła.
Generuje wszystkie możliwe cięcia i liczy ich wartości oczekiwane nieczystości. Wybiera to rozwiązanie
z najmniejszą nieczystością.
:param X: dane
:param y: etykiety
:param indexes:
:return:
"""
n = X.shape[1]
best_k = None
best_v = None
best_expect = np.inf
for k in range(n):
X_indexes_k = X[indexes, k]
u = np.unique(X[indexes, k])
vals = 0.5 * (u[:-1] + u[1:])
for v in vals:
indexes_left = indexes[np.where(X_indexes_k < v)[0]]
indexes_right = indexes[np.where(X_indexes_k >= v)[0]]
distr_left = self.y_distribution(y, indexes_left)
distr_right = self.y_distribution(y, indexes_right)
expect = 1.0 / float(indexes.size) * (indexes_left.size * self.impurity_(distr_left) +
indexes_right.size * self.impurity_(distr_right))
if expect < best_expect:
best_expect = expect
best_k = k
best_v = v
return best_k, best_v, best_expect
def grow_tree(self, X, y, indexes, node_index, depth):
"""
Budowanie pełnego drzewa CART
1. Dla węzła wylicz rozkład p(y|t) i wyznacz najbardziej prawdopodobną klasę
2. Oblicz nieczystość dla węzła t, jeżeli wynosi 0 to przerwij rekurencję.
3. Wybierz najlepsze cięcie w węźle t.
4. Jeżeli wartość oczekiwana nieczystości dla najlepszego cięcia równa się nieczystości
aktualnego węzła to przerwij gałąź rekurencji.
5. Wykonaj najlepsze cięcie - dołącz do drzewa węzły t_l(k_best, v_best), t_r(k_best, v_best)
6. Wywołaj rekurencję na rzecz lewego potomka.
7. Wywołaj rekurencję na rzecz prawego potomka.
:param X:
:param y:
:param indexes: dostępne indeksy do sprawdzenia, na początku wszystkie możliwe. Z każdą iteracją zmniejszane
o połowę.
:param node_index: aktualny węzeł
:param depth:
:return: drzewo w formie macierzy
"""
if self.tree_ is None:
self.tree_ = np.zeros((1, 7))
self.tree_[0, 0] = -1.0
y_distr = self.y_distribution(y, indexes)
imp = self.impurity_(y_distr)
self.tree_[node_index, DecisionTree.COL_DEPTH] = depth # todo sprawdzić
# skojarz z węzłem najbardziej prawdopodobną w nim klasę
self.tree_[node_index, DecisionTree.COL_Y] = self.class_labels_[np.argmax(y_distr)]
if imp == 0.0 or ((self.max_depth_ is not None) and (depth == self.max_depth_)) or \
(indexes.size < self.min_node_examples_ * X.shape[0]):
return self.tree_
# znajdź najlepsze rozcięcie
# k - numer zmiennej
# v - możiwe wartości
k, v, expect = self.best_split(X, y, indexes)
if expect >= imp:
return self.tree_
self.tree_[node_index, DecisionTree.COL_SPLIT_FEATURE] = k
self.tree_[node_index, DecisionTree.COL_SPLIT_VALUE] = v
nodes_so_far = self.tree_.shape[0]
self.tree_[node_index, DecisionTree.COL_CHILD_LEFT] = nodes_so_far
self.tree_[node_index, DecisionTree.COL_CHILD_RIGHT] = nodes_so_far + 1
self.tree_ = np.r_[self.tree_, np.zeros((2, 7))]
self.tree_[nodes_so_far, DecisionTree.COL_PARENT] = node_index
self.tree_[nodes_so_far + 1, DecisionTree.COL_PARENT] = node_index
X_indexes_k = X[indexes, k]
indexes_left = indexes[np.where(X_indexes_k < v)[0]]
indexes_right = indexes[np.where(X_indexes_k >= v)[0]]
self.grow_tree(X, y, indexes_left, nodes_so_far, depth)
self.grow_tree(X, y, indexes_right, nodes_so_far + 1, depth + 1)
return self.tree_
def fit(self, X, y):
"""
Uczymy model na danych X i labelkach y
"""
self.class_labels_ = np.unique(y)
self.grow_tree(X, y, np.arange(X.shape[0]), 0, 0)
if self.pruning_:
scores = self.exhaustive_subtrees(X, y)
print("Scores:", scores)
return self
def exhaustive_subtrees(self, X, y):
tree_prime = np.copy(self.tree_)
tree_prime[0, DecisionTree.COL_CHILD_LEFT] = 0
tree_prime[0, DecisionTree.COL_CHILD_RIGHT] = 0
scores = {}
return self.do_exhaustive_subtrees(X, y, tree_prime, [0], scores)
def do_exhaustive_subtrees(self, X, y, tree_prime, indexes_prime, scores):
"""
:param X:
:param y:
:param tree_prime: aktualne drzewko
:param indexes_prime: indeksy w tprime ktore biora udzial
:param scores:
:return:
"""
# ocena drzewka tree_prime
err = 1 - np.sum(self.predict(X, tree=tree_prime) == y) / float(X.shape[0])
leaves = np.sum(tree_prime[indexes_prime, DecisionTree.COL_CHILD_LEFT] == 0)
scores[str(indexes_prime)] = err + leaves * self.penalty_
return scores
def predict(self, X, tree=None):
"""
Przewidujemy labelki dla danych X
"""
predictions = np.zeros(X.shape[0])
for i in range(X.shape[0]):
predictions[i] = self.predict_x(X[i], tree)
return predictions
def predict_x(self, x, tree=None):
tree = self.tree_ if tree is None else tree
node_index = 0
while True:
if tree[node_index, DecisionTree.COL_CHILD_LEFT] == 0.0:
return tree[node_index, DecisionTree.COL_Y]
k, v = int(tree[node_index, DecisionTree.COL_SPLIT_FEATURE]), tree[node_index,
DecisionTree.COL_SPLIT_VALUE]
if x[k] < v:
node_index = int(tree[node_index, DecisionTree.COL_CHILD_LEFT])
else:
node_index = int(tree[node_index, DecisionTree.COL_CHILD_RIGHT])
def y_distribution(self, y, indexes):
"""
Prawdopodobienstwa warunkowe klasy y pod warunkiem że jesteśmy w węzłach o indeksach. P(y|t)
"""
distr = np.zeros(self.class_labels_.size)
y_indexes = y[indexes]
for i, label in enumerate(self.class_labels_):
distr[i] = np.where(y_indexes == label)[0].size / float(indexes.size)
return distr
def impurity_error(self, y_distr):
"""
Błąd klasyfikacji
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t)
"""
return 1.0 - np.max(y_distr)
def impurity_entropy(self, y_distr):
"""
Entropia - miara nieuporządkowania
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t)
"""
y_distr = y_distr[y_distr > 0.0]
return -np.sum(y_distr * np.log2(y_distr))
def impurity_gini(self, y_distr):
"""
Indeks giniego
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t)
"""
return 1.0 - np.sum(y_distr**2)
if __name__ == '__main__':
y_distr = np.array([0.5, 0.0, 0.25, 0.25])
dt = DecisionTree()
print(dt.impurity_error(y_distr))
print(dt.impurity_entropy(y_distr))
print(dt.impurity_gini(y_distr))
import pickle
import os
def pickle_all(some_list, fname):
os.makedirs(os.path.dirname(fname), exist_ok=True)
f = open(fname, 'wb')
pickle.dump(some_list, f)
f.close()
def unpickle_all(fname):
f = open(fname, 'rb')
some_list = pickle.load(f)
f.close()
return some_list
cycler==0.10.0
kiwisolver==1.0.1
matplotlib==3.0.3
numpy==1.16.2
pyparsing==2.3.1
python-dateutil==2.8.0
scikit-learn==0.20.3
scipy==1.2.1
six==1.12.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment