Skip to content

Instantly share code, notes, and snippets.

@vineetvermait vineetvermait/CondensedNN.py Secret
Created Aug 4, 2019

Embed
What would you like to do?
Neural Network
import numpy as np
class Activation:
    """Base class for the activation functions defined in this module.

    Subclasses override :meth:`activate` and :meth:`derivative`, each of
    which takes the weighted inputs ``z`` of a layer.
    """

    def __init__(self):
        pass

    # NOTE: these were previously decorated with @staticmethod while still
    # declaring ``self``; an instance call such as ``obj.activate(z)`` then
    # bound ``z`` to ``self`` and failed with a missing-argument TypeError.
    def activate(self, z: np.ndarray):
        """Apply the activation to ``z``.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError(
            "%s must implement activate()" % type(self).__name__)

    def derivative(self, z: np.ndarray):
        """Return the derivative of the activation evaluated at ``z``.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError(
            "%s must implement derivative()" % type(self).__name__)
class Linear(Activation):
    """Linear activation: multiplies its input by a constant factor."""

    def __init__(self, factor: int = 1):
        # Slope of the line; it is also the value of the derivative.
        self.factor = factor

    def activate(self, z: np.array):
        """Return ``factor * z`` element-wise as a NumPy array."""
        values = np.array(z)
        return self.factor * values

    def derivative(self, z: np.array):
        """Return an array shaped like ``z`` filled with ``factor``."""
        return np.full(shape=np.shape(z), fill_value=self.factor)
class Sigmoid(Activation):
    """Logistic sigmoid activation, ``1 / (1 + exp(-z))``."""

    def activate(self, z: np.array):
        """Return the element-wise sigmoid of ``z``."""
        values = np.array(z)
        return 1.0 / (1.0 + np.exp(-values))

    def derivative(self, z: np.array):
        """Return ``s * (1 - s)`` where ``s`` is the sigmoid of ``z``."""
        s = self.activate(z)
        return s * (1 - s)
class BinaryStep(Activation):
    """Step activation: 1 for non-negative input, 0 otherwise."""

    def activate(self, z: np.array):
        """Return 1 where ``z >= 0`` and 0 elsewhere."""
        values = np.array(z)
        non_negative = values >= 0
        return np.where(non_negative, 1, 0)

    def derivative(self, z: np.array):
        """Return 1 exactly where ``z`` equals 0 and 0 everywhere else."""
        values = np.array(z)
        at_step = values == 0
        return np.where(at_step, 1, 0)
class TanH(Activation):
    """Hyperbolic tangent activation, written as ``2 / (1 + exp(-2z)) - 1``."""

    def activate(self, z: np.array):
        """Return the element-wise hyperbolic tangent of ``z``."""
        values = np.array(z)
        denominator = 1.0 + np.exp(-2*values)
        return (2.0 / denominator) - 1

    def derivative(self, z: np.array):
        """Return ``1 - tanh(z)**2``."""
        activated = self.activate(z)
        return 1-activated**2
class ArcTan(Activation):
    """Inverse tangent activation."""

    def activate(self, z: np.ndarray):
        """Return ``arctan(z)`` element-wise."""
        z = np.array(z)
        return np.arctan(z)

    def derivative(self, z: np.ndarray):
        """Return the derivative of ``arctan`` at ``z``: ``1 / (1 + z**2)``.

        The previous implementation returned ``cos(z)**2``, which is the
        derivative expressed in terms of the *output* (``cos(arctan(z))**2``),
        not of the input ``z`` this method receives.
        """
        z = np.array(z)
        return 1.0 / (1.0 + z**2)
class ReLU(Activation):
    """Rectified linear unit with a configurable cut-off threshold."""

    def __init__(self, threshold=0):
        # Inputs below this value are mapped to 0.
        self.threshold = threshold

    def activate(self, z: np.ndarray):
        """Return ``z`` where ``z >= threshold`` and 0 elsewhere."""
        z = np.array(z)
        return np.where(z >= self.threshold, z, 0)

    def derivative(self, z: np.ndarray):
        """Return 1 where ``z >= threshold`` and 0 elsewhere.

        The previous implementation returned 1 everywhere, which made the
        gradient that of a linear unit, and it failed on plain lists because
        it read ``z.shape`` without converting to an array first.
        """
        z = np.array(z)
        # Same boundary convention (>=) as activate().
        return np.where(z >= self.threshold, 1, 0)
class LeakyReLU(Activation):
    """Leaky ReLU: identity above the threshold, ``z / factor`` below it."""

    def __init__(self, threshold=0, factor=20):
        # Inputs below ``threshold`` are divided by ``factor``.
        self.threshold = threshold
        self.factor = factor

    def activate(self, z: np.ndarray):
        """Return ``z`` where ``z >= threshold`` and ``z / factor`` elsewhere."""
        z = np.array(z)
        return np.where(z >= self.threshold, z, z/self.factor)

    def derivative(self, z: np.ndarray):
        """Return 1 where ``z >= threshold`` and ``1 / factor`` elsewhere."""
        # Convert first so list input works here just as it does in
        # activate(); comparing a plain list with a number raises TypeError.
        z = np.array(z)
        return np.where(z >= self.threshold, 1, 1/self.factor)
class ExpoReLU(Activation):
    """Exponential ReLU: identity above the threshold, scaled ``exp(z) - 1`` below."""

    def __init__(self, threshold=0, factor=20):
        # Below ``threshold`` the output is ``(exp(z) - 1) / factor``.
        self.threshold = threshold
        self.factor = factor

    def activate(self, z: np.ndarray):
        """Return ``z`` where ``z >= threshold``, else ``(exp(z) - 1) / factor``."""
        z = np.array(z)
        return np.where(z >= self.threshold, z, (np.exp(z)-1)/self.factor)

    def derivative(self, z: np.ndarray):
        """Return 1 where ``z >= threshold``, else ``exp(z) / factor``.

        Below the threshold ``activate(z) + 1/factor`` equals
        ``(exp(z) - 1)/factor + 1/factor``, i.e. ``exp(z)/factor``.
        """
        # Convert first so list input works here just as it does in
        # activate(); comparing a plain list with a number raises TypeError.
        z = np.array(z)
        return np.where(z >= self.threshold, 1, self.activate(z) + (1/self.factor))
class SoftPlus(Activation):
    """SoftPlus activation, ``log(1 + exp(z))``."""

    def activate(self, z: np.ndarray):
        """Return ``log(1 + exp(z))`` element-wise.

        ``np.logaddexp(0, z)`` evaluates ``log(exp(0) + exp(z))`` without
        forming ``exp(z)`` directly, so large ``z`` no longer overflows to
        ``inf`` as the literal ``np.log(1 + np.exp(z))`` did.
        """
        z = np.array(z)
        return np.logaddexp(0, z)

    def derivative(self, z: np.ndarray):
        """Return the logistic sigmoid ``1 / (1 + exp(-z))``."""
        # Convert first: negating a plain list (``-z``) raises TypeError.
        z = np.array(z)
        return 1/(1+np.exp(-z))
import numpy as np
import copy
import random
import activation
import logging
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import pyplot as plt
# When true, train() saves a plot each time a new minimum loss is found.
debug = True

# Five base values; every ordered triple of them is a "codon combination".
a, g, phi, c, t = -.01, -.005, 0, .005, .01
strands = [a, g, phi, c, t]
codon_combinations = [
    (first, second, third)
    for first in strands
    for second in strands
    for third in strands
]
# A codon is the sum of its three strand values; used as an initial weight.
codons = [sum(triple) for triple in codon_combinations]
def save_plots(model, _x, _y, name, title=""):
    """Save a 3D scatter plot of expected versus predicted outputs.

    Expected values (first column of ``_y``) are drawn in gray and the
    model's predictions for ``_x`` in red, using the first two components
    of each input row as the horizontal axes. The figure is written to
    ``name`` and then closed.
    """
    predicted = model.evaluate(_x)
    plt.figure()
    ax = plt.axes(projection="3d")
    expected_z = np.array(_y).T[0]
    xs, ys = [], []
    for point in _x:
        xs.append(point[0])
        ys.append(point[1])
    ax.scatter3D(xs, ys, expected_z, c='gray')
    ax.scatter3D(xs, ys, predicted.T[0], c='red')
    plt.title(title)
    plt.savefig(name)
    plt.close()
def get_random_codon():
    """Return one value drawn at random from the module-level ``codons``."""
    (picked,) = random.sample(codons, 1)
    return picked
def generate_wts_dna(nn_structure):
    """Build a flat list of initial weights ("DNA") for a layer structure.

    The first layer is treated as fully connected to itself, so the total
    length is ``n0*n0 + n0*n1 + n1*n2 + ...``. The leading ``n0*n0`` entries
    are a flattened identity matrix; the rest are random codons.
    """
    input_width = nn_structure[0]

    # Count the connections between each layer and the one feeding it.
    total_connections = 0
    fan_in = input_width
    for width in nn_structure:
        total_connections += fan_in * width
        fan_in = width

    random_strands = [get_random_codon() for _ in range(total_connections)]

    # Overwrite the first-layer segment with a flattened identity matrix.
    root_strands = np.identity(input_width).flatten().tolist()
    dna = root_strands + random_strands[len(root_strands):]
    logging.debug("DNA Strand Length: %d", len(dna))
    return dna
def calculate_loss(expected, out):
    """Return half the sum of squared differences between ``expected`` and ``out``.

    The two sequences are paired with ``zip``, so extra items in the longer
    one are ignored.
    """
    squared_errors = [(target - actual) ** 2
                      for target, actual in zip(expected, out)]
    loss = np.sum(squared_errors) / 2
    logging.debug("Calculated loss: %f", loss)
    return loss
def loss_derivative(expected, out):
    """Return the list of ``out - expected`` differences, paired with ``zip``."""
    gradient = [actual - target for target, actual in zip(expected, out)]
    logging.debug("Calculated loss derviative: %s", gradient)
    return gradient
def visualize_neural_network(_ntw, _wt_mtrix, _biases, _activations, v_factor=20, h_factor=20, path_prefix="./"):
    """Draw the network as a layered graph and save it as a PNG.

    Neurons are green dots laid out one column per layer, non-zero weights
    are dash-dot lines labelled with the weight rounded to 3 decimals, each
    column is labelled with its activation's class name, and each layer's
    bias is printed next to a red ring below the column. The file name is
    ``path_prefix`` followed by the layer sizes joined with ``-`` and
    ``.png``.
    """
    max_height = max(_ntw)
    plt.figure(figsize=(v_factor, h_factor))

    neuron_coords = []  # one list of (x, y) positions per layer
    _nc = []            # every neuron position, flattened
    _ac = []            # anchor for each layer's activation label
    _bc = []            # anchor for each layer's bias marker
    for k, n in enumerate(_ntw):
        offset = max_height/(2*n)
        layer_positions = [((k*2 + 3), (i*(max_height/n)+offset))
                           for i in range(n)]
        neuron_coords.append(layer_positions)
        _nc.extend(layer_positions)
        _ac.append(((k*2 + 3), max_height))
        _bc.append(((k*2 + 3), -.5))

    plt.scatter(*zip(*_nc), s=250, c='g')
    plt.scatter(*zip(*_bc), s=300, facecolors='none', edgecolors='r')

    # Connect each neuron to the next layer wherever the weight is non-zero.
    for l, coords in enumerate(neuron_coords[:-1]):
        for k, c in enumerate(coords):
            for j, nc in enumerate(neuron_coords[l+1]):
                curr_wt = _wt_mtrix[l+1][j][k]
                if not curr_wt:
                    continue
                plt.plot(*zip(*[c, nc]), '-.g')
                plt.text(c[0]+(nc[0]-c[0])/2, c[1] +
                         (nc[1]-c[1])/2, str(np.round(curr_wt, 3)))

    for anchor, act in zip(_ac, _activations):
        plt.text(anchor[0]-.25, anchor[1], type(act).__name__)
    for anchor, bias in zip(_bc, _biases):
        plt.text(anchor[0], anchor[1], bias)

    plt.axis("off")
    plt.savefig(path_prefix+('-'.join(map(str, _ntw)))+".png")
    plt.close()
