Fully-Connected NN with ReLU Hidden Units and Softmax Output
import functools
import warnings

import numpy as np
class EpochLog(object):
    """Tracks per-epoch loss and signals (via StopIteration) when training should stop."""

    IMPROVEMENT_WINDOW = 5

    def __init__(self, threshold, maxepoch):
        self.threshold = threshold
        self.maxepoch = maxepoch
        self.reset()

    def reset(self):
        self.epochs = []

    @property
    def nepochs(self):
        return len(self.epochs)

    @property
    def improvement(self):
        # Drop in loss over the last IMPROVEMENT_WINDOW epochs.
        if self.nepochs < self.IMPROVEMENT_WINDOW:
            return np.inf
        return self.epochs[-self.IMPROVEMENT_WINDOW] - self.epochs[-1]

    def _add(self, loss):
        self.epochs.append(loss)

    def add(self, loss):
        self._add(loss)
        if self.nepochs >= self.maxepoch or self.improvement < self.threshold:
            raise StopIteration
def ignoreerrors(f):
    """Silence numpy floating-point warnings (e.g. log(0)) inside f."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        with np.errstate(all='ignore'):
            return f(*args, **kwargs)
    return wrapper
class FullyConnectedNN(object):
    STEPSIZE_STEP = 0.1

    def __init__(self, layers, stepsize=1e-3, threshold=1, maxepoch=1e2):
        self.stepsize = stepsize
        self.threshold = threshold
        self.maxepoch = maxepoch
        self.layers = layers
        assert self.nlayers > 1, "must have at least input and output layers"
        self._init_net()

    def _init_net(self):
        self.deltas = [None] + [0] * (self.nlayers - 1)
        self.activations = [None] + [np.zeros(n) for n in self.layers[1:]]
        self.weights = self._init_weights()
        self.biases = [None] + [np.zeros(n) for n in self.layers[1:]]

    def _init_weights(self):
        # Gaussian init with std 1/sqrt(fan-in), so pre-activations start at unit scale.
        weights = [None]
        for i in range(1, self.nlayers):
            m = self.layers[i - 1]
            n = self.layers[i]
            weights.append(np.random.normal(0, 1. / np.sqrt(m), (m, n)))
        return weights
    @property
    def nlayers(self):
        return len(self.layers)

    @staticmethod
    def relu_activation(z):
        return np.maximum(0, z)

    @staticmethod
    def softmax_activation(z):
        # Subtract the max before exponentiating for numerical stability.
        exponentiated = np.exp(z - z.max())
        return exponentiated / exponentiated.sum()

    @staticmethod
    def relu_derivative(a):
        return (a > 0).astype(float)

    @staticmethod
    def softmax_derivative(a):
        # Diagonal of the softmax Jacobian.
        return a * (1 - a)

    def activation_fn(self, z, i):
        # ReLU on hidden layers, softmax on the output layer.
        if i == 0 or i >= self.nlayers:
            raise IndexError
        elif i == (self.nlayers - 1):
            return self.softmax_activation(z)
        else:
            return self.relu_activation(z)
    @ignoreerrors
    def _calc_loss(self, y):
        # Cross-entropy against the softmax output.
        return np.nan_to_num(-(y * np.log(self.activations[-1]))).sum()

    def _loss(self, x, y):
        self.feedforward(x)
        return self._calc_loss(y)

    @staticmethod
    @ignoreerrors
    def loss_gradient(a, y):
        return np.nan_to_num(-np.divide(y, a))

    def feedforward(self, x):
        self.activations[0] = x
        for i in range(1, self.nlayers):
            z = np.dot(self.weights[i].T, self.activations[i - 1]) + self.biases[i]
            self.activations[i] = self.activation_fn(z, i)
    def backprop(self, y):
        self.deltas[-1] = np.dot(
            np.diag(self.softmax_derivative(self.activations[-1])),  # f'(z^L)
            self.loss_gradient(self.activations[-1], y),  # \grad_{a^L} l
        )
        for i in reversed(range(1, self.nlayers - 1)):
            activation = self.activations[i]
            derivative = np.diag(self.relu_derivative(activation))
            self.deltas[i] = derivative.dot(self.weights[i + 1]).dot(self.deltas[i + 1])

    def update_weights(self):
        # Gradient w.r.t. W_i is the outer product of a_{i-1} and delta_i.
        for i in range(1, self.nlayers):
            self.weights[i] -= self.stepsize * self.activations[i - 1].reshape(-1, 1).dot(self.deltas[i].reshape(1, -1))
            self.biases[i] -= self.stepsize * self.deltas[i]

    def _update(self, x, y):
        # One SGD step on a single example.
        self.feedforward(x)
        self.backprop(y)
        self.update_weights()
    def loss(self, X, Y):
        assert X.shape[0] == Y.shape[0]
        return np.array([self._loss(X[i], Y[i]) for i in range(X.shape[0])]).sum()

    def risk(self, X, Y):
        # Average loss per example.
        return self.loss(X, Y) / float(X.shape[0])

    def accuracy(self, X, Y):
        assert X.shape[0] == Y.shape[0]
        correct = np.count_nonzero([self.predict(X[i]) == np.argmax(Y[i]) for i in range(X.shape[0])])
        return correct / float(X.shape[0])

    def predict(self, x):
        self.feedforward(x)
        return np.argmax(self.activations[-1])

    def _train(self, X, Y):
        # One epoch of SGD over a random permutation of the training set.
        indices = np.arange(X.shape[0])
        np.random.shuffle(indices)
        for i in indices:
            self._update(X[i], Y[i])
    def train(self, X, Y, X_validate=None, Y_validate=None):
        assert X.shape[0] == Y.shape[0]
        # `X_validate or X` is ambiguous for numpy arrays; test against None instead.
        if X_validate is None:
            X_validate = X
        if Y_validate is None:
            Y_validate = Y
        self.log = EpochLog(self.threshold, self.maxepoch)
        while True:
            self._train(X, Y)
            loss = self.loss(X_validate, Y_validate)
            if loss == np.inf:
                # Diverged: shrink the step size, reinitialize, and start over.
                self.stepsize *= self.STEPSIZE_STEP
                warnings.warn('decreasing step size to {}'.format(self.stepsize), RuntimeWarning)
                self._init_net()
                self.log.reset()
                continue
            try:
                self.log.add(loss)
            except StopIteration:
                return loss
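
A minimal usage sketch, not part of the original gist: the toy two-blob dataset, the [2, 16, 2] layer sizes, and the hyperparameters below are illustrative assumptions, chosen only to show the expected input shapes (rows are examples, labels are one-hot).

import numpy as np

# Two Gaussian blobs in 2D: class 0 centered at -1, class 1 at +1.
rng = np.random.RandomState(0)
X = np.vstack([rng.normal(-1, 0.5, (50, 2)), rng.normal(1, 0.5, (50, 2))])
Y = np.zeros((100, 2))
Y[:50, 0] = 1  # one-hot labels for class 0
Y[50:, 1] = 1  # one-hot labels for class 1

# 2 inputs, one hidden layer of 16 ReLU units, 2 softmax outputs.
nn = FullyConnectedNN([2, 16, 2], stepsize=1e-2, threshold=1e-3, maxepoch=50)
final_loss = nn.train(X, Y)
print(nn.accuracy(X, Y))  # fraction of training points classified correctly
print(nn.predict(X[0]))   # predicted class index for a single example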