Created
May 28, 2012 14:19
-
-
Save lambdalisue/2819417 to your computer and use it in GitHub Desktop.
Neural Network for Beginners (for learning)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# vim: set fileencoding=utf-8: | |
import pickle | |
import json | |
from math import exp | |
from random import random | |
from random import shuffle | |
def sigmoid(x):
    """Return the standard logistic sigmoid ``1 / (1 + e**-x)`` of ``x``.

    The result lies in the closed interval [0.0, 1.0].

    The computation is split by the sign of ``x`` so that ``exp`` is only
    ever called with a non-positive argument.  The naive formula raises
    ``OverflowError`` for large negative inputs because ``exp(-x)`` leaves
    the float range.
    """
    if x >= 0:
        return 1.0 / (1.0 + exp(-x))
    # For negative x use the algebraically equal form e**x / (1 + e**x).
    z = exp(x)
    return z / (1.0 + z)
class Axon(object):
    """Weighted, directed link between two neurons

    Attributes:
        weight
            multiplier applied to the signal; initialised with ``random()``
        lhs
            neuron whose output is read (``None`` until connected)
        rhs
            neuron on the receiving side (``None`` until connected)
    """

    def __init__(self):
        self.weight = random()
        self.lhs = None
        self.rhs = None

    def compute(self):
        """Return the output of the ``lhs`` neuron scaled by the weight"""
        return self.weight * self.lhs.output

    @classmethod
    def connect(cls, lhs, rhs):
        """Create an axon from ``lhs`` to ``rhs`` and register it on both

        The new axon is appended to ``lhs.oaxons`` and to ``rhs.iaxons``.
        Nothing is returned.
        """
        link = cls()
        link.lhs, link.rhs = lhs, rhs
        lhs.oaxons.append(link)
        rhs.iaxons.append(link)
class Neuron(object):
    """A single unit of the network holding an output value"""

    def __init__(self, output=0):
        self.output = output    # most recently computed (or assigned) value
        self.oaxons = []        # axons leaving this neuron
        self.iaxons = []        # axons arriving at this neuron

    def connect(self, neuron):
        """Link this neuron to ``neuron`` with a new axon (self -> neuron)"""
        Axon.connect(self, neuron)

    def compute(self):
        """Recalculate, store and return the output of the neuron

        The output is the sigmoid of the summed signals of every incoming
        axon.
        """
        total = sum(axon.compute() for axon in self.iaxons)
        self.output = sigmoid(total)
        return self.output

    def clamp(self):
        """Map the current output onto a discrete value

        Returns 0 when the output is below 0.1, 1 when it is above 0.9 and
        -1 for anything in between (boundaries included).
        """
        value = self.output
        if value < 0.1:
            return 0
        if value > 0.9:
            return 1
        return -1
class Bias(Neuron):
    """Bias neuron: its output starts at 1 and is never recalculated"""

    def __init__(self):
        super(Bias, self).__init__(1)

    def compute(self):
        """Return the stored output without evaluating any incoming axon"""
        return self.output
class Layer(object):
    """An ordered group of neurons"""

    def __init__(self, number_of_neurons):
        self.neurons = [Neuron() for _ in range(number_of_neurons)]

    def connect(self, layer):
        """Fully connect this layer to ``layer``

        Every neuron of this layer is connected to every neuron of
        ``layer`` (this layer being the source side).
        """
        for source in self.neurons:
            for target in layer.neurons:
                source.connect(target)

    def compute(self):
        """Compute each neuron in order and return the list of outputs"""
        return [neuron.compute() for neuron in self.neurons]

    def clamp(self):
        """Compute each neuron and return the list of its clamped outputs"""
        def refreshed(neuron):
            # the neuron must be recomputed before its output is clamped
            neuron.compute()
            return neuron.clamp()

        return [refreshed(neuron) for neuron in self.neurons]
