Last active
March 28, 2019 16:14
-
-
Save karlosos/3d9db173329a0d1e72d46dda5c435e91 to your computer and use it in GitHub Desktop.
Algorytm CART
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
CART | |
""" | |
from sklearn import datasets | |
import time | |
from sklearn.model_selection import train_test_split | |
from classifiers import DecisionTree | |
import numpy as np | |
from misc import * # funkcje do picklowania | |
import matplotlib | |
matplotlib.use('TkAgg') | |
import matplotlib.pyplot as plt | |
def pca(data, components=None, variance_sum_ratio=None): | |
""" | |
Perform Principal Components Analysis | |
:param components: Zmniejszanie wymiarowości. Liczba pierwszych komponentów jakie | |
mają byc brane pod uwagę. Obcina macierz V i wektor L do tej liczby. | |
:param variance_sum_ratio: Zmniejszanie wymiarowości. Liczba w zakresie od 0 do 1, która określa | |
ile składowych ma być grane pod uwagę. Dobiera taką najmniejszą liczbę początkowych składowych głównych, | |
które objaśniają zadaną część całkowitej wariancji. | |
""" | |
cov = np.cov(data, rowvar=False) # przyklady ulozone wierszami | |
# L - wartości wlasne - lambdy - wariancje względem kierunków | |
# V - wektory własne - ułożone kolumnami | |
start_time = time.time() | |
l, v = np.linalg.eig(cov) | |
L = np.real(l) | |
V = np.real(v) | |
ordering = np.argsort(-L) | |
L = L[ordering] | |
V = V[:, ordering] | |
# Redukcja wymiarowości | |
if variance_sum_ratio is not None: | |
L, V, _ = slice_variance_sum_ratio(L, V, variance_sum_ratio) | |
elif components is not None: | |
L = L[:components] | |
V = V[:, :components] | |
elapsed_time = time.time() - start_time | |
print("time eig:", elapsed_time) | |
return L, V | |
def slice_variance_sum_ratio(L, V, variance_sum_ratio): | |
""" | |
Zmniejszanie wymiarowości. Dobiera taką najmniejszą liczbę początkowych składowych głównych, | |
które objaśniają zadaną część całkowitej wariancji. Zwraca zredukowane L, V oraz indeks i do którego | |
zostało wykonane przycięcie. | |
:return: zredukowane L, V oraz indeks do którego zostało wykonane przycięcie | |
""" | |
for i in range(len(L)): | |
if np.sum(L[:i])/np.sum(L) >= variance_sum_ratio: | |
L = L[:i] | |
V = V[:, :i] | |
return L, V, i | |
def load_pca_or_generate(X_train): | |
""" | |
Jeżeli istnieją zapiklowane dane PCA to je wczytaj. W innym przypadku wykonaj PCA na podanych danych. | |
:return: wartości własne i macierze własne | |
""" | |
file_exists = os.path.isfile('./data/olivetti_pca.pik') | |
if file_exists: | |
L, V = unpickle_all('data/olivetti_pca.pik') | |
return L, V | |
else: | |
L, V = pca(X_train, components=None, variance_sum_ratio=0.95) | |
pickle_all([L, V], 'data/olivetti_pca.pik') | |
return L, V | |
def show_some_images(images, indexes=None, as_grid=True, title=None, subtitles=None): | |
""" | |
Wyświetla wiele obrazków jednocześnie. | |
:param images: dane z obrazkami. Może to być macierz o trzech wymiarach. images[i, j, k], gdzie i to numer zdjęcia, | |
j, k - współrzędne pikselu, lub o dwóch wymiarach images[i, j], gdzie i to numer zdjęcia a j | |
:param indexes: indeksy które chcemy wyświetlać | |
:param as_grid: czy chcemy wyświetlać w siatce - True, lub w linii poziomej. | |
""" | |
if indexes is None: | |
indexes = range(len(images)) | |
shape = images[0].shape | |
if len(shape) == 1: # obrazki spłaszczone - reprezentowane w postaci wektora, przyjmujemy że obrazki są kwadratowe | |
img_side = int(np.sqrt(shape)) | |
images = images.reshape(images.shape[0], img_side, img_side) | |
fig = plt.figure() | |
plt.gray() | |
if title is not None: | |
fig.canvas.set_window_title(title) | |
grid = int(np.ceil(np.sqrt(len(indexes)))) | |
for i, index in enumerate(indexes): | |
if as_grid: | |
plt.subplot(grid, grid, i+1) | |
else: | |
plt.subplot(1, len(indexes), i+1) | |
plt.xticks([]) | |
plt.yticks([]) | |
plt.imshow(images[index]) | |
if subtitles is not None: | |
plt.title(subtitles[index]) | |
plt.show() | |
def main(): | |
np.set_printoptions(threshold=np.inf, precision=1) | |
olivetti = datasets.fetch_olivetti_faces() | |
glasses = np.genfromtxt('olivetti_glasses.txt', delimiter=',').astype(int) | |
# tworzymy wektor z labelkami, czy dane zdjęcie przedstawia okularnika | |
y_glasses = np.zeros(olivetti.data.shape[0]) | |
y_glasses = y_glasses.astype(int) | |
y_glasses[glasses] = 1 | |
# ile osób ma okulary w zbiorze danych | |
# print(np.where(y_glasses == 1)[0].size / float(olivetti.data.shape[0])) | |
# Wybraliśmy, że będziemy uczyć klasyfikator po okularach. | |
y = y_glasses | |
# y = y.target | |
show_some_images(olivetti.images, glasses, title="Okularnicy") | |
X_train, X_test, y_train, y_test = train_test_split(olivetti.data, y, test_size=0.2, | |
stratify=y, random_state=0) | |
L, V = load_pca_or_generate(X_train) | |
n = 50 | |
X_train_pca = X_train.dot(V[:, :n]) | |
X_test_pca = X_test.dot(V[:, :n]) | |
data_all = olivetti.data.dot(V[:, :n]) | |
dt = DecisionTree(impurity="impurity_entropy") | |
dt.fit(X_train_pca, y_train) | |
print(dt.tree_) | |
print(dt.tree_.shape) | |
predictions = dt.predict(X_test_pca[:10, :]) | |
print(predictions) | |
print("Wynik klasyfikacji dla zbioru uczącego:", dt.score(X_train_pca, y_train)) | |
print("Wynik klasyfikacji dla zbioru testowego:", dt.score(X_test_pca, y_test)) | |
print("Wynik klasyfikacji dla zbioru testowego (custom):", np.sum(y_test == dt.predict(X_test_pca)) / y_test.size) | |
# show_some_images(V.T, indexes=[6, 3, 7]) | |
show_some_images(X_test[:10, :], subtitles=predictions) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sklearn.base import BaseEstimator, ClassifierMixin | |
import numpy as np | |
class DecisionTree(BaseEstimator, ClassifierMixin): | |
""" | |
Drzewo decyzyjne CART. | |
""" | |
COL_PARENT = 0 | |
COL_SPLIT_FEATURE = 1 | |
COL_SPLIT_VALUE = 2 | |
COL_CHILD_LEFT = 3 | |
COL_CHILD_RIGHT = 4 | |
COL_Y = 5 | |
COL_DEPTH = 6 | |
def __init__(self, impurity='impurity_entropy'): | |
self.tree_ = None | |
self.class_labels_ = None | |
self.impurity_ = getattr(self, impurity) | |
def best_split(self, X, y, indexes): | |
""" | |
Znajdź najlepsze rozcięcie dla poddrzewa | |
:param X: dane | |
:param y: etykiety | |
:param indexes: | |
:return: | |
""" | |
n = X.shape[1] | |
best_k = None | |
best_v = None | |
best_expect = np.inf | |
for k in range(n): | |
X_indexes_k = X[indexes, k] | |
u = np.unique(X[indexes, k]) | |
vals = 0.5 * (u[:-1] + u[1:]) | |
for v in vals: | |
indexes_left = indexes[np.where(X_indexes_k < v)[0]] | |
indexes_right = indexes[np.where(X_indexes_k >= v)[0]] | |
distr_left = self.y_distribution(y, indexes_left) | |
distr_right = self.y_distribution(y, indexes_right) | |
expect = 1.0 / float(indexes.size) * (indexes_left.size * self.impurity_(distr_left) + | |
indexes_right.size * self.impurity_(distr_right)) | |
if expect < best_expect: | |
best_expect = expect | |
best_k = k | |
best_v = v | |
return best_k, best_v, best_expect | |
def grow_tree(self, X, y, indexes, node_index, depth): | |
if self.tree_ is None: | |
self.tree_ = np.zeros((1, 7)) | |
self.tree_[0, 0] = -1.0 | |
y_distr = self.y_distribution(y, indexes) | |
imp = self.impurity_(y_distr) | |
self.tree_[node_index, DecisionTree.COL_DEPTH] = depth # todo sprawdzić | |
# skojarz z węzłem najbardziej prawdopodobną w nim klasę | |
self.tree_[node_index, DecisionTree.COL_Y] = self.class_labels_[np.argmax(y_distr)] | |
if imp == 0.0: | |
return self.tree_ | |
# znajdź najlepsze rozcięcie | |
k, v, expect = self.best_split(X, y, indexes) | |
if expect >= imp: | |
return self.tree_ | |
self.tree_[node_index, DecisionTree.COL_SPLIT_FEATURE] = k | |
self.tree_[node_index, DecisionTree.COL_SPLIT_VALUE] = v | |
nodes_so_far = self.tree_.shape[0] | |
self.tree_[node_index, DecisionTree.COL_CHILD_LEFT] = nodes_so_far | |
self.tree_[node_index, DecisionTree.COL_CHILD_RIGHT] = nodes_so_far + 1 | |
self.tree_ = np.r_[self.tree_, np.zeros((2, 7))] | |
self.tree_[nodes_so_far, DecisionTree.COL_PARENT] = node_index | |
self.tree_[nodes_so_far + 1, DecisionTree.COL_PARENT] = node_index | |
X_indexes_k = X[indexes, k] | |
indexes_left = indexes[np.where(X_indexes_k < v)[0]] | |
indexes_right = indexes[np.where(X_indexes_k >= v)[0]] | |
self.grow_tree(X, y, indexes_left, nodes_so_far, depth) | |
self.grow_tree(X, y, indexes_right, nodes_so_far + 1, depth + 1) | |
return self.tree_ | |
def fit(self, X, y): | |
""" | |
Uczymy model na danych X i labelkach y | |
""" | |
self.class_labels_ = np.unique(y) | |
self.grow_tree(X, y, np.arange(X.shape[0]), 0, 0) | |
return self | |
def predict(self, X): | |
""" | |
Przewidujemy labelki dla danych X | |
""" | |
predictions = np.zeros(X.shape[0]) | |
for i in range(X.shape[0]): | |
predictions[i] = self.predict_x(X[i]) | |
return predictions | |
def predict_x(self, x): | |
node_index = 0 | |
while True: | |
if self.tree_[node_index, DecisionTree.COL_CHILD_LEFT] == 0.0: | |
return self.tree_[node_index, DecisionTree.COL_Y] | |
k, v = int(self.tree_[node_index, DecisionTree.COL_SPLIT_FEATURE]), self.tree_[node_index, DecisionTree.COL_SPLIT_VALUE] | |
if x[k] < v: | |
node_index = int(self.tree_[node_index, DecisionTree.COL_CHILD_LEFT]) | |
else: | |
node_index = int(self.tree_[node_index, DecisionTree.COL_CHILD_RIGHT]) | |
def y_distribution(self, y, indexes): | |
""" | |
Prawdopodobienstwa warunkowe klasy y pod warunkiem że jesteśmy w węzłach o indeksach. P(y|t) | |
""" | |
distr = np.zeros(self.class_labels_.size) | |
y_indexes = y[indexes] | |
for i, label in enumerate(self.class_labels_): | |
distr[i] = np.where(y_indexes == label)[0].size / float(indexes.size) | |
return distr | |
def impurity_error(self, y_distr): | |
""" | |
Błąd klasyfikacji | |
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t) | |
""" | |
return 1.0 - np.max(y_distr) | |
def impurity_entropy(self, y_distr): | |
""" | |
Entropia - miara nieuporządkowania | |
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t) | |
""" | |
y_distr = y_distr[y_distr > 0.0] | |
return -np.sum(y_distr * np.log2(y_distr)) | |
def impurity_gini(self, y_distr): | |
""" | |
Indeks giniego | |
:param y_distr: rozklad prawdopodobienstwa nad klasami P(y|t) | |
""" | |
return 1.0 - np.sum(y_distr**2) | |
if __name__ == '__main__': | |
y_distr = np.array([0.5, 0.0, 0.25, 0.25]) | |
dt = DecisionTree() | |
print(dt.impurity_error(y_distr)) | |
print(dt.impurity_entropy(y_distr)) | |
print(dt.impurity_gini(y_distr)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
import os | |
def pickle_all(some_list, fname): | |
os.makedirs(os.path.dirname(fname), exist_ok=True) | |
f = open(fname, 'wb') | |
pickle.dump(some_list, f) | |
f.close() | |
def unpickle_all(fname): | |
f = open(fname, 'rb') | |
some_list = pickle.load(f) | |
f.close() | |
return some_list |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment