Last active
April 10, 2019 20:17
-
-
Save karlosos/8258a2847015406b4a4019a8ee483ac2 to your computer and use it in GitHub Desktop.
CART - stan na 10.04
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
CART | |
""" | |
from sklearn import datasets | |
import time | |
from sklearn.model_selection import train_test_split | |
from classifiers import DecisionTree | |
import numpy as np | |
from misc import * # funkcje do picklowania | |
import matplotlib | |
matplotlib.use('TkAgg') | |
import matplotlib.pyplot as plt | |
def pca(data, components=None, variance_sum_ratio=None): | |
""" | |
Perform Principal Components Analysis | |
:param components: Zmniejszanie wymiarowości. Liczba pierwszych komponentów jakie | |
mają byc brane pod uwagę. Obcina macierz V i wektor L do tej liczby. | |
:param variance_sum_ratio: Zmniejszanie wymiarowości. Liczba w zakresie od 0 do 1, która określa | |
ile składowych ma być grane pod uwagę. Dobiera taką najmniejszą liczbę początkowych składowych głównych, | |
które objaśniają zadaną część całkowitej wariancji. | |
""" | |
cov = np.cov(data, rowvar=False) # przyklady ulozone wierszami | |
# L - wartości wlasne - lambdy - wariancje względem kierunków | |
# V - wektory własne - ułożone kolumnami | |
start_time = time.time() | |
l, v = np.linalg.eig(cov) | |
L = np.real(l) | |
V = np.real(v) | |
ordering = np.argsort(-L) | |
L = L[ordering] | |
V = V[:, ordering] | |
# Redukcja wymiarowości | |
if variance_sum_ratio is not None: | |
L, V, _ = slice_variance_sum_ratio(L, V, variance_sum_ratio) | |
elif components is not None: | |
L = L[:components] | |
V = V[:, :components] | |
elapsed_time = time.time() - start_time | |
print("time eig:", elapsed_time) | |
return L, V | |
def slice_variance_sum_ratio(L, V, variance_sum_ratio): | |
""" | |
Zmniejszanie wymiarowości. Dobiera taką najmniejszą liczbę początkowych składowych głównych, | |
które objaśniają zadaną część całkowitej wariancji. Zwraca zredukowane L, V oraz indeks i do którego | |
zostało wykonane przycięcie. | |
:return: zredukowane L, V oraz indeks do którego zostało wykonane przycięcie | |
""" | |
for i in range(len(L)): | |
if np.sum(L[:i])/np.sum(L) >= variance_sum_ratio: | |
L = L[:i] | |
V = V[:, :i] | |
return L, V, i | |
def load_pca_or_generate(X_train): | |
""" | |
Jeżeli istnieją zapiklowane dane PCA to je wczytaj. W innym przypadku wykonaj PCA na podanych danych. | |
:return: wartości własne i macierze własne | |
""" | |
file_exists = os.path.isfile('./data/olivetti_pca.pik') | |
if file_exists: | |
L, V = unpickle_all('data/olivetti_pca.pik') | |
return L, V | |
else: | |
L, V = pca(X_train, components=None, variance_sum_ratio=0.95) | |
pickle_all([L, V], 'data/olivetti_pca.pik') | |
return L, V | |
def show_some_images(images, indexes=None, as_grid=True, title=None, subtitles=None): | |
""" | |
Wyświetla wiele obrazków jednocześnie. | |
:param images: dane z obrazkami. Może to być macierz o trzech wymiarach. images[i, j, k], gdzie i to numer zdjęcia, | |
j, k - współrzędne pikselu, lub o dwóch wymiarach images[i, j], gdzie i to numer zdjęcia a j | |
:param indexes: indeksy które chcemy wyświetlać | |
:param as_grid: czy chcemy wyświetlać w siatce - True, lub w linii poziomej. | |
""" | |
if indexes is None: | |
indexes = range(len(images)) | |
shape = images[0].shape | |
if len(shape) == 1: # obrazki spłaszczone - reprezentowane w postaci wektora, przyjmujemy że obrazki są kwadratowe | |
img_side = int(np.sqrt(shape)) | |
images = images.reshape(images.shape[0], img_side, img_side) | |
fig = plt.figure() | |
plt.gray() | |
if title is not None: | |
fig.canvas.set_window_title(title) | |
grid = int(np.ceil(np.sqrt(len(indexes)))) | |
for i, index in enumerate(indexes): | |
if as_grid: | |
plt.subplot(grid, grid, i+1) | |
else: | |
plt.subplot(1, len(indexes), i+1) | |
plt.xticks([]) | |
plt.yticks([]) | |
plt.imshow(images[index]) | |
if subtitles is not None: | |
plt.title(subtitles[index]) | |
plt.show() | |
def main(): | |
np.set_printoptions(threshold=np.inf, precision=1) | |
olivetti = datasets.fetch_olivetti_faces() | |
glasses = np.genfromtxt('olivetti_glasses.txt', delimiter=',').astype(int) | |
# tworzymy wektor z labelkami, czy dane zdjęcie przedstawia okularnika | |
y_glasses = np.zeros(olivetti.data.shape[0]) | |
y_glasses = y_glasses.astype(int) | |
y_glasses[glasses] = 1 | |
# ile osób ma okulary w zbiorze danych | |
# print(np.where(y_glasses == 1)[0].size / float(olivetti.data.shape[0])) | |
# Wybraliśmy, że będziemy uczyć klasyfikator po okularach. | |
y = y_glasses | |
# y = y.target | |
# show_some_images(olivetti.images, glasses, title="Okularnicy") | |
X_train, X_test, y_train, y_test = train_test_split(olivetti.data, y, test_size=0.2, | |
stratify=y, random_state=0) | |
L, V = load_pca_or_generate(X_train) | |
## | |
# Classificatione experiments | |
## | |
n = 30 | |
X_train_pca = X_train.dot(V[:, :n]) | |
X_test_pca = X_test.dot(V[:, :n]) | |
data_all = olivetti.data.dot(V[:, :n]) | |
dt = DecisionTree(impurity="impurity_entropy", pruning=True) | |
dt.fit(X_train_pca, y_train) | |
print(dt.tree_) | |
print(dt.tree_.shape) | |
predictions = dt.predict(X_test_pca[:10, :]) | |
print(predictions) | |
print("Wynik klasyfikacji dla zbioru uczącego:", dt.score(X_train_pca, y_train)) | |
print("Wynik klasyfikacji dla zbioru testowego:", dt.score(X_test_pca, y_test)) | |
print("Wynik klasyfikacji dla zbioru testowego (custom):", np.sum(y_test == dt.predict(X_test_pca)) / y_test.size) | |
# show_some_images(V.T, indexes=[6, 3, 7]) | |
# show_some_images(X_test[:10, :], subtitles=predictions) | |
## | |
# Testy dla głębokości | |
## | |
# max_depth = int(np.max(dt.tree_[:, DecisionTree.COL_DEPTH])) | |
# errors_train = np.zeros(max_depth + 1) | |
# errors_test = np.zeros(max_depth + 1) | |
# for d in range(max_depth + 1): | |
# dt = DecisionTree(impurity="impurity_entropy", max_depth=d) | |
# dt.fit(X_train_pca, y_train) | |
# print('depth: ', d, 'shape:', dt.tree_.shape) | |
# errors_train[d] = 1 - dt.score(X_train_pca, y_train) | |
# errors_test[d] = 1 - dt.score(X_test_pca, y_test) | |
# | |
# np.set_printoptions(threshold=np.inf, precision=5) | |
# best_depth = np.argmin(errors_test) | |
# print('BEST DEPTH:', str(best_depth), " WITH TEST ACCURACY:", 1 - errors_test[best_depth]) | |
# print('ERRORS TEST: ', errors_test) | |
# print('ERRORS TRAIN: ', errors_train) | |
# | |
# plt.figure() | |
# plt.plot(errors_train, color='black', marker='o') | |
# plt.plot(errors_test, color='red', marker='o') | |
# plt.show() | |
## | |
# Testy dla sample | |
## | |
# min_node_vals = np.arange(0.10, 0, -0.01) | |
# errors_train = np.zeros(min_node_vals.size) | |
# errors_test = np.zeros(min_node_vals.size) | |
# for i, min_node_examples in enumerate(min_node_vals): | |
# dt = DecisionTree(impurity="impurity_entropy", min_node_examples=min_node_examples) | |
# dt.fit(X_train_pca, y_train) | |
# print('min node examples: ', min_node_examples) | |
# errors_train[i] = 1 - dt.score(X_train_pca, y_train) | |
# errors_test[i] = 1 - dt.score(X_test_pca, y_test) | |
# | |
# np.set_printoptions(threshold=np.inf, precision=5) | |
# best_depth = np.argmin(errors_test) | |
# print('BEST DEPTH:', str(best_depth), " WITH TEST ACCURACY:", 1 - errors_test[best_depth]) | |
# print('ERRORS TEST: ', errors_test) | |
# print('ERRORS TRAIN: ', errors_train) | |
# | |
# plt.figure() | |
# plt.plot(errors_train, color='black', marker='o') | |
# plt.plot(errors_test, color='red', marker='o') | |
# plt.show() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sklearn.base import BaseEstimator, ClassifierMixin | |
import numpy as np | |
class DecisionTree(BaseEstimator, ClassifierMixin): | |
""" | |
Drzewo decyzyjne CART. | |
""" | |
COL_PARENT = 0 | |
COL_SPLIT_FEATURE = 1 | |
COL_SPLIT_VALUE = 2 | |
COL_CHILD_LEFT = 3 | |
COL_CHILD_RIGHT = 4 | |
COL_Y = 5 | |
COL_DEPTH = 6 | |
def __init__(self, impurity='impurity_entropy', max_depth = None, min_node_examples=0.01, pruning=False): | |
self.tree_ = None | |
self.class_labels_ = None | |
self.impurity_ = getattr(self, impurity) | |
self.max_depth_ = max_depth | |
self.min_node_examples_ = min_node_examples | |
self.pruning_ = pruning | |
self.penalty_ = 0.01 | |
def best_split(self, X, y, indexes): | |
""" | |
Znajdź najlepsze rozcięcie dla węzła. | |
Generuje wszystkie możliwe cięcia i liczy ich wartości oczekiwane nieczystości. Wybiera to rozwiązanie | |
z najmniejszą nieczystością. | |
:param X: dane | |
:param y: etykiety | |
:param indexes: | |
:return: | |
""" | |
n = X.shape[1] | |
best_k = None | |
best_v = None | |
best_expect = np.inf | |
for k in range(n): | |
X_indexes_k = X[indexes, k] | |
u = np.unique(X[indexes, k]) | |
vals = 0.5 * (u[:-1] + u[1:]) | |
for v in vals: | |
indexes_left = indexes[np.where(X_indexes_k < v)[0]] | |
indexes_right = indexes[np.where(X_indexes_k >= v)[0]] | |
distr_left = self.y_distribution(y, indexes_left) | |
distr_right = self.y_distribution(y, indexes_right) | |
expect = 1.0 / float(indexes.size) * (indexes_left.size * self.impurity_(distr_left) + | |
indexes_right.size * self.impurity_(distr_right)) | |
if expect < best_expect: | |
best_expect = expect | |
best_k = k | |
best_v = v | |
return best_k, best_v, best_expect | |
def grow_tree(self, X, y, indexes, node_index, depth): | |
""" | |
Budowanie pełnego drzewa CART | |
1. Dla węzła wylicz rozkład p(y|t) i wyznacz najbardziej prawdopodobną klasę | |
2. Oblicz nieczystość dla węzła t, jeżeli wynosi 0 to przerwij rekurencję. | |
3. Wybierz najlepsze cięcie w węźle t. | |
4. Jeżeli wartość oczekiwana nieczystości dla najlepszego cięcia równa się nieczystości | |
aktualnego węzła to przerwij gałąź rekurencji. | |
5. Wykonaj najlepsze cięcie - dołącz do drzewa węzły t_l(k_best, v_best), t_r(k_best, v_best) | |
6. Wywołaj rekurencję na rzecz lewego potomka. | |
7. Wywołaj rekurencję na rzecz prawego potomka. | |
:param X: | |
:param y: | |
:param indexes: dostępne indeksy do sprawdzenia, na początku wszystkie możliwe. Z każdą iteracją zmniejszane | |
o połowę. | |
:param node_index: aktualny węzeł | |
:param depth: | |
:return: drzewo w formie macierzy | |
""" | |
if self.tree_ is None: | |
self.tree_ = np.zeros((1, 7)) | |
self.tree_[0, 0] = -1.0 | |
y_distr = self.y_distribution(y, indexes) | |
imp = self.impurity_(y_distr) | |
self.tree_[node_index, DecisionTree.COL_DEPTH] = depth # todo sprawdzić | |
# skojarz z węzłem najbardziej prawdopodobną w nim klasę | |
self.tree_[node_index, DecisionTree.COL_Y] = self.class_labels_[np.argmax(y_distr)] | |
if imp == 0.0 or ((self.max_depth_ is not None) and (depth == self.max_depth_)) or \ | |
(indexes.size < self.min_node_examples_ * X.shape[0]): | |
return self.tree_ | |
# znajdź najlepsze rozcięcie | |
# k - numer zmiennej | |
# v - możiwe wartości | |
k, v, expect = self.best_split(X, y, indexes) | |
if expect >= imp: | |
return self.tree_ | |
self.tree_[node_index, DecisionTree.COL_SPLIT_FEATURE] = k | |
self.tree_[node_index, DecisionTree.COL_SPLIT_VALUE] = v | |
nodes_so_far = self.tree_.shape[0] | |
self.tree_[node_index, DecisionTree.COL_CHILD_LEFT] = nodes_so_far | |
self.tree_[node_index, DecisionTree.COL_CHILD_RIGHT] = nodes_so_far + 1 | |
self.tree_ = np.r_[self.tree_, np.zeros((2, 7))] | |
self.tree_[nodes_so_far, DecisionTree.COL_PARENT] = node_index | |
self.tree_[nodes_so_far + 1, DecisionTree.COL_PARENT] = node_index | |
X_indexes_k = X[indexes, k] | |
indexes_left = indexes[np.where(X_indexes_k < v)[0]] | |
indexes_right = indexes[np.where(X_indexes_k >= v)[0]] | |
self.grow_tree(X, y, indexes_left, nodes_so_far, depth) | |
self.grow_tree(X, y, indexes_right, nodes_so_far + 1, depth + 1) | |
return self.tree_ | |
def fit(self, X, y): | |
""" | |
Uczymy model na danych X i labelkach y | |
""" | |
self.class_labels_ = np.unique(y) | |
self.grow_tree(X, y, np.arange(X.shape[0]), 0, 0) | |
if self.pruning_: | |
scores = self.exhaustive_subtrees(X, y) | |
print("Scores:", scores) | |
return self | |
def exhaustive_subtrees(self, X, y): | |
tree_prime = np.copy(self.tree_) | |
tree_prime[0, DecisionTree.COL_CHILD_LEFT] = 0 | |
tree_prime[0, DecisionTree.COL_CHILD_RIGHT] = 0 | |
scores = {} | |
return self.do_exhaustive_subtrees(X, y, tree_prime, [0], scores) | |
def do_exhaustive_subtrees(self, X, y, tree_prime, indexes_prime, scores): | |
""" | |
:param X: | |
:param y: | |
:param tree_prime: aktualne drzewko | |
:param indexes_prime: indeksy w tprime ktore biora udzial | |
:param scores: | |
:return: | |
""" | |
# ocena drzewka tree_prime | |
err = 1 - np.sum(self.predict(X, tree=tree_prime) == y) / float(X.shape[0]) | |
leaves = np.sum(tree_prime[indexes_prime, DecisionTree.COL_CHILD_LEFT] == 0) | |
scores[str(indexes_prime)] = err + leaves * self.penalty_ | |
return scores | |
def predict(self, X, tree=None): | |
""" | |
Przewidujemy labelki dla danych X | |
""" | |
predictions = np.zeros(X.shape[0]) | |
for i in range(X.shape[0]): | |
predictions[i] = self.predict_x(X[i], tree) | |
return predictions | |
def predict_x(self, x, tree=None): | |
tree = self.tree_ if tree is None else tree | |
node_index = 0 | |
while True: | |
if tree[node_index, DecisionTree.COL_CHILD_LEFT] == 0.0: | |
return tree[node_index, DecisionTree.COL_Y] | |
k, v = int(tree[node_index, DecisionTree.COL_SPLIT_FEATURE]), tree[node_index, | |
DecisionTree.COL_SPLIT_VALUE] | |
if x[k] < v: | |
node_index = int(tree[node_index, DecisionTree.COL_CHILD_LEFT]) | |
else: | |
node_index = int(tree[node_index, DecisionTree.COL_CHILD_RIGHT]) | |
def y_distribution(self, y, indexes): | |
""" | |
Prawdopodobienstwa warunkowe klasy y pod warunkiem że jesteśmy w węzłach o indeksach. P(y|t) | |
""" | |
distr = np.zeros(self.class_labels_.size) | |
y_indexes = y[indexes] | |
for i, label in enumerate(self.class_labels_): | |
distr[i] = np.where(y_indexes == label)[0].size / float(indexes.size) | |
return distr | |
def impurity_error(self, y_distr): | |
""" | |
Błąd klasyfikacji | |
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t) | |
""" | |
return 1.0 - np.max(y_distr) | |
def impurity_entropy(self, y_distr): | |
""" | |
Entropia - miara nieuporządkowania | |
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t) | |
""" | |
y_distr = y_distr[y_distr > 0.0] | |
return -np.sum(y_distr * np.log2(y_distr)) | |
def impurity_gini(self, y_distr): | |
""" | |
Indeks giniego | |
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t) | |
""" | |
return 1.0 - np.sum(y_distr**2) | |
if __name__ == '__main__': | |
y_distr = np.array([0.5, 0.0, 0.25, 0.25]) | |
dt = DecisionTree() | |
print(dt.impurity_error(y_distr)) | |
print(dt.impurity_entropy(y_distr)) | |
print(dt.impurity_gini(y_distr)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
import os | |
def pickle_all(some_list, fname): | |
os.makedirs(os.path.dirname(fname), exist_ok=True) | |
f = open(fname, 'wb') | |
pickle.dump(some_list, f) | |
f.close() | |
def unpickle_all(fname): | |
f = open(fname, 'rb') | |
some_list = pickle.load(f) | |
f.close() | |
return some_list |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cycler==0.10.0 | |
kiwisolver==1.0.1 | |
matplotlib==3.0.3 | |
numpy==1.16.2 | |
pyparsing==2.3.1 | |
python-dateutil==2.8.0 | |
scikit-learn==0.20.3 | |
scipy==1.2.1 | |
six==1.12.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment