Last active
December 1, 2019 07:18
-
-
Save LiuinStein/58a71abee54acd0e03c3d1141bfacd26 to your computer and use it in GitHub Desktop.
Implement a neural network from the very beginning
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import struct | |
from functools import reduce # python 3中移除了原生的reduce函数,因此。。。 | |
import time | |
def sigmoid(x):
    # Logistic activation: squashes any real input into (0, 1).
    neg_exp = np.exp(-x)
    return 1 / (1 + neg_exp)
# A single neuron node.
class Node:
    def __init__(self, layer_index, node_index):
        # Position of this node inside the network.
        self.layer_index = layer_index
        self.node_index = node_index
        self.upstream = []    # connections coming from the previous layer
        self.downstream = []  # connections going to the next layer
        self.output = 0       # activation of this node
        self.delta = 0        # error term of this node

    def as_input_node(self, data):
        # Input-layer nodes simply emit the raw feature value `data`.
        self.output = data

    def append_downstream_connection(self, conn):
        # Register a Connection object leading to the next layer.
        self.downstream.append(conn)

    def append_upstream_connection(self, conn):
        # Register a Connection object coming from the previous layer.
        self.upstream.append(conn)

    def calc_output(self):
        # y = sigmoid(sum_i w_i * x_i) over all upstream connections;
        # the upstream bias node contributes its weight times 1.
        weighted_sum = sum(link.w * link.upstream_node.output
                           for link in self.upstream)
        self.output = sigmoid(weighted_sum)

    def calc_hidden_layer_delta(self):
        # Hidden-layer error term:
        # delta_j = a_j * (1 - a_j) * sum_{k in Downstream(j)} delta_k * w_kj
        propagated = sum(link.w * link.downstream_node.delta
                         for link in self.downstream)
        self.delta = self.output * (1 - self.output) * propagated

    def calc_output_layer_delta(self, t):
        # Output-layer error term, with t the ground-truth target:
        # delta_j = a_j * (1 - a_j) * (t_j - a_j)
        self.delta = self.output * (1 - self.output) * (t - self.output)
# The bias term is modelled as a special node whose output is fixed at 1,
# so its outgoing weight acts as w_b * 1 in the downstream weighted sums.
# Unlike a regular Node it has no upstream side: nothing feeds into it,
# it only influences the next layer, hence only downstream links exist.
class BiasNode:
    def __init__(self, layer_index, node_index):
        self.layer_index = layer_index
        self.node_index = node_index
        self.downstream = []  # connections into the next layer
        self.output = 1       # constant activation
        self.delta = 0        # error term

    def append_downstream_connection(self, conn):
        # Register a Connection object leading to the next layer.
        self.downstream.append(conn)

    def calc_hidden_layer_delta(self):
        # Same formula as a regular hidden node. Since output is always
        # 1, the factor output * (1 - output) is 0 and delta stays 0.
        propagated = sum(link.w * link.downstream_node.delta
                         for link in self.downstream)
        self.delta = self.output * (1 - self.output) * propagated
# One layer of the network: node_count regular nodes plus one bias node.
class Layer:
    def __init__(self, layer_index, node_count):
        self.layer_index = layer_index
        # Regular nodes first; the bias node is always the last entry.
        self.nodes = [Node(layer_index, idx) for idx in range(node_count)]
        self.nodes.append(BiasNode(layer_index, node_count))

    def as_input_layer(self, data):
        # For the input layer, feed raw sample values straight into the
        # regular nodes as their outputs.
        for idx, value in enumerate(data):
            self.nodes[idx].as_input_node(value)

    def calc_output(self):
        # Recompute every regular node's output; the bias node's output
        # is constant and is skipped via [:-1].
        for node in self.nodes[:-1]:
            node.calc_output()
# A weighted edge between two nodes.
class Connection:
    def __init__(self, upstream_node, downstream_node):
        # The edge points from upstream_node to downstream_node.
        self.upstream_node = upstream_node
        self.downstream_node = downstream_node
        # Weight starts as a uniform random draw from [-0.1, 0.1).
        self.w = np.random.uniform(-0.1, 0.1)
        # Most recently computed gradient for this weight.
        self.gradient = 0

    def calc_gradient(self):
        # For weight w_ji (upstream node i -> downstream node j), the
        # gradient equals the downstream error term times the upstream
        # activation.
        delta = self.downstream_node.delta
        activation = self.upstream_node.output
        self.gradient = delta * activation

    def update_weight(self, rate):
        # One gradient-descent step (delta already carries the sign of
        # the negative error derivative); rate is the learning rate.
        self.calc_gradient()
        self.w += rate * self.gradient
# The neural network itself.
class Network:
    def __init__(self, layer_node_count):
        """Build a fully-connected feed-forward network.

        layer_node_count -- list with the number of nodes per layer,
        e.g. [784, 100, 10].
        """
        self.layers = []                     # layers of the network
        layer_count = len(layer_node_count)  # number of layers
        for i in range(layer_count):
            self.layers.append(Layer(i, layer_node_count[i]))
        # Wire adjacent layers; there is one connection layer fewer
        # than there are node layers.
        self.connections = []
        for layer in range(layer_count - 1):
            # The next layer's bias node takes no input, hence [:-1].
            layer_conns = [Connection(upstream_node, downstream_node)
                           for upstream_node in self.layers[layer].nodes
                           for downstream_node in self.layers[layer + 1].nodes[:-1]]
            # BUGFIX: the original re-assigned self.connections on every
            # pass of this loop, so after __init__ it held only the LAST
            # layer pair's connections and gradient_check silently
            # skipped every earlier weight.
            self.connections.extend(layer_conns)
            for conn in layer_conns:
                conn.downstream_node.append_upstream_connection(conn)
                conn.upstream_node.append_downstream_connection(conn)

    def predict(self, data):
        """Forward-pass one sample and return the output vector."""
        self.layers[0].as_input_layer(data)
        # Recompute every layer after the input layer.
        for j in range(1, len(self.layers)):
            self.layers[j].calc_output()
        # Outputs of the last (output) layer, bias node excluded.
        return list(map(lambda node: node.output, self.layers[-1].nodes[:-1]))

    def calc_delta(self, labels):
        """Back-propagate node error terms for one sample.

        labels -- ground-truth output vector for the current sample.
        """
        # Output-layer deltas first (the last layer is the output layer).
        for j in range(len(labels)):
            self.layers[-1].nodes[j].calc_output_layer_delta(labels[j])
        # Hidden layers, iterated BACKWARDS because each hidden delta
        # depends on the deltas of the layer after it.
        # BUGFIX: the original iterated forwards (wrong for networks with
        # more than one hidden layer) and wrapped the sweep in a
        # redundant `for j in range(len(labels))` loop that recomputed
        # identical deltas len(labels) times.
        for l in range(len(self.layers) - 2, 0, -1):
            for node in self.layers[l].nodes:
                node.calc_hidden_layer_delta()

    def train(self, raw, labels, rate, iteration):
        """Train with stochastic gradient descent.

        raw -- list of input samples; labels -- matching target vectors;
        rate -- learning rate; iteration -- passes over the data set.
        """
        for _ in range(iteration):
            for i in range(len(raw)):
                # Forward pass, then error back-propagation.
                self.predict(raw[i])
                self.calc_delta(labels[i])
                # Update every weight. A node's weights only affect its
                # downstream connections; the output layer has none.
                for layer in self.layers[:-1]:
                    for node in layer.nodes:
                        for conn in node.downstream:
                            conn.update_weight(rate)

    def get_gradient(self, t, y):
        """Compute (without applying) every connection's gradient for
        input sample y with target vector t."""
        self.predict(y)
        self.calc_delta(t)
        for layer in self.layers[:-1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.calc_gradient()

    def network_error(self, vt, vy):
        """Half the sum of squared output errors.

        vt -- target vector; vy -- predicted vector.
        """
        return 0.5 * reduce(lambda a, b: a + b,
                            map(lambda tup: (tup[0] - tup[1]) ** 2, zip(vt, vy)))

    def gradient_check(self, t, y, report_threshold):
        """Numerically verify the back-propagated gradients for input
        sample y with target t; print every weight whose analytic and
        numeric gradients differ by more than report_threshold."""
        self.get_gradient(t, y)
        reported_difference = False
        for conn in self.connections:
            # Analytic gradient from back-propagation.
            reported = conn.gradient
            # Central difference with a small perturbation. Sign note:
            # conn.gradient is delta * x, the NEGATIVE error derivative,
            # so the matching numeric estimate is (e2 - e1) / (2 * eps).
            epsilon = 1e-4
            conn.w += epsilon
            e1 = self.network_error(t, self.predict(y))
            conn.w -= 2 * epsilon
            e2 = self.network_error(t, self.predict(y))
            # BUGFIX: restore the weight; the original left every checked
            # weight permanently shifted by -epsilon.
            conn.w += epsilon
            expected = (e2 - e1) / (2 * epsilon)
            difference = abs(expected - reported)
            if difference > report_threshold:
                reported_difference = True
                print("expected: ", expected, " reported: ", reported, " absolute difference: ", difference, "\n")
        if not reported_difference:
            print("No any absolute differences larger than ", report_threshold, " has been detected")
# Training/test data loader for MNIST-format (IDX) files.
class ImageLoader:
    # NOTE(review): the file names end in .gz but the bytes are read and
    # indexed raw, never gunzipped -- this only works if the files were
    # decompressed in place while keeping the .gz suffix; verify against
    # the data directory before relying on it.
    def __init__(self, dir, count):
        self.dir = dir      # directory containing the MNIST files
        self.count = count  # number of samples to load

    def get_pictures(self, file):
        """Read `count` 28x28 images from an IDX image file and return
        each one as a flat 784-element list of pixel ints (0-255)."""
        with open(file, 'rb') as f:
            content = f.read()
        data_set = []
        for i in range(self.count):
            # IDX image files carry a 16-byte header; each image is
            # 28 * 28 = 784 consecutive unsigned bytes after that.
            begin = 16 + i * 28 * 28
            # Indexing a bytes object already yields ints in Python 3,
            # so the original's per-pixel struct.unpack_from (inside a
            # nested loop that also shadowed the outer variable `i`) is
            # unnecessary -- one slice produces the same sample vector.
            data_set.append(list(content[begin:begin + 28 * 28]))
        return data_set

    def get_labels(self, file):
        """Read `count` labels from an IDX label file, one-hot encoded
        with soft targets: 0.95 for the true class, 0.05 elsewhere."""
        with open(file, 'rb') as f:
            content = f.read()
        labels = []
        for i in range(self.count):
            # IDX label files carry an 8-byte header; one byte per label.
            value = content[i + 8]
            labels.append([0.95 if value == j else 0.05 for j in range(10)])
        return labels

    def get_training_data_set(self):
        """Return (images, labels) for the training split."""
        return self.get_pictures(self.dir + 'train-images-idx3-ubyte.gz'), \
               self.get_labels(self.dir + 'train-labels-idx1-ubyte.gz')

    def get_test_data_set(self):
        """Return (images, labels) for the test split."""
        return self.get_pictures(self.dir + 't10k-images-idx3-ubyte.gz'), \
               self.get_labels(self.dir + 't10k-labels-idx1-ubyte.gz')
# Evaluation helper: measures a network's error rate on a test set.
class Evaluation:
    def __init__(self, network):
        self.network = network

    def get_one_hot_result(self, vec):
        # Decode a one-hot style vector: the index of the largest entry.
        # Starts from (index 0, value 0), matching the original: if no
        # entry exceeds 0, index 0 is returned.
        best_index = 0
        best_value = 0
        for idx, val in enumerate(vec):
            if val > best_value:
                best_value = val
                best_index = idx
        return best_index

    def evaluate(self, test_data, test_labels):
        # Error rate over the test set: the fraction of samples whose
        # predicted class differs from the true class.
        total = len(test_data)
        wrong = 0
        for sample, label in zip(test_data, test_labels):
            truth = self.get_one_hot_result(label)
            guess = self.get_one_hot_result(self.network.predict(sample))
            if truth != guess:
                wrong += 1
        return float(wrong) / float(total)
if __name__ == '__main__':
    # Load the data sets (100 training and 100 test images; the
    # original comment claimed 500, the count argument is what rules).
    imageLoader = ImageLoader("F:\\Desktop\\data\\", 100)
    train_data, train_labels = imageLoader.get_training_data_set()
    test_data, test_labels = imageLoader.get_test_data_set()
    # Build the network: 784 input nodes, 100 hidden, 10 output.
    network = Network([28 * 28, 100, 10])
    # Train for 30 epochs, evaluating after each and running a
    # numeric gradient check on odd-numbered epochs.
    print("Training started")
    for i in range(1, 31):
        start = time.time()
        network.train(train_data, train_labels, rate=0.2, iteration=1)
        end = time.time()
        print("Training iter ", i, " took about ", round(end - start, 2), " seconds")
        evaluator = Evaluation(network)
        print("Accuracy rate evaluated as: ",
              round((1.0 - evaluator.evaluate(test_data, test_labels)) * 100.0, 2), "%")
        if (i & 1) == 1:
            # Gradient check on odd epochs.
            start = time.time()
            # BUGFIX: the original passed train_labels[0] (a 10-element
            # target vector) as BOTH arguments; gradient_check(t, y, ...)
            # expects the 784-element input sample as its second
            # argument, so the check ran on garbage input.
            network.gradient_check(train_labels[0], train_data[0], 1e-3)
            end = time.time()
            print("Gradient check took about ", round(end - start, 2), " seconds")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment