from collections import Counter, defaultdict
from math import log2
from random import randint, uniform

import numpy as np

def ginni_index_frequencies(attrs, classes):
    # Count how often each class appears for each attribute value.
    attr_class_counter = {}
    for attr, clazz in zip(attrs, classes):
        recovered_counter = attr_class_counter.setdefault(attr, {})
        recovered_counter[clazz] = recovered_counter.get(clazz, 0) + 1
    return attr_class_counter

def ginni_index(attrs, classes):
    # Weighted Gini impurity of `classes` after splitting on `attrs`.
    class_freq_by_attr = ginni_index_frequencies(attrs, classes)
    accumulated = 0
    for attr_value, class_counts in class_freq_by_attr.items():
        clazz_apparition = sum(class_counts.values())
        local_ginni_index_complement = sum((count / clazz_apparition) ** 2 for count in class_counts.values())
        local_ginni_index = 1.0 - local_ginni_index_complement
        accumulated += local_ginni_index * (clazz_apparition / len(classes))
    return accumulated

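# Sanity check (a minimal sketch with made-up toy values): attribute value
# 'b' separates class 0 perfectly, so only the mixed 'a' branch contributes
# impurity: 0.5 * (2/4) = 0.25.
if __name__ == "__main__":
    assert abs(ginni_index(['a', 'a', 'b', 'b'], [0, 1, 0, 0]) - 0.25) < 1e-9
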
def calc_entropy(probabilities):
    # Shannon entropy in bits; zero-probability terms contribute nothing.
    entropy_list = []
    for probability in probabilities:
        if probability == 0:
            continue
        entropy_list.append(-(probability * log2(probability)))
    return sum(entropy_list)

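# Sanity check: a fair coin carries exactly one bit of entropy.
if __name__ == "__main__":
    assert abs(calc_entropy([0.5, 0.5]) - 1.0) < 1e-9
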
def calc_probability(frequencies):
    # Normalize raw counts into probabilities.
    total = sum(frequencies)
    return [v / total for v in frequencies]

def calc_column_classes_probability(column, classes):
    # For each distinct value in `column`, compute the class distribution
    # of the rows carrying that value.
    c = Counter(column)
    column_probability = dict()
    for key in c.keys():
        class_list = [
            clazz for column_value, clazz in zip(column, classes)
            if column_value == key
        ]
        frequencies = Counter(class_list).values()
        column_probability[key] = calc_probability(frequencies)
    return column_probability

def calc_column_probability(column):
    # Relative frequency of each distinct value in `column`.
    c = Counter(column)
    total = sum(c.values())
    return {key: value / total for key, value in c.items()}

def info_gain(column, classes):
    # 1. Entropy of the class labels:
    class_counter = Counter(classes)
    class_frequency = class_counter.values()
    class_probability = calc_probability(class_frequency)
    class_entropy = calc_entropy(class_probability)
    # 2. Conditional entropy of the classes given the column values:
    column_classes_probability = calc_column_classes_probability(
        column, classes)
    column_entropy = dict()
    for key, probabilities in column_classes_probability.items():
        column_entropy[key] = calc_entropy(probabilities)
    entropy = 0
    column_probability = calc_column_probability(column)
    for key in column_entropy.keys():
        entropy += column_probability[key] * column_entropy[key]
    return class_entropy - entropy

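# Sanity check (made-up data): a column that splits the classes perfectly
# gains the full class entropy of 1 bit.
if __name__ == "__main__":
    assert abs(info_gain(['x', 'x', 'y', 'y'], [0, 0, 1, 1]) - 1.0) < 1e-9
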
def mirrored_info_gain(column, classes):
    # Negated so that minimizing it maximizes the information gain.
    return -info_gain(column, classes)

def best_attribute_index(features, classes, algorithm):
    # Return the index of the feature that minimizes `algorithm`.
    if features.shape[0] <= 0:
        raise ValueError("How can the best attribute of nothing exist?")
    min_index, min_value = 0, algorithm(features[0], classes)
    for index, feature in enumerate(features):
        coefficient = algorithm(feature, classes)
        if coefficient < min_value:
            min_index, min_value = index, coefficient
    return min_index

def get_most_frequent_classification(dataset):
    # The last column holds the class labels; return the modal class.
    clazzes = dataset[:, -1]
    return Counter(clazzes).most_common(1)[0][0]

def split_dataset_by_column_values(column_index, dataset):
    # Split `dataset` into one chunk per distinct value of the chosen column,
    # removing that column from each chunk. With two rows or fewer, short-circuit
    # and return the first class label as a forced leaf.
    assert type(column_index) == int
    pieces = defaultdict(list)
    column = dataset[:, column_index].T
    dataset = np.delete(dataset, column_index, axis=1)
    if dataset.shape[0] <= 2:
        return dataset[:, -1][0], True
    for index, value in enumerate(column):
        pieces[value].append(dataset[index])
    for key, value in pieces.items():
        pieces[key] = np.array(value)
    return dict(pieces), False

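# A minimal sketch of the split on made-up data: each distinct value of
# column 0 becomes a chunk with that column removed; with two rows or fewer
# a forced leaf would be returned instead.
if __name__ == "__main__":
    chunks, leaf = split_dataset_by_column_values(0, np.array([
        ['a', '0'], ['a', '0'], ['b', '1'], ['b', '1'],
    ]))
    assert not leaf and set(chunks) == {'a', 'b'}
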
def plant_a_tree(dataset, algorithm, headers, level=0):  # the goal here is to build a tree (or a subtree with its nodes)
    if dataset.shape[1] == 0:  # if no columns are left at all, raise a cheeky error
        print(level)
        raise ValueError("Dude, c'mon!")
    best_attribute_i = best_attribute_index(dataset[:, :-1].T, dataset[:, -1], algorithm)  # fetch the best attribute
    if best_attribute_i is None:  # check for None just in case
        raise ValueError("How can the best attribute of a dataset be something if there are only the labels?")
    trunk = {}  # build the tree trunk (a trunk here means a decision node)
    trunk['attribute'] = headers[best_attribute_i]  # record the best attribute on the trunk
    headers = np.delete(headers, best_attribute_i, axis=0)  # delete the header of the chosen column
    chunked_dataset, forced_leaf = split_dataset_by_column_values(best_attribute_i, dataset)  # split the dataset into chunks and check whether it is already time to become a leaf (two rows or fewer)
    if forced_leaf:
        trunk['class'] = chunked_dataset  # chunked_dataset's return is exactly the classification
        del trunk['attribute']  # delete this attribute since it is useless here
    else:
        trunk['values'] = {}
        for key, cur_dataset in chunked_dataset.items():  # for the actual chunks (those not forced into leaves for having too few rows)
            if cur_dataset.shape[1] > 1:  # enough columns left (more than just the class column) to recurse into another trunk
                trunk['values'][key] = plant_a_tree(cur_dataset, algorithm, headers, level=level + 1)
            else:  # otherwise take the class of the dataset's first element as this chunk's class; is that actually right? can you tell?
                trunk['values'][key] = {'class': cur_dataset[:, -1][0]}
    return trunk

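# An end-to-end sketch on a made-up toy dataset (the column names and values
# are illustrative only): 'outlook' separates the classes perfectly, so the
# root node decides on it.
if __name__ == "__main__":
    toy_headers = np.array(['outlook', 'temp', 'play'])
    toy_dataset = np.array([
        ['sunny', 'hot', 'no'],
        ['sunny', 'mild', 'no'],
        ['rain', 'mild', 'yes'],
        ['rain', 'hot', 'yes'],
    ])
    toy_tree = plant_a_tree(toy_dataset, ginni_index, toy_headers)
    assert toy_tree['attribute'] == 'outlook'
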
def sample_rows(database_data, min_instances=0.3, max_instances=0.8):
    # Bootstrap sampling: draw a random fraction of rows, with replacement.
    database_size = database_data.shape[0]
    instances_percentage = uniform(min_instances, max_instances)
    instances_quantity = int(instances_percentage * database_size)
    bootstrapped_database = []
    for _ in range(instances_quantity):
        index = randint(0, database_size - 1)
        bootstrapped_database.append(database_data[index])
    return np.array(bootstrapped_database)

def sample_cols(database_data, database_headers, min_instances=0.3, max_instances=0.3):
    # Feature bagging: draw a random subset of columns, with replacement,
    # always keeping the class column (the last one).
    headers_size = database_headers.shape[0]
    headers_percentage = uniform(min_instances, max_instances)
    headers_quantity = max(int(headers_percentage * headers_size), 1)
    bootstrapped_headers = []
    bootstrapped_data = []
    indexes = []
    for _ in range(headers_quantity):
        index = randint(0, headers_size - 2)  # never pick the class column here
        indexes.append(index)
        bootstrapped_headers.append(database_headers[index])
        bootstrapped_data.append(database_data[:, index])
    bootstrapped_headers.append(database_headers[-1])
    bootstrapped_data.append(database_data[:, -1])
    bootstrapped_headers = np.array(bootstrapped_headers)
    bootstrapped_data = np.array(bootstrapped_data).T
    return bootstrapped_data, bootstrapped_headers, indexes

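# Quick shape check (a minimal sketch on made-up data): row sampling keeps
# every column, and column sampling always keeps the class column last.
if __name__ == "__main__":
    demo_data = np.array([['a', 'x', '0'], ['b', 'y', '1']] * 5)
    demo_headers = np.array(['f1', 'f2', 'label'])
    assert sample_rows(demo_data).shape[1] == demo_data.shape[1]
    _, sampled_headers, _ = sample_cols(demo_data, demo_headers)
    assert sampled_headers[-1] == 'label'
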
class SampleTree:
    def __init__(self, database_data, database_headers, algorithm=ginni_index, row_transform=None, col_transform=None):  # the arguments are assumed to be np.array
        self.indexes = None
        if col_transform is not None:  # this one samples columns
            database_data, database_headers, self.indexes = col_transform(database_data, database_headers)
        if row_transform is not None:  # this one samples rows
            database_data = row_transform(database_data)
        self.database_data = database_data
        self.database_headers = database_headers
        self.algorithm = algorithm
        self.tree = plant_a_tree(database_data, algorithm, database_headers)

    def infeer(self, sample):  # the sample is assumed to be an np.array
        if self.indexes is not None:
            sample = sample[self.indexes]
        if sample.shape[0] != self.database_headers.shape[0] - 1:
            raise ValueError("Cannot infeer this sample, it has too few or too many arguments")
        cur_node = self.tree
        while True:
            if 'class' in cur_node:
                return cur_node['class']
            elif 'attribute' in cur_node and 'values' in cur_node:
                index = np.where(self.database_headers == cur_node['attribute'])[0][0]  # find the attribute's index in the headers so we can read it from the sample too
                if sample[index] in cur_node['values']:
                    cur_node = cur_node['values'][sample[index]]
                else:
                    return None  # unseen attribute value, no prediction
            else:
                raise ValueError('This tree is messed up!')

class RandomForest:
    def __init__(self, database_data, database_headers, algorithm=ginni_index, row_transform=None, col_transform=None, n_trees=100):
        self.trees = []
        for _ in range(n_trees):
            self.trees.append(SampleTree(database_data, database_headers, algorithm, row_transform, col_transform))

    def infeer(self, sample):
        # Majority vote across all the trees.
        return Counter(map(lambda tree: tree.infeer(sample), self.trees)).most_common(1)[0][0]

    def infeer_lc(self, sample):
        # List-comprehension variant of infeer.
        return Counter([tree.infeer(sample) for tree in self.trees]).most_common(1)[0][0]

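# An end-to-end sketch on the same made-up weather data (with no transforms
# every tree is identical, so this only exercises the voting plumbing).
if __name__ == "__main__":
    headers = np.array(['outlook', 'temp', 'play'])
    dataset = np.array([
        ['sunny', 'hot', 'no'],
        ['sunny', 'mild', 'no'],
        ['rain', 'mild', 'yes'],
        ['rain', 'hot', 'yes'],
    ])
    forest = RandomForest(dataset, headers, n_trees=3)
    assert forest.infeer(np.array(['sunny', 'hot'])) == 'no'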