Fully-Connected NN with ReLU Hidden Units and Softmax Output
import functools
import warnings

import numpy as np

class EpochLog(object):
    """Tracks per-epoch losses and decides when training has converged."""

    IMPROVEMENT_WINDOW = 5

    def __init__(self, threshold, maxepoch):
        self.threshold = threshold
        self.maxepoch = maxepoch
        self.reset()

    def reset(self):
        self.epochs = []

    @property
    def nepochs(self):
        return len(self.epochs)

    @property
    def improvement(self):
        # Loss decrease over the last IMPROVEMENT_WINDOW epochs.
        if self.nepochs < self.IMPROVEMENT_WINDOW:
            return np.inf
        return self.epochs[-self.IMPROVEMENT_WINDOW] - self.epochs[-1]

    def _add(self, loss):
        self.epochs.append(loss)

    def add(self, loss):
        self._add(loss)
        if self.nepochs >= self.maxepoch or self.improvement < self.threshold:
            raise StopIteration
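
# Illustrative note (hypothetical numbers): with threshold=1 and maxepoch=100,
# EpochLog.add raises StopIteration once the loss improves by less than 1 over
# the last IMPROVEMENT_WINDOW epochs, or once 100 epochs have been logged.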

def ignoreerrors(f):
    """Run f with numpy floating-point warnings (overflow, divide) suppressed."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        with np.errstate(all='ignore'):
            return f(*args, **kwargs)
    return wrapper

class FullyConnectedNN(object):
    STEPSIZE_STEP = 0.1

    def __init__(self, layers, stepsize=1e-3, threshold=1, maxepoch=1e2):
        self.stepsize = stepsize
        self.threshold = threshold
        self.maxepoch = maxepoch
        self.layers = layers
        assert self.nlayers > 1, "must have at least input and output layers"
        self._init_net()

    def _init_net(self):
        # Index 0 is the input layer, which has no weights, biases, or deltas.
        self.deltas = [None] + [0] * (self.nlayers - 1)
        self.activations = [None] + [np.zeros(n) for n in self.layers[1:]]
        self.weights = self._init_weights()
        self.biases = [None] + [np.zeros(n) for n in self.layers[1:]]

    def _init_weights(self):
        # Gaussian initialization scaled by 1/sqrt(fan-in).
        weights = [None]
        for i in range(1, self.nlayers):
            m = self.layers[i - 1]
            n = self.layers[i]
            weights.append(np.random.normal(0, 1. / np.sqrt(m), (m, n)))
        return weights

    @property
    def nlayers(self):
        return len(self.layers)

    @staticmethod
    def relu_activation(z):
        return np.maximum(0, z)

    @staticmethod
    def softmax_activation(z):
        exponentiated = np.exp(z - z.max())
        return exponentiated / exponentiated.sum()

    @staticmethod
    def relu_derivative(a):
        # a = max(0, z), so a > 0 exactly where z > 0.
        return (a > 0).astype(float)

    @staticmethod
    def softmax_derivative(a):
        # Full softmax Jacobian: diag(a) - a a^T. Using only its diagonal,
        # a * (1 - a), would zero out the gradient for non-target classes.
        return np.diag(a) - np.outer(a, a)

    def activation_fn(self, z, i):
        if i == 0 or i >= self.nlayers:
            raise IndexError
        elif i == (self.nlayers - 1):
            return self.softmax_activation(z)
        else:
            return self.relu_activation(z)

    @ignoreerrors
    def _calc_loss(self, y):
        return np.nan_to_num(-(y * np.log(self.activations[-1]))).sum()

    def _loss(self, x, y):
        self.feedforward(x)
        return self._calc_loss(y)

    @staticmethod
    @ignoreerrors
    def loss_gradient(a, y):
        return np.nan_to_num(-np.divide(y, a))

    def feedforward(self, x):
        self.activations[0] = x
        for i in range(1, self.nlayers):
            z = np.dot(self.weights[i].T, self.activations[i - 1]) + self.biases[i]
            self.activations[i] = self.activation_fn(z, i)

    def backprop(self, y):
        self.deltas[-1] = np.dot(
            self.softmax_derivative(self.activations[-1]),  # f'(z^L)
            self.loss_gradient(self.activations[-1], y),    # \grad_{a^L} l
        )
        for i in reversed(range(1, self.nlayers - 1)):
            # The ReLU derivative is elementwise, so a plain product replaces
            # the diagonal-matrix multiply.
            derivative = self.relu_derivative(self.activations[i])
            self.deltas[i] = derivative * self.weights[i + 1].dot(self.deltas[i + 1])

    def update_weights(self):
        for i in range(1, self.nlayers):
            self.weights[i] -= self.stepsize * np.outer(self.activations[i - 1], self.deltas[i])
            self.biases[i] -= self.stepsize * self.deltas[i]

    def _update(self, x, y):
        self.feedforward(x)
        self.backprop(y)
        self.update_weights()

    def loss(self, X, Y):
        assert X.shape[0] == Y.shape[0]
        return np.array([self._loss(X[i], Y[i]) for i in range(X.shape[0])]).sum()

    def risk(self, X, Y):
        return self.loss(X, Y) / float(X.shape[0])

    def accuracy(self, X, Y):
        assert X.shape[0] == Y.shape[0]
        correct = np.count_nonzero([self.predict(X[i]) == np.argmax(Y[i]) for i in range(X.shape[0])])
        return correct / float(X.shape[0])

    def predict(self, x):
        self.feedforward(x)
        return np.argmax(self.activations[-1])

    def _train(self, X, Y):
        # One epoch of stochastic gradient descent in a shuffled order.
        indices = np.arange(X.shape[0])
        np.random.shuffle(indices)
        for i in indices:
            self._update(X[i], Y[i])

    def train(self, X, Y, X_validate=None, Y_validate=None):
        assert X.shape[0] == Y.shape[0]
        # `X_validate or X` is ambiguous for numpy arrays; test for None instead.
        X_validate = X if X_validate is None else X_validate
        Y_validate = Y if Y_validate is None else Y_validate
        self.log = EpochLog(self.threshold, self.maxepoch)
        while True:
            self._train(X, Y)
            loss = self.loss(X_validate, Y_validate)
            if loss == np.inf:
                # Training diverged: shrink the step size and restart from
                # fresh weights.
                self.stepsize *= self.STEPSIZE_STEP
                warnings.warn('decreasing step size to {}'.format(self.stepsize), RuntimeWarning)
                self._init_net()
                self.log.reset()
                continue
            try:
                self.log.add(loss)
            except StopIteration:
                return loss
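
A minimal usage sketch (not part of the original gist; the dataset, layer sizes, and hyperparameters below are illustrative assumptions):

# --- Usage sketch: train on a tiny synthetic 3-class problem (illustrative) ---
if __name__ == '__main__':
    np.random.seed(0)
    X = np.random.randn(300, 2)
    # Label each point by the largest of three linear scores, then one-hot
    # encode, which is the target format _calc_loss and accuracy expect.
    labels = np.argmax(np.column_stack([X[:, 0], X[:, 1], -X.sum(axis=1)]), axis=1)
    Y = np.eye(3)[labels]
    net = FullyConnectedNN(layers=[2, 16, 3], stepsize=1e-2, threshold=1e-3, maxepoch=50)
    final_loss = net.train(X, Y)
    print('final loss: {:.3f}'.format(final_loss))
    print('train accuracy: {:.2%}'.format(net.accuracy(X, Y)))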