Neural Network
The activation module (imported as activation by the network code below), defining a small family of activation functions together with their derivatives:

import numpy as np


class Activation:
    """Base class: subclasses implement activate() and derivative()."""

    def activate(self, z: np.ndarray):
        raise NotImplementedError

    def derivative(self, z: np.ndarray):
        raise NotImplementedError


class Linear(Activation):
    def __init__(self, factor: float = 1):
        self.factor = factor

    def activate(self, z: np.ndarray):
        z = np.array(z)
        return self.factor * z

    def derivative(self, z: np.ndarray):
        return np.full(np.shape(z), self.factor)


class Sigmoid(Activation):
    def activate(self, z: np.ndarray):
        z = np.array(z)
        return 1.0 / (1.0 + np.exp(-z))

    def derivative(self, z: np.ndarray):
        s = self.activate(z)
        return s * (1 - s)


class BinaryStep(Activation):
    def activate(self, z: np.ndarray):
        z = np.array(z)
        return np.where(z >= 0, 1, 0)

    def derivative(self, z: np.ndarray):
        z = np.array(z)
        return np.where(z != 0, 0, 1)


class TanH(Activation):
    def activate(self, z: np.ndarray):
        z = np.array(z)
        return (2.0 / (1.0 + np.exp(-2 * z))) - 1

    def derivative(self, z: np.ndarray):
        return 1 - self.activate(z) ** 2


class ArcTan(Activation):
    def activate(self, z: np.ndarray):
        z = np.array(z)
        return np.arctan(z)

    def derivative(self, z: np.ndarray):
        # d/dz arctan(z) = 1 / (1 + z^2)
        z = np.array(z)
        return 1.0 / (1.0 + z ** 2)


class ReLU(Activation):
    def __init__(self, threshold=0):
        self.threshold = threshold

    def activate(self, z: np.ndarray):
        z = np.array(z)
        return np.where(z >= self.threshold, z, 0)

    def derivative(self, z: np.ndarray):
        # Slope is 1 where the unit is active and 0 where the input is clipped.
        z = np.array(z)
        return np.where(z >= self.threshold, 1, 0)


class LeakyReLU(Activation):
    def __init__(self, threshold=0, factor=20):
        self.threshold = threshold
        self.factor = factor

    def activate(self, z: np.ndarray):
        z = np.array(z)
        return np.where(z >= self.threshold, z, z / self.factor)

    def derivative(self, z: np.ndarray):
        z = np.array(z)
        return np.where(z >= self.threshold, 1, 1 / self.factor)


class ExpoReLU(Activation):
    def __init__(self, threshold=0, factor=20):
        self.threshold = threshold
        self.factor = factor

    def activate(self, z: np.ndarray):
        z = np.array(z)
        return np.where(z >= self.threshold, z, (np.exp(z) - 1) / self.factor)

    def derivative(self, z: np.ndarray):
        z = np.array(z)
        return np.where(z >= self.threshold, 1, self.activate(z) + (1 / self.factor))


class SoftPlus(Activation):
    def activate(self, z: np.ndarray):
        z = np.array(z)
        return np.log(1 + np.exp(z))

    def derivative(self, z: np.ndarray):
        z = np.array(z)
        return 1 / (1 + np.exp(-z))
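
A minimal sketch (not part of the gist) of how these classes might be exercised; it assumes the module above is saved as activation.py, matching the import used by the network code below, and the input values are illustrative only:

import numpy as np
import activation  # assumes the file above is saved as activation.py

z = np.array([-2.0, 0.0, 2.0])
for act in (activation.Sigmoid(), activation.TanH(), activation.ReLU(), activation.LeakyReLU()):
    # Each class exposes the same two-method interface.
    print(type(act).__name__, act.activate(z), act.derivative(z))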
The network module, which decodes a flat weight "DNA" into per-layer weight matrices and trains the resulting network by gradient descent:

import copy
import logging
import random

import numpy as np

import activation
from mpl_toolkits.mplot3d import Axes3D  # registers the 3D projection on older matplotlib
from matplotlib import pyplot as plt

debug = True

# Five base "strand" values; every ordered triple of strands forms a codon,
# and a codon's value is the sum of its three strands.
a, g, phi, c, t = -.01, -.005, 0, .005, .01
strands = [a, g, phi, c, t]
codon_combinations = []
for i in strands:
    for j in strands:
        for k in strands:
            codon_combinations += [(i, j, k)]
codons = [sum(i) for i in codon_combinations]


def save_plots(model, _x, _y, name, title=""):
    """Scatter the target values (gray) against the model's predictions (red) in 3D."""
    o = model.evaluate(_x)
    fig = plt.figure()
    ax = plt.axes(projection="3d")
    tmp_x, tmp_y, _z = [], [], np.array(_y).T[0]
    for _ix in _x:
        tmp_x += [_ix[0]]
        tmp_y += [_ix[1]]
    ax.scatter3D(tmp_x, tmp_y, _z, c='gray')
    ax.scatter3D(tmp_x, tmp_y, o.T[0], c='red')
    plt.title(title)
    plt.savefig(name)
    plt.close()


def get_random_codon():
    return random.choice(codons)


def generate_wts_dna(nn_structure):
    """Generate a flat list of codon weights long enough to fill every layer's weight matrix."""
    total_connections = 0
    previous_connections = nn_structure[0]
    for i in range(len(nn_structure)):
        total_connections += previous_connections * nn_structure[i]
        previous_connections = nn_structure[i]
    dna = [get_random_codon() for i in range(total_connections)]
    # The first layer is fixed to an identity matrix so the inputs pass through unchanged.
    root_strands = np.identity(nn_structure[0]).flatten().tolist()
    dna = root_strands + dna[len(root_strands):]
    logging.debug("DNA Strand Length: %d", len(dna))
    return dna


def calculate_loss(expected, out):
    loss = (np.sum([(e - o) ** 2 for e, o in zip(expected, out)])) / 2
    logging.debug("Calculated loss: %f", loss)
    return loss


def loss_derivative(expected, out):
    loss = [o - e for e, o in zip(expected, out)]
    logging.debug("Calculated loss derivative: %s", loss)
    return loss


def visualize_neural_network(_ntw, _wt_mtrix, _biases, _activations, v_factor=20, h_factor=20, path_prefix="./"):
    max_height = max(_ntw)
    neuron_coords = []
    plt.figure(figsize=(v_factor, h_factor))
    _nc = []
    _ac = []
    _bc = []
    for k, n in enumerate(_ntw):
        offset = max_height / (2 * n)
        neuron_coords += [[((k * 2 + 3), (i * (max_height / n) + offset))
                           for i in range(n)]]
        _nc += [((k * 2 + 3), (i * (max_height / n) + offset)) for i in range(n)]
        _ac += [((k * 2 + 3), max_height)]
        _bc += [((k * 2 + 3), -.5)]
    plt.scatter(*zip(*_nc), s=250, c='g')
    plt.scatter(*zip(*_bc), s=300, facecolors='none', edgecolors='r')
    for l, coords in enumerate(neuron_coords[:-1]):
        for k, c in enumerate(coords):
            for j, nc in enumerate(neuron_coords[l + 1]):
                curr_wt = _wt_mtrix[l + 1][j][k]
                if curr_wt:
                    plt.plot(*zip(*[c, nc]), '-.g')
                    plt.text(c[0] + (nc[0] - c[0]) / 2,
                             c[1] + (nc[1] - c[1]) / 2,
                             str(np.round(curr_wt, 3)))
    for l, act in enumerate(_activations[:len(_ntw)]):
        plt.text(_ac[l][0] - .25, _ac[l][1], type(act).__name__)
    for l, bias in zip(_bc, _biases):
        plt.text(l[0], l[1], bias)
    plt.axis("off")
    plt.savefig(path_prefix + ('-'.join(map(str, _ntw))) + ".png")
    plt.close()
    return


class CondensedNN():
    def __init__(self, dna, ntw, activations, layer_biases=None):
        self.dna = dna
        self.ntw = ntw
        self.activations = activations
        self.wt_matrix = self._decode_dna(self.dna, self.ntw)
        self.min_loss = float("inf")
        self.prev_delta = float("inf")
        # Per-layer caches filled during evaluate(): inputs, weighted sums, activations.
        self.IL = [None for _ in self.ntw]
        self.ZL = [None for _ in self.ntw]
        self.AL = [None for _ in self.ntw]
        if layer_biases is None:
            self.layer_biases = [0 for i in self.ntw]
        else:
            self.layer_biases = layer_biases
        return

    def _wts2dna(self):
        f_w = []
        for w in self.wt_matrix:
            f_w += np.array(w).flatten().tolist()
        self.dna = f_w

    def reset(self):
        logging.debug("Resetting min loss from %f", self.min_loss)
        self.min_loss = float("inf")
        self.prev_delta = float("inf")
        return

    def evaluate(self, inputs):
        _x = np.array(inputs).transpose()
        prev_input = _x
        i = 0
        for lyr, act, bias in zip(self.wt_matrix, self.activations, self.layer_biases):
            _bias = np.transpose(np.array(bias)[np.newaxis])
            logging.debug("layer(%d): %s,\nbias: %s,\nactivation: %s,\nprev_input(il): %s",
                          i, lyr, _bias.tolist(), act, prev_input)
            self.IL[i] = prev_input
            zl = np.matmul(lyr, prev_input) + _bias
            self.ZL[i] = zl
            self.AL[i] = act.activate(prev_input)
            logging.debug("layer(%d) weighted inputs(zl):\n%s", i, zl)
            prev_input = act.activate(zl)
            logging.debug(
                "layer(%d) current activated output(al):\n%s", i, prev_input)
            i += 1
        return prev_input

    def _decode_dna(self, _wts, _structure):
        """Slice the flat DNA list into one (n_out x n_in) weight matrix per layer."""
        _wt_matrix = []
        prev_input_size = _structure[0]
        _tmp_wts = copy.deepcopy(_wts)
        for i in _structure:
            _tmp_mtrx = []
            _current_frame = _tmp_wts[:i * prev_input_size]
            for _ in range(i):
                _tmp_mtrx += [_current_frame[:prev_input_size]]
                _current_frame = _current_frame[prev_input_size:]
            _tmp_wts = _tmp_wts[i * prev_input_size:]
            _wt_matrix += [_tmp_mtrx]
            prev_input_size = i
        logging.debug("wt_matrix: %s", _wt_matrix)
        return _wt_matrix

    def update_wts(self, new_wts, biases=None):
        # The mask keeps pruned (zero) connections at zero when new weights are applied.
        mask = self._get_ntw_mask()
        for i, (m, w) in enumerate(zip(mask, new_wts)):
            final_new_wts = np.multiply(m, w)
            self.wt_matrix[i] = final_new_wts
        if biases is not None:
            self.layer_biases = biases
        self._wts2dna()

    def _get_ntw_mask(self):
        k = []
        for i in self.wt_matrix:
            k += [(np.array(i) != 0).astype(int)]
        return k

    def train(self, x_training_data, y_training_data, batch_size=-1, epochs=100, learning_rate=.01, no_change_threshold=.05):
        losses = []
        dlosses = []
        dl_o_dl_losses = []
        best_wt_matrix = self.wt_matrix
        best_layer_biases = self.layer_biases
        no_change_counter = 10
        if batch_size < 0:
            batch_size = len(x_training_data)
        pages = int(len(x_training_data) / batch_size) + 1
        prev_loss = calculate_loss(
            y_training_data, self.evaluate(x_training_data).T)
        for i in range(epochs):
            for page in range(pages):
                current_batch_size = min(batch_size, len(x_training_data) - page * batch_size)
                logging.debug(
                    "Training batch (Current batch size: %d) %d/%d", current_batch_size, page, pages)
                if not current_batch_size:
                    continue
                _batch_x = x_training_data[page * batch_size:page * batch_size + batch_size]
                _batch_y = y_training_data[page * batch_size:page * batch_size + batch_size]
                logging.debug("Current Batch Data[%d:%d]:\nx: %s\ny: %s",
                              page * batch_size, page * batch_size + batch_size,
                              _batch_x, _batch_y)
                loss, nloss = self.feedback(x_training_data=_batch_x,
                                            y_training_data=_batch_y, learning_rate=learning_rate)
                logging.debug("loss %s, nloss %s", loss, nloss)
                self.prev_delta = np.abs(nloss - loss)
                losses += [loss]
                dlosses += [nloss - loss]
                dl_o_dl = loss - prev_loss
                dl_o_dl_losses += [dl_o_dl]
                prev_loss = loss
                if self.min_loss > nloss:
                    if debug:
                        save_plots(self, _batch_x, _batch_y,
                                   "debug/model_epoch_%s.png" % (str(i).zfill(len(str(epochs)))),
                                   title="Iteration %d" % i)
                    best_wt_matrix = copy.deepcopy(self.wt_matrix)
                    best_layer_biases = copy.deepcopy(self.layer_biases)
                    logging.debug(
                        "New minima found: %f at iteration #%d", nloss, i)
                    self.min_loss = nloss
                logging.info("%d\t%d\t%f\t%f\t%s", i, page, np.abs(dl_o_dl),
                             no_change_threshold, np.abs(dl_o_dl) < no_change_threshold)
                # Stop early once the loss has stopped moving for 10 consecutive batches.
                if np.abs(dl_o_dl) < no_change_threshold:
                    no_change_counter -= 1
                else:
                    no_change_counter = 10
                if no_change_counter <= 0:
                    logging.info("Trained at %s iterations", i)
                    break
            logging.debug("Epoch: %d/%d losses: %f", i, epochs, np.mean(losses))
            # Revert to the best weights seen so far at the end of every epoch.
            self.update_wts(best_wt_matrix, best_layer_biases)
            print(".", end="")
            if (i + 1) % 100 == 0:
                print("[epoch %d/%d Loss: %f dl_o_dl_losses: %f]" %
                      (i, epochs, np.mean(losses[-10:]), np.mean(dl_o_dl_losses[-5:])))
            # if no_change_counter <= 0:
            #     break
        self.IL = [None for _ in self.ntw]
        self.ZL = [None for _ in self.ntw]
        self.AL = [None for _ in self.ntw]
        return losses, dlosses, dl_o_dl_losses

    def feedback(self, x_training_data, y_training_data, learning_rate=.001):
        # Backpropagation pass: Gl holds upstream gradients, Sl the activation
        # derivatives, Dl the per-layer deltas, and DWl the weight gradients.
        Gl = [0 for _ in self.ntw]
        Sl = [0 for _ in self.ntw]
        Dl = [0 for _ in self.ntw]
        DWl = [0 for _ in self.ntw]
        m = len(x_training_data)
        il = 0
        l = -1 - il
        pred = self.evaluate(x_training_data)
        loss = calculate_loss(np.transpose(y_training_data), pred)
        Wl = self.wt_matrix
        Bl = self.layer_biases
        NWl = copy.deepcopy(Wl)
        NBl = copy.deepcopy(Bl)
        WlT = [np.transpose(w) for w in self.wt_matrix]
        Il = self.IL
        Zl = self.ZL
        Al = self.AL
        logging.debug("Il: %s\nZl: %s\nAl: %s", Il, Zl, Al)
        AlT = [np.transpose(l) for l in Al]
        # Output layer: loss gradient times the activation derivative.
        Gl[l] = loss_derivative(np.transpose(y_training_data), pred)
        Sl[l] = self.activations[l].derivative(Zl[l])
        Dl[l] = np.multiply(Gl[l], Sl[l])
        DWl[l] = np.matmul(Dl[l], AlT[l])
        logging.debug("Al[%d]: %s\nAlT[%d]: %s\nGl[%d]: %s\nSl[%d]: %s\nDl[%d]: %s\nDWl[%d]: %s",
                      l, Al[l], l, AlT[l], l, Gl[l], l, Sl[l], l, Dl[l], l, DWl[l])
        # Propagate the deltas backwards through the remaining layers.
        for il in range(1, len(self.ntw)):
            l = -1 - il
            Sl[l] = self.activations[l].derivative(Zl[l])
            Gl[l] = np.matmul(WlT[l + 1], Dl[l + 1])
            Dl[l] = Gl[l] * Sl[l]
            DWl[l] = np.matmul(Dl[l], AlT[l])
            logging.debug("Al[%d]: %s\nAlT[%d]: %s\nGl[%d]: %s\nSl[%d]: %s\nDl[%d]: %s\nDWl[%d]: %s",
                          l, Al[l], l, AlT[l], l, Gl[l], l, Sl[l], l, Dl[l], l, DWl[l])
        logging.debug("DWl %s", DWl)
        # Gradient-descent step on the weights and biases, averaged over the batch.
        for i in range(len(NWl)):
            NWl[i] -= ((learning_rate / m) * DWl[i])
            # NBl[i] -= np.sum((learning_rate/m) * Dl[i], axis=1)
            NBl[i] -= np.sum((learning_rate / m) * Dl[i])
        logging.debug("NWl: %s,\nNBl: %s", NWl, NBl)
        self.update_wts(NWl, NBl)
        pred = self.evaluate(x_training_data)
        nloss = calculate_loss(np.transpose(y_training_data), pred)
        logging.debug("loss: %f,\nnloss: %f", loss, nloss)
        return loss, nloss  # , delta_over_delta
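
For reference, feedback() follows the standard backpropagation recursion, summarized below under the squared-error loss defined in calculate_loss. Here a^{l-1} corresponds to the cached AlT entries, which coincide with the previous layer's activations exactly when identity-like activations (such as the Linear ones in the demo below) are used; this is a rough summary of the update rule, not a line-by-line transcription of the code.

\delta^{L} = (a^{L} - y) \odot \sigma'(z^{L}), \qquad
\delta^{l} = \big((W^{l+1})^{\top} \delta^{l+1}\big) \odot \sigma'(z^{l}), \qquad
\Delta W^{l} = \delta^{l} (a^{l-1})^{\top}, \qquad
W^{l} \leftarrow W^{l} - \frac{\eta}{m} \Delta W^{l}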
A short driver script that generates toy data and assembles a model from the two modules above:

import logging
import random

import numpy as np

import activation
# generate_wts_dna and CondensedNN live in the network module above; the
# module name used here is an assumption, adjust the import to match your file name.
from network import generate_wts_dna, CondensedNN


def generate_sample_data(n=100, x_variance=100, x_offset=100, y_variance=100, y_offset=100):
    # Only n is used; the variance/offset parameters are accepted but ignored.
    _x = [[random.random(), random.random()] for i in range(-n, n)]
    offset = 1.5
    _expected = [[(_xi ** 3 + _yi + offset)] for _xi, _yi in _x]
    return _x, _expected


_ntw = [2, 15, 1]
dna = generate_wts_dna(_ntw)
logging.info("DNA: %s", dna)
activations = [
    activation.Linear(),
    activation.Linear(),
    activation.Linear(),
]
layer_biases = [np.ones(i) for i in _ntw]
model = CondensedNN(dna, _ntw, activations, layer_biases=layer_biases)
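
A possible continuation (not part of the gist) showing how training and evaluation could be invoked with the objects built above. The hyperparameter values are illustrative assumptions, and note that with the module-level debug flag set, train() writes plots into a debug/ directory that must already exist.

# Illustrative call pattern only; batch size, epoch count, and learning rate are assumptions.
_x, _y = generate_sample_data(n=100)
losses, dlosses, dl_o_dl_losses = model.train(
    _x, _y, batch_size=32, epochs=500, learning_rate=.01)
# evaluate() returns predictions with shape (output_dim, n_samples).
predictions = model.evaluate(_x)
print("final loss:", losses[-1])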