class CondensedNN():
    """Feed-forward network whose weights are stored as a flat "DNA" list.

    ``ntw`` lists the layer sizes. The first layer is given its own square
    ``ntw[0] x ntw[0]`` weight matrix, and every later layer a
    ``ntw[i] x ntw[i-1]`` matrix, all decoded in order from ``dna``.
    Weights that are exactly zero are treated as absent connections and
    are kept at zero by :meth:`update_wts`.
    """

    def __init__(self, dna, ntw, activations, layer_biases=None):
        """Decode ``dna`` into weight matrices and initialise training state.

        Args:
            dna: flat list of weights, consumed layer by layer.
            ntw: list of layer sizes.
            activations: one activation object per layer, each providing
                ``activate(z)`` and ``derivative(z)``.
            layer_biases: one bias (scalar or vector) per layer; defaults
                to 0 for every layer.
        """
        self.dna = dna
        self.ntw = ntw
        self.activations = activations
        self.wt_matrix = self._decode_dna(self.dna, self.ntw)
        self.min_loss = float("inf")
        self.prev_delta = float("inf")
        # Per-layer caches filled by evaluate() and read by feedback().
        self.IL = [None for _ in self.ntw]
        self.ZL = [None for _ in self.ntw]
        self.AL = [None for _ in self.ntw]
        if layer_biases is None:
            self.layer_biases = [0 for i in self.ntw]
        else:
            self.layer_biases = layer_biases

    def _wts2dna(self):
        """Flatten all weight matrices, in layer order, back into ``self.dna``."""
        f_w = []
        for w in self.wt_matrix:
            f_w += np.array(w).flatten().tolist()
        self.dna = f_w

    def reset(self):
        """Reset the best-loss and last-delta trackers to infinity."""
        logging.debug("Resetting min loss from %f", self.min_loss)
        self.min_loss = float("inf")
        self.prev_delta = float("inf")

    def evaluate(self, inputs):
        """Run a forward pass and return the last layer's activated output.

        ``inputs`` is transposed before use, so each input row becomes a
        column. For every layer the input, weighted input and an activated
        value are cached in ``self.IL``, ``self.ZL`` and ``self.AL``.
        """
        _x = np.array(inputs).transpose()
        prev_input = _x
        i = 0
        for lyr, act, bias in zip(self.wt_matrix, self.activations, self.layer_biases):
            # Turn the bias into a column so it broadcasts across samples.
            _bias = np.transpose(np.array(bias)[np.newaxis])
            logging.debug("layer(%d): %s,\nbias: %s,\nactivation: %s,\nprev_input(il): %s",
                          i, lyr, _bias.tolist(), act, prev_input)
            self.IL[i] = prev_input
            zl = np.matmul(lyr, prev_input) + _bias
            self.ZL[i] = zl
            # NOTE(review): this applies the layer's activation to the
            # layer's *input*, and feedback() uses it for the weight
            # gradient. That matches the input only for an identity
            # activation -- confirm whether IL was intended.
            self.AL[i] = act.activate(prev_input)
            logging.debug("layer(%d) weighted inputs(zl):\n%s", i, zl)
            prev_input = act.activate(zl)
            logging.debug(
                "layer(%d) current activated output(al):\n%s", i, prev_input)
            i += 1
        return prev_input

    def _decode_dna(self, _wts, _structure):
        """Split the flat weight list into one row-major matrix per layer.

        Layer ``i`` consumes ``size_i * size_prev`` weights, where the
        "previous" size of the first layer is its own size.
        """
        _wt_matrix = []
        prev_input_size = _structure[0]
        _tmp_wts = copy.deepcopy(_wts)
        for i in _structure:
            _tmp_mtrx = []
            _current_frame = _tmp_wts[:i*prev_input_size]
            for _ in range(i):
                _tmp_mtrx += [_current_frame[:prev_input_size]]
                _current_frame = _current_frame[prev_input_size:]
            _tmp_wts = _tmp_wts[i*prev_input_size:]
            _wt_matrix += [_tmp_mtrx]
            prev_input_size = i
        logging.debug("Shape of wt_matrix: %s", _wt_matrix)
        return _wt_matrix

    def update_wts(self, new_wts, biases=None):
        """Replace the weights (keeping zero connections zero) and biases.

        Args:
            new_wts: one weight matrix per layer.
            biases: new per-layer biases; when omitted the current biases
                are kept. (Previously ``None`` overwrote ``layer_biases``
                and broke the next ``evaluate`` call.)
        """
        mask = self._get_ntw_mask()
        for i, (m, w) in enumerate(zip(mask, new_wts)):
            self.wt_matrix[i] = np.multiply(m, w)
        if biases is not None:
            self.layer_biases = biases
        self._wts2dna()

    def _get_ntw_mask(self):
        """Return, per layer, an int array with 1 where the weight is non-zero."""
        # ``np.int`` was removed in NumPy 1.24; the builtin ``int`` is the
        # documented replacement.
        return [(np.array(w) != 0).astype(int) for w in self.wt_matrix]

    def train(self, x_training_data, y_training_data, batch_size=-1, epochs=100, learning_rate=.01, no_change_threshold=.05):
        """Train with mini-batch gradient descent, keeping the best weights.

        Args:
            x_training_data: input rows.
            y_training_data: expected output rows.
            batch_size: rows per batch; a negative value uses all rows.
            epochs: number of passes over the data.
            learning_rate: step size forwarded to :meth:`feedback`.
            no_change_threshold: a batch whose loss differs from the
                previous batch's by less than this counts as "no change".

        Returns:
            Three lists with one entry per processed batch: the loss before
            the update, ``nloss - loss``, and the change in loss relative to
            the previous batch.
        """
        losses = []
        dlosses = []
        dl_o_dl_losses = []
        best_wt_matrix = self.wt_matrix
        best_layer_biases = self.layer_biases
        no_change_counter = 10
        if batch_size < 0:
            batch_size = len(x_training_data)
        # One extra page covers a trailing partial batch; when the data
        # divides evenly that page is empty and is skipped below.
        pages = int(len(x_training_data)/batch_size) + 1
        prev_loss = calculate_loss(
            y_training_data, self.evaluate(x_training_data).T)
        for i in range(epochs):
            for page in range(pages):
                current_batch_size = min(batch_size, len(
                    x_training_data)-page*batch_size)
                logging.debug(
                    "Training batch (Current batch size: %d) %d/%d", current_batch_size, page, pages)
                if not current_batch_size:
                    continue
                _batch_x = x_training_data[page *
                                           batch_size:page*batch_size+batch_size]
                _batch_y = y_training_data[page *
                                           batch_size:page*batch_size+batch_size]
                logging.debug("Current Batch Data[%d:%d]:\nx: %s\ny: %s", page *
                              batch_size, page*batch_size+batch_size,
                              _batch_x, _batch_y)
                loss, nloss = self.feedback(x_training_data=_batch_x,
                                            y_training_data=_batch_y, learning_rate=learning_rate)
                logging.debug("loss %s, nloss %s",
                              loss, nloss)
                self.prev_delta = np.abs(nloss-loss)
                losses += [loss]
                dlosses += [nloss-loss]
                dl_o_dl = loss-prev_loss
                dl_o_dl_losses += [dl_o_dl]
                prev_loss = loss
                if self.min_loss > nloss:
                    if debug:
                        save_plots(self, _batch_x, _batch_y,
                                   "debug/model_epoch_%s.png" % (str(i).zfill(len(str(epochs)))), title="Iteration %d" % i)
                    # Snapshot the weights that produced the new best loss.
                    best_wt_matrix = copy.deepcopy(self.wt_matrix)
                    best_layer_biases = copy.deepcopy(self.layer_biases)
                    logging.debug(
                        "New minima found: %f at iteration #%d", nloss, i)
                    self.min_loss = nloss
                logging.info("%d\t%d\t%f\t%f\t%s", i, page, np.abs(dl_o_dl), no_change_threshold, np.abs(dl_o_dl) < no_change_threshold)
                if np.abs(dl_o_dl) < no_change_threshold:
                    no_change_counter -= 1
                else:
                    no_change_counter = 10
                if no_change_counter <= 0:
                    logging.info("Trained at %s iterations", i)
                    # NOTE: this leaves only the batch loop; the epoch loop
                    # keeps running (the matching outer break was disabled
                    # in the original code).
                    break
            logging.debug("Epoch: %d/%d losses: %f",
                          i, epochs, np.mean(losses))
            # Roll back to the best weights seen so far after every epoch.
            self.update_wts(best_wt_matrix, best_layer_biases)
            print(".", end="")
            if (i+1) % 100 == 0:
                print("[epoch %d/%d Loss: %f dl_o_dl_losses: %f]" %
                      (i, epochs, np.mean(losses[-10:]), np.mean(dl_o_dl_losses[-5:])))
        # Drop the per-layer caches once training is finished.
        self.IL = [None for _ in self.ntw]
        self.ZL = [None for _ in self.ntw]
        self.AL = [None for _ in self.ntw]
        return losses, dlosses, dl_o_dl_losses

    def feedback(self, x_training_data, y_training_data, learning_rate=.001):
        """Run one backpropagation step on a batch and apply the update.

        Returns:
            ``(loss, nloss)``: the batch loss before and after the update.
        """
        # Per-layer work lists: upstream gradient (Gl), activation
        # derivative (Sl), delta (Dl) and weight gradient (DWl).
        Gl = [0 for _ in self.ntw]
        Sl = [0 for _ in self.ntw]
        Dl = [0 for _ in self.ntw]
        DWl = [0 for _ in self.ntw]
        m = len(x_training_data)
        l = -1  # start at the output layer and walk backwards
        pred = (self.evaluate(x_training_data))
        loss = calculate_loss(np.transpose(y_training_data), pred)
        Wl = self.wt_matrix
        Bl = self.layer_biases
        NWl = copy.deepcopy(Wl)
        NBl = copy.deepcopy(Bl)
        WlT = [np.transpose(w) for w in self.wt_matrix]
        Il = self.IL
        Zl = self.ZL
        Al = self.AL
        logging.debug("Il: %s\nZl: %s\nAl: %s", Il, Zl, Al)
        AlT = [np.transpose(a) for a in Al]

        # Output layer.
        Gl[l] = loss_derivative(np.transpose(y_training_data), pred)
        Sl[l] = self.activations[l].derivative(Zl[l])
        Dl[l] = np.multiply(Gl[l], Sl[l])
        DWl[l] = np.matmul(Dl[l], AlT[l])
        logging.debug("Al[%d]: %s\nAlT[%d]: %s\nGl[%d]: %s\nSl[%d]: %s\nDl[%d]: %s\nDWl[%d]: %s",
                      l, Al[l], l, AlT[l], l, Gl[l], l, Sl[l], l, Dl[l], l, DWl[l])

        # Remaining layers: propagate the delta through the next layer's
        # transposed weights.
        for il in range(1, len(self.ntw)):
            l = -1-il
            Sl[l] = self.activations[l].derivative(Zl[l])
            Gl[l] = np.matmul(WlT[l+1], Dl[l+1])
            Dl[l] = Gl[l] * Sl[l]
            DWl[l] = np.matmul(Dl[l], AlT[l])
            logging.debug("Al[%d]: %s\nAlT[%d]: %s\nGl[%d]: %s\nSl[%d]: %s\nDl[%d]: %s\nDWl[%d]: %s",
                          l, Al[l], l, AlT[l], l, Gl[l], l, Sl[l], l, Dl[l], l, DWl[l])
        logging.debug("DWl %s", DWl)

        for i in range(len(NWl)):
            NWl[i] -= ((learning_rate/m) * DWl[i])
            # A single scalar step is applied to the whole layer's bias.
            NBl[i] -= np.sum((learning_rate/m) * Dl[i])
        logging.debug("NWl: %s,\nNBl: %s", NWl, NBl)
        self.update_wts(NWl, NBl)
        pred = self.evaluate(x_training_data)
        nloss = calculate_loss(np.transpose(y_training_data), pred)
        # Losses are floats; ``%d`` truncated them in the log output.
        logging.debug("loss: %f,\nnloss: %f", loss, nloss)
        return loss, nloss
def generate_sample_data(n=100, x_variance=100, x_offset=100, y_variance=100, y_offset=100):
    """Generate ``2 * n`` random 2-D points and their target values.

    Each point ``[x, y]`` is drawn with ``random.random()`` and its target
    is ``[x**3 + y + 1.5]``. The variance and offset parameters are
    accepted but not used.
    """
    offset = 1.5
    _x = []
    _expected = []
    for _ in range(-n, n):
        _xi, _yi = random.random(), random.random()
        _x.append([_xi, _yi])
        _expected.append([(_xi**3 + _yi + offset)])
    return _x, _expected
# Demo network: 2 inputs, one hidden layer of 15 neurons, 1 output.
_ntw = [2, 15, 1]
dna = generate_wts_dna(_ntw)
logging.info("DNA: %s", dna)
# One linear activation and one all-ones bias vector per layer.
activations = [activation.Linear() for _ in _ntw]
layer_biases = [np.ones(width) for width in _ntw]
model = CondensedNN(dna, _ntw, activations, layer_biases=layer_biases)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.