
@LiuinStein
Last active December 1, 2019 07:18
Implement a neural network from scratch
import numpy as np
import struct
from functools import reduce  # Python 3 removed the built-in reduce, so import it from functools
import time


def sigmoid(x):
    return 1 / (1 + np.exp(-x))
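# Note: the derivative of the sigmoid can be written in terms of its output,
# sigmoid'(x) = sigmoid(x) * (1 - sigmoid(x)), i.e. y * (1 - y);
# this is why the delta computations below multiply by output * (1 - output).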


# A single neuron node
class Node:
    def __init__(self, layer_index, node_index):
        self.layer_index = layer_index
        self.node_index = node_index
        self.upstream = []    # connections to upstream nodes
        self.downstream = []  # connections to downstream nodes
        self.output = 0       # node output
        self.delta = 0        # node error term

    def as_input_node(self, data):
        # When this node belongs to the input layer, the input datum itself
        # becomes the node's output; `data` is that input value
        self.output = data

    def append_downstream_connection(self, conn):
        # Add a connection to a downstream node
        # `conn` is a Connection object (same below)
        self.downstream.append(conn)

    def append_upstream_connection(self, conn):
        # Add a connection to an upstream node
        self.upstream.append(conn)

    def calc_output(self):
        # Compute the node output via y = sigmoid(w * x)
        # A small trick is used here: reduce() puts the initial value in front of
        # the sequence, so the reduction below effectively computes
        #     (((0 + self.upstream[0]) + self.upstream[1]) + ...)
        # Using 0 as the first `ret` means every element of self.upstream shows up
        # as the lambda's `conn` argument
        self.output = sigmoid(
            reduce(lambda ret, conn:
                   ret + conn.w * conn.upstream_node.output,
                   self.upstream, 0)
        )
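        # The reduce above is just a weighted sum; an equivalent plain form would be
        #     sigmoid(sum(conn.w * conn.upstream_node.output for conn in self.upstream))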
    def calc_hidden_layer_delta(self):
        # Compute the error term when this node sits in a hidden layer,
        # following the node-error formula
        # \delta_j = a_j * (1 - a_j) * \sum_{k \in \text{Downstream}(j)} \delta_k w_{kj}
        self.delta = self.output * (1 - self.output) * reduce(
            lambda ret, conn: ret + conn.w * conn.downstream_node.delta,
            self.downstream, 0
        )

    def calc_output_layer_delta(self, t):
        # Compute the error term when this node sits in the output layer
        # `t` is the ground-truth value for the current prediction
        self.delta = self.output * (1 - self.output) * (t - self.output)
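        # With the squared-error loss used in network_error below, E = 1/2 * (t - y)^2
        # and y = sigmoid(net), so -dE/dnet = (t - y) * y * (1 - y), which is exactly this delta.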


# The bias term is treated as a special weight in the computation, so we introduce
# a BiasNode whose output is always 1, i.e. the bias is regarded as w_b * 1.
# Note: unlike an ordinary Node, the bias term is not affected by upstream nodes,
# but it does affect downstream nodes, so a bias node only keeps downstream connections.
class BiasNode:
    def __init__(self, layer_index, node_index):
        self.layer_index = layer_index
        self.node_index = node_index
        self.downstream = []  # connections to downstream nodes
        self.output = 1       # node output (always 1)
        self.delta = 0        # node error term

    def append_downstream_connection(self, conn):
        # Add a connection to a downstream node
        # `conn` is a Connection object (same below)
        self.downstream.append(conn)

    def calc_hidden_layer_delta(self):
        # Hidden-layer error term for the bias node
        self.delta = self.output * (1 - self.output) * reduce(
            lambda ret, conn: ret + conn.w * conn.downstream_node.delta,
            self.downstream, 0
        )


# A single layer of the network
class Layer:
    def __init__(self, layer_index, node_count):
        self.layer_index = layer_index
        self.nodes = []
        for i in range(node_count):
            self.nodes.append(Node(layer_index, i))
        self.nodes.append(BiasNode(layer_index, node_count))

    def as_input_layer(self, data):
        # When this layer is the input layer, the input data become the outputs of its nodes
        for i in range(len(data)):
            self.nodes[i].as_input_node(data[i])

    def calc_output(self):
        # Compute the layer's output, i.e. the output of every node except the BiasNode
        for n in self.nodes[:-1]:
            n.calc_output()


# A connection between two nodes, holding a weight w
class Connection:
    def __init__(self, upstream_node, downstream_node):
        # Connection between upstream_node and downstream_node
        self.upstream_node = upstream_node
        self.downstream_node = downstream_node
        # The weight is initialized to a random number uniformly distributed in [-0.1, 0.1)
        self.w = np.random.uniform(-0.1, 0.1)
        # Gradient
        self.gradient = 0

    def calc_gradient(self):
        # The partial derivative of E with respect to w_{ji} is -y_j * (t_j - y_j) * (1 - y_j) * x_{ji},
        # where w_{ji} is the weight of the connection from upstream node i to downstream node j.
        # We observe that this is just the downstream node's error term (as computed for an
        # output-layer node) times the upstream node's output.
        self.gradient = self.downstream_node.delta * self.upstream_node.output

    def update_weight(self, rate):
        # Update the weight by gradient descent; `rate` is the learning rate
        self.calc_gradient()
        self.w += rate * self.gradient
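        # Sign convention: self.gradient stores delta_j * x_{ji}, which is the negative
        # of the true partial derivative dE/dw_{ji}, so adding rate * self.gradient
        # moves the weight downhill, i.e. performs gradient descent.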


# The neural network
class Network:
    def __init__(self, layer_node_count):
        # Build a fully connected neural network.
        # layer_node_count is a list giving the number of nodes in each layer.
        self.layers = []  # layers of the network
        layer_count = len(layer_node_count)  # number of layers
        # Build the layers
        for i in range(layer_count):
            self.layers.append(Layer(i, layer_node_count[i]))
        # Build the connections
        self.connections = []
        # There is one group of connections fewer than there are layers
        for layer in range(layer_count - 1):
            self.connections += [Connection(upstream_node, downstream_node)
                                 for upstream_node in self.layers[layer].nodes
                                 # skip the next layer's last node: that is the bias node disguised as a weight
                                 for downstream_node in self.layers[layer + 1].nodes[:-1]]
        for conn in self.connections:
            conn.downstream_node.append_upstream_connection(conn)
            conn.upstream_node.append_downstream_connection(conn)

    def predict(self, data):
        # Predict a single sample with the current model
        self.layers[0].as_input_layer(data)  # feed the raw data into the input layer
        # Recompute the output of every subsequent layer
        for j in range(1, len(self.layers)):
            self.layers[j].calc_output()
        # Map the outputs of the last layer (the output layer) to the final result list
        return list(map(lambda node: node.output, self.layers[-1].nodes[:-1]))
    def calc_delta(self, labels):
        # Compute the error term of every node
        for j in range(len(labels)):
            # The last layer is the output layer
            self.layers[-1].nodes[j].calc_output_layer_delta(labels[j])
        # Hidden layers, walked from the second-to-last layer back to the second layer,
        # because each hidden delta depends on the deltas of its downstream layer
        for l in range(len(self.layers) - 2, 0, -1):
            for node in self.layers[l].nodes:
                node.calc_hidden_layer_delta()
    def train(self, raw, labels, rate, iteration):
        # Train the network: raw is the raw data set, labels the ground truth,
        # rate the learning rate and iteration the number of passes over the data
        for _ in range(iteration):
            # Train on every sample in the data set
            for i in range(len(raw)):
                # Predict with the current model
                self.predict(raw[i])
                # Recompute the error term of every node
                self.calc_delta(labels[i])
                # Update the weights
                for layer in self.layers[:-1]:
                    for node in layer.nodes:
                        # A node's weights only affect its downstream nodes
                        for conn in node.downstream:
                            conn.update_weight(rate)
    def get_gradient(self, t, y):
        # Compute the gradients for one sample: y is the input vector, t the label vector
        self.predict(y)
        self.calc_delta(t)
        # Compute the gradient of every connection
        for layer in self.layers[:-1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.calc_gradient()

    def network_error(self, vt, vy):
        # Network error: half the sum of squared errors over the output-layer nodes
        # vt is the ground-truth vector, vy the prediction vector
        return 0.5 * reduce(lambda a, b: a + b,
                            map(lambda tup: (tup[0] - tup[1]) ** 2, zip(vt, vy)))
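        # (Equivalently, with numpy: 0.5 * np.sum((np.array(vt) - np.array(vy)) ** 2))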
    def gradient_check(self, t, y, report_threshold):
        # Compute the network's analytical gradients for this sample
        self.get_gradient(t, y)
        # Numerically check the gradient of every weight
        reported_difference = False
        for conn in self.connections:
            # Gradient actually reported by the network
            reported = conn.gradient
            # A tiny perturbation
            epsilon = 1e-4
            conn.w += epsilon
            e1 = self.network_error(t, self.predict(y))
            conn.w -= 2 * epsilon
            e2 = self.network_error(t, self.predict(y))
            # Expected gradient from the central-difference formula
            # (e2 - e1 rather than e1 - e2, because conn.gradient stores the negative of dE/dw)
            expected = (e2 - e1) / (2 * epsilon)
            # Restore the original weight before checking the next connection
            conn.w += epsilon
            difference = abs(expected - reported)
            if difference > report_threshold:
                reported_difference = True
                print("expected: ", expected, " reported: ", reported, " absolute difference: ", difference, "\n")
        if not reported_difference:
            print("No absolute difference larger than ", report_threshold, " has been detected")


# Loader for the training/test data
class ImageLoader:
    def __init__(self, dir, count):
        self.dir = dir      # folder containing the MNIST data set
        self.count = count  # number of samples to load

    # Load the images
    def get_pictures(self, file):
        with open(file, 'rb') as f:
            content = f.read()
        data_set = []
        for i in range(self.count):
            # Extract the i-th image from the file;
            # its first pixel starts at this offset
            begin = i * 28 * 28 + 16
            picture = []
            for k in range(28):
                picture.append([])
                for j in range(28):
                    picture[k].append(
                        # unpack_from returns a tuple, hence the [0]
                        struct.unpack_from("B", content, begin + k * 28 + j)[0]
                    )
            # Flatten the image into the sample's input vector
            sample = []
            for k in range(28):
                for j in range(28):
                    sample.append(picture[k][j])
            data_set.append(sample)
        return data_set
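    # IDX file layout (this is where the fixed offsets above and below come from):
    # the image file starts with a 16-byte header (magic number, image count,
    # row count, column count, each a 4-byte big-endian integer) followed by the
    # raw pixel bytes; the label file starts with an 8-byte header (magic number,
    # label count) followed by one byte per label.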
    # Load the ground-truth labels
    def get_labels(self, file):
        with open(file, 'rb') as f:
            content = f.read()
        labels = []
        for i in range(self.count):
            label = []
            value = struct.unpack_from('B', content, i + 8)[0]
            # Encode the ground truth as a one-hot vector,
            # using 0.95 for the true class and 0.05 for the others
            for j in range(10):
                if value == j:
                    label.append(0.95)
                else:
                    label.append(0.05)
            labels.append(label)
        return labels

    # Load the training set
    # (the files are read as raw, already-decompressed IDX data, even though
    #  their names keep the .gz suffix of the original download)
    def get_training_data_set(self):
        return self.get_pictures(self.dir + 'train-images-idx3-ubyte.gz'), \
               self.get_labels(self.dir + 'train-labels-idx1-ubyte.gz')

    # Load the test set
    def get_test_data_set(self):
        return self.get_pictures(self.dir + 't10k-images-idx3-ubyte.gz'), \
               self.get_labels(self.dir + 't10k-labels-idx1-ubyte.gz')


# Evaluation helper
class Evaluation:
    def __init__(self, network):
        self.network = network

    def get_one_hot_result(self, vec):
        # Recover the final result from a one-hot-style output vector,
        # i.e. the index of the largest value
        mi = 0
        mv = 0
        for i in range(len(vec)):
            if vec[i] > mv:
                mv = vec[i]
                mi = i
        return mi
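    # (This is simply an argmax; with numpy it could be written as int(np.argmax(vec)).)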
    def evaluate(self, test_data, test_labels):
        # Evaluate the model on the test set
        error = 0
        total = len(test_data)
        # Count every misclassified sample
        for i in range(total):
            t = self.get_one_hot_result(test_labels[i])
            y = self.get_one_hot_result(self.network.predict(test_data[i]))
            if t != y:
                error += 1
        # Return the error rate
        return float(error) / float(total)


if __name__ == '__main__':
    # Load the data sets
    imageLoader = ImageLoader("F:\\Desktop\\data\\", 100)  # train/test on 100 images
    train_data, train_labels = imageLoader.get_training_data_set()
    test_data, test_labels = imageLoader.get_test_data_set()
    # Build the network
    network = Network([28 * 28, 100, 10])  # 784 input nodes, 100 hidden, 10 output
    # Train for 30 passes, running a gradient check on odd-numbered passes
    print("Training started")
    for i in range(1, 31):
        start = time.time()
        network.train(train_data, train_labels, rate=0.2, iteration=1)
        end = time.time()
        print("Training iter ", i, " took about ", round(end - start, 2), " seconds")
        evaluator = Evaluation(network)
        print("Accuracy rate evaluated as: ",
              round((1.0 - evaluator.evaluate(test_data, test_labels)) * 100.0, 2), "%")
        if (i & 1) == 1:
            # Run the gradient check on odd passes, using the first training sample
            start = time.time()
            network.gradient_check(train_labels[0], train_data[0], 1e-3)
            end = time.time()
            print("Gradient check took about ", round(end - start, 2), " seconds")