class Network(object):
    """Feed-forward network made of an input, a hidden and an output layer

    The input and the hidden layer each carry one extra bias neuron, which
    is appended after the regular neurons.
    """

    def __init__(self, ni, nh, no):
        """Construct neural network

        Attributes:
            ni
                number of input neurons
            nh
                number of hidden neurons
            no
                number of output neurons
        """
        # Construct input layer
        self.input = Layer(ni)
        self.input.neurons.append(Bias())
        # Construct hidden layer
        self.hidden = Layer(nh)
        self.hidden.neurons.append(Bias())
        # Construct output layer
        self.output = Layer(no)
        # connect
        self.input.connect(self.hidden)
        self.hidden.connect(self.output)

    def compute(self, values):
        """Compute the network and return clamped results

        Attributes:
            values
                iterable holding exactly one number per (non-bias) input
                neuron

        Returns the list of clamped outputs of the output layer.

        Raises ``ValueError`` when the number of values does not match the
        number of input neurons.  Without this check a surplus value would
        silently overwrite the output of the bias neuron, and missing values
        would leave inputs of the previous computation in place.
        """
        values = list(values)
        # The bias neuron is always the last one of the input layer and must
        # never be assigned an input value.
        expected = len(self.input.neurons) - 1
        if len(values) != expected:
            raise ValueError("expected %d input values but got %d"
                             % (expected, len(values)))
        # Set input values to input layer
        for neuron, value in zip(self.input.neurons, values):
            neuron.output = value
        # calculate hidden layer (results are stored on the neurons)
        self.hidden.compute()
        # calculate output layer
        return self.output.clamp()

    def get_accuracy(self, patterns):
        """Return accuracy of the network on the patterns as a percentage

        Each pattern is an ``(inputs, outputs)`` pair.  A pattern counts as
        correct only when every desired output equals the clamped result.
        An empty pattern list yields 100.
        """
        if len(patterns) == 0:
            return 100

        def is_correct(desired, actual):
            # check all outputs against desired outputs
            return not any(want != actual[i]
                           for i, want in enumerate(desired))

        incorrects = 0
        for inputs, outputs in patterns:
            results = self.compute(inputs)
            if not is_correct(outputs, results):
                incorrects += 1
        # calculate error and return as percentage
        return 100 - (float(incorrects) / len(patterns) * 100)

    def get_mse(self, patterns):
        """Return mean squared error on the patterns

        The error is measured between the desired outputs and the clamped
        results, averaged over all output neurons and all patterns.  An
        empty pattern list yields 0.0.
        """
        if len(patterns) == 0:
            return 0.0
        mse = 0.0
        for inputs, outputs in patterns:
            results = self.compute(inputs)
            for i, desired in enumerate(outputs):
                mse += pow((results[i] - desired), 2)
        return mse / (len(self.output.neurons) * len(patterns))

    def save(self, filename):
        """Save the network to the file (pickle format)"""
        with open(filename, 'wb') as f:
            pickle.dump(self, f)

    @classmethod
    def load(cls, filename):
        """Load the network from the file and return it

        NOTE: the file is read with ``pickle``, which can execute arbitrary
        code while loading.  Only load files from a trusted source.
        """
        with open(filename, 'rb') as f:
            network = pickle.load(f)
        return network
class Dataset(object):
    """A dataset model

    ``datas`` is the list of stored patterns.  With at least 10 patterns the
    ``training``, ``generalization`` and ``validation`` views split the list
    into consecutive, non-overlapping 60% / 20% / 20% slices.  Smaller
    datasets are returned whole by every view.
    """

    def __init__(self):
        self.datas = []

    def shuffle(self):
        """Shuffle the datas in place"""
        # ``shuffle`` here is the module-level function from ``random``
        shuffle(self.datas)

    def save(self, filename):
        """Save the dataset to the file as JSON"""
        with open(filename, 'w') as f:
            json.dump(self.datas, f, indent=2)

    @classmethod
    def load(cls, filename):
        """Load the dataset from the JSON file and return a new instance"""
        with open(filename, 'r') as f:
            datas = json.load(f)
        # build through ``cls`` so that subclasses load as themselves
        dataset = cls()
        dataset.datas = datas
        return dataset

    def _slice(self, start_percent, stop_percent):
        """Return the patterns between two percentage marks of the list

        Integer arithmetic keeps the boundaries of neighbouring slices
        identical, so the slices can neither overlap nor leave a gap.
        """
        size = len(self.datas)
        if size < 10:
            # too small to split: every view sees the whole dataset
            return self.datas
        start = size * start_percent // 100
        stop = size * stop_percent // 100
        return self.datas[start:stop]

    @property
    def training(self):
        """First 60% of the datas (all of them when fewer than 10)"""
        return self._slice(0, 60)

    @property
    def generalization(self):
        """The 20% following the training set (all when fewer than 10)"""
        return self._slice(60, 80)

    @property
    def validation(self):
        """The final 20% of the datas (all of them when fewer than 10)"""
        return self._slice(80, 100)
class BackPropagation(object):
    """Back-propagation trainer model

    Attributes:
        LEARNING_RATE
            factor applied to the error gradient of a neuron
        MOMENTUM
            fraction of the previous delta of a neuron added to the new one
        MAX_EPOCHS
            maximum number of training epochs (-1 means no limit)
        DESIRED_ACCURACY
            training stops once both the training and the generalization
            accuracy reach this percentage
    """
    LEARNING_RATE = 0.04
    MOMENTUM = 0.6
    MAX_EPOCHS = 500000
    DESIRED_ACCURACY = 100

    def __init__(self, network):
        self.network = network

    def train(self, dataset):
        """Train the network with the dataset

        Runs training epochs on ``dataset.training`` until the desired
        accuracy is reached on both the training and the generalization set
        or ``MAX_EPOCHS`` is exhausted, then reports the accuracy on
        ``dataset.validation``.  Progress is printed whenever a rounded
        accuracy changes.  Nothing is returned.
        """
        def report(label, accuracy, mse):
            print(" %s Acc: %s%% MSE: %s" % (label, accuracy, mse))

        tmse = 0
        gmse = 0
        taccuracy = 0
        gaccuracy = 0
        epoch = 0
        while self.MAX_EPOCHS == -1 or epoch < self.MAX_EPOCHS:
            epoch += 1
            # store previous accuracy
            ptaccuracy = taccuracy
            pgaccuracy = gaccuracy
            # train network
            taccuracy, tmse = self.train_epoch(dataset.training)
            # get generalization set accuracy and mse
            gaccuracy = self.network.get_accuracy(dataset.generalization)
            gmse = self.network.get_mse(dataset.generalization)
            if (round(ptaccuracy) != round(taccuracy) or
                    round(pgaccuracy) != round(gaccuracy)):
                print("Epoch: %s" % epoch)
                report("TSet", taccuracy, tmse)
                report("GSet", gaccuracy, gmse)
            if (taccuracy >= self.DESIRED_ACCURACY and
                    gaccuracy >= self.DESIRED_ACCURACY):
                break
            if epoch % 10000 == 0:
                # shuffle the dataset
                # NOTE(review): this moves patterns between the training,
                # generalization and validation slices - confirm intended
                shuffle(dataset.datas)
        # validate
        vaccuracy = self.network.get_accuracy(dataset.validation)
        vmse = self.network.get_mse(dataset.validation)
        print("=== Train finish ===============================")
        report("TSet", taccuracy, tmse)
        report("GSet", gaccuracy, gmse)
        report("VSet", vaccuracy, vmse)

    def train_epoch(self, patterns):
        """Train once on every pattern and return ``(accuracy, mse)``

        Each pattern is an ``(inputs, outputs)`` pair.  The accuracy (in
        percent) and the mean squared error are measured on the clamped
        results computed before the weights are adjusted for that pattern.
        An empty pattern list trains nothing and yields ``(100, 0.0)``.
        """
        if len(patterns) == 0:
            # nothing to train on; avoids dividing by zero below
            return 100, 0.0
        incorrects = 0
        squared_error = 0.0
        for inputs, outputs in patterns:
            # compute network
            results = self.network.compute(inputs)
            # backpropagate
            self.backpropagate(outputs)
            # accuracy & error check
            is_correct = True
            for i, desired in enumerate(outputs):
                if results[i] != desired:
                    is_correct = False
                squared_error += pow((results[i] - desired), 2)
            if not is_correct:
                incorrects += 1
        accuracy = 100 - (float(incorrects) / len(patterns) * 100)
        mse = squared_error / (len(patterns[0][1]) * len(patterns))
        return accuracy, mse

    def backpropagate(self, outputs):
        """Adjust all weights towards the desired ``outputs``

        Must be called right after the network was computed, because it
        reads the ``output`` values stored on the neurons.

        Note:

            dsigmoid(x) = sigmoid(x) * (1.0 - sigmoid(x))
            sigmoid(x)  = neuron.output
            gradient(x) = dsigmoid(x) * error
                        = neuron.output * (1.0 - neuron.output) * error
            delta(x)    = LEARNING_RATE * gradient(x)
                          + MOMENTUM * previous delta(x)

        The hidden layer derives its error from the raw ``gradient`` of the
        output neurons.  Using their ``delta`` instead would apply the
        learning rate (and the momentum) a second time.
        """
        def remember(neuron, error):
            # store error, raw gradient and weight delta on the neuron
            gradient = neuron.output * (1.0 - neuron.output) * error
            delta = self.LEARNING_RATE * gradient
            delta += self.MOMENTUM * getattr(neuron, '_delta', 0)
            neuron._error = error
            neuron._gradient = gradient
            neuron._delta = delta

        # calculate the error gradient of each neuron in the output layer
        for i, neuron in enumerate(self.network.output.neurons):
            remember(neuron, outputs[i] - neuron.output)
        # calculate the error gradient of each neuron in the hidden layer
        for neuron in self.network.hidden.neurons:
            # calculate error of hidden neurons through output neuron's
            # gradient (weights are still the ones used for computing)
            error = 0.0
            for axon in neuron.oaxons:
                error += axon.weight * axon.rhs._gradient
            remember(neuron, error)
        # update weights of each axon, output layer first
        for layer in (self.network.output, self.network.hidden):
            for neuron in layer.neurons:
                for axon in neuron.iaxons:
                    axon.weight += neuron._delta * axon.lhs.output
if __name__ == '__main__':
    # Command line entry point: load a dataset, optionally train a (new or
    # loaded) network on it, then answer interactively typed input values.
    import sys
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option('-n', '--network', dest='network',
                      help='load neural network from FILE', metavar='FILE')
    parser.add_option('-d', '--dataset', dest='dataset',
                      help='load dataset from FILE', metavar='FILE')
    parser.add_option('-t', '--train', dest='train', default=False,
                      help='train the network', action='store_true')
    parser.add_option('--learning-rate', dest='learning_rate', default=0.02,
                      type='float', help='Learning rate of the training')
    parser.add_option('--max-epoch', dest='max_epoch', default=1000000,
                      type='int', help='Maximum epoch of the training')
    parser.add_option('--momentum', dest='momentum', default=0.8,
                      type='float', help='Momentum of the training')
    parser.add_option('--desired-accuracy', dest='desired_accuracy',
                      default=100, type='int',
                      help='Desired accuracy of the training')
    opts, args = parser.parse_args()

    if opts.dataset is None:
        parser.print_help()
        sys.exit(0)

    # Load dataset
    dataset = Dataset.load(opts.dataset)
    dataset.shuffle()

    # Load network or create new one
    if opts.network is None:
        if not dataset.datas:
            # the layer sizes are derived from the first pattern
            parser.error("dataset %s contains no patterns" % opts.dataset)
        ni = len(dataset.datas[0][0])
        no = len(dataset.datas[0][1])
        nh = ni * 3
        network = Network(ni, nh, no)
    else:
        # NOTE: the network file is unpickled; only use trusted files
        network = Network.load(opts.network)

    if opts.train:
        trainer = BackPropagation(network)
        trainer.LEARNING_RATE = opts.learning_rate
        trainer.MAX_EPOCHS = opts.max_epoch
        # the trainer reads the upper-case MOMENTUM attribute
        trainer.MOMENTUM = opts.momentum
        trainer.DESIRED_ACCURACY = opts.desired_accuracy
        filename = opts.network or 'brain.bin'
        try:
            trainer.train(dataset)
        except KeyboardInterrupt:
            print("--- Train suspended. Now saving the network to %s ..."
                  % filename)
            network.save(filename)
            sys.exit(1)
        else:
            print("--- Train finish. Now saving the network to %s ..."
                  % filename)
            network.save(filename)

    # ``raw_input`` only exists on Python 2; fall back to ``input``
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    print("Input values with space and hit return. Exit with Ctrl-C")
    while True:
        try:
            line = read_line("Data >")
        except (KeyboardInterrupt, EOFError):
            # leave the prompt cleanly instead of dying with a traceback
            print("")
            break
        try:
            # split() without argument tolerates repeated/leading spaces
            values = [float(token) for token in line.split()]
            if not values:
                continue
            results = network.compute(values)
        except (ValueError, IndexError) as error:
            # not a number, or a wrong number of values: ask again
            print("Invalid input: %s" % error)
            continue
        print("%s => %s" % (values, results))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment