Numpy nn
Gist by @sgugger, created July 1, 2019.
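A feed-forward neural network implemented from scratch in NumPy: quadratic and cross-entropy costs, sigmoid/ReLU/softmax activations with first and second derivatives, dense and convolutional layers, SGD/Adam/Newton-style training, JSON save/load, and finite-difference derivative checks, demonstrated on MNIST.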
# -*- coding: utf-8 -*-
import numpy as np
import _pickle as cPickle
import gzip
import json
import sys
import time
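
# Cost functions. Each exposes fn (batch-averaged value), der (gradient with
# respect to the network output) and der_sec (second derivative); der_sec is
# only needed by the experimental Newton-style update (NSGD) further down.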
class Quadratic(object):
    @staticmethod
    def fn(a, y):
        return (((a - y) ** 2).sum(axis=1)).mean()

    @staticmethod
    def der(a, y):
        return 2. * (a - y)

    @staticmethod
    def der_sec(a, y):
        return np.array([2. * np.eye(a.shape[1])] * a.shape[0])
class CrossEntropy(object):
    @staticmethod
    def fn(a, y):
        return (np.nan_to_num(-y * np.log(a) - (1 - y) * np.log(1 - a)).sum(axis=1)).mean()

    @staticmethod
    def der(a, y):
        return np.nan_to_num(-y / a + (1 - y) / (1 - a))

    @staticmethod
    def der_sec(a, y):
        return add_dim_diag(np.nan_to_num(y / (a ** 2) + (1 - y) / ((1 - a) ** 2)))
class Sigmoid(object):
    @staticmethod
    def fn(x):
        return 1. / (1. + np.exp(-x))

    @staticmethod
    def der(x):
        return Sigmoid.fn(x) * (1. - Sigmoid.fn(x))

    @staticmethod
    def der_sec(x):
        return Sigmoid.fn(x) * (1. - Sigmoid.fn(x)) * (1. - 2. * Sigmoid.fn(x))
class Relu(object):
    @staticmethod
    def fn(x):
        return np.where(x > 0, x, 0.)

    @staticmethod
    def der(x):
        return np.where(x > 0, 1., 0.)

    @staticmethod
    def der_sec(x):
        return np.zeros(x.shape)
class Softmax(object):
    @staticmethod
    def fn(x):
        return np.exp(x) / np.expand_dims(np.sum(np.exp(x), axis=1), axis=1)

    @staticmethod
    def der(x):
        # Jacobian of the softmax for each sample: diag(s) - s s^T.
        s = Softmax.fn(x)
        p1 = np.array([np.diag(s[i, :]) for i in range(s.shape[0])])
        p2 = np.matmul(np.expand_dims(s, axis=2), np.expand_dims(s, axis=1))
        return p1 - p2

    @staticmethod
    def der_sec(x):
        # Second-derivative tensor of the softmax, built element by element.
        s = Softmax.fn(x)
        r = np.zeros((s.shape[0], s.shape[1], s.shape[1], s.shape[1]))
        for l in range(s.shape[0]):
            for i in range(s.shape[1]):
                for j in range(s.shape[1]):
                    for k in range(s.shape[1]):
                        r[l, i, j, k] = 2. * s[l, i] * s[l, j] * s[l, k]
                        if i == j:
                            r[l, i, j, k] -= s[l, i] * s[l, k]
                        if i == k:
                            r[l, i, j, k] -= s[l, i] * s[l, j]
                        if j == k:
                            r[l, i, j, k] -= s[l, i] * s[l, j]
                        if i == j and j == k:
                            r[l, i, j, k] += s[l, i]
        return r
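
# Activations share the same fn/der/der_sec interface as the costs. Softmax.der
# returns a full per-sample Jacobian (a 3-D array), which is why the backprop
# methods below special-case the shape of the activation derivative.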
class Dense(object):
    def __init__(self, n_in, n_out, activation=Relu, weights=None, biases=None):
        self.size = n_in, n_out
        if biases is None:
            self.biases = np.zeros(n_out)
        else:
            self.biases = biases
        if weights is None:
            self.weights = np.random.normal(scale=2. / np.sqrt(float(n_in)), size=(n_in, n_out))
        else:
            self.weights = weights
        self.activation = activation

    def feed_forward(self, x):
        return (self.activation).fn(np.dot(x, self.weights) + self.biases)

    def feed_detail(self, x):
        # Same as feed_forward, but also returns the pre-activation z for backprop.
        z = np.dot(x, self.weights) + self.biases
        return (self.activation).fn(z), z

    def backprop(self, z, grad):
        tp = (self.activation).der(z)
        if len(tp.shape) == 2:
            delta = grad * tp
        else:
            # Activation with a full Jacobian (e.g. Softmax): contract grad with it.
            delta = np.matmul(np.expand_dims(grad, axis=1), tp).reshape(grad.shape)
        return delta, np.dot(delta, self.weights.transpose())

    def backprop2(self, z, grad, grad2):
        d2z = (self.activation).der_sec(z)
        if len(d2z.shape) <= 3:
            delta2 = add_dim_diag(d2z * grad)
        else:
            grads2 = np.transpose(np.array([grad] * grad.shape[1]), (1, 0, 2))
            delta2 = np.matmul(np.expand_dims(grads2, axis=2), d2z).reshape(grad2.shape)
        dz = (self.activation).der(z)
        if len(dz.shape) == 2:
            delta2 += np.matmul(np.expand_dims(dz, axis=2), np.expand_dims(dz, axis=1)) * grad2
        else:
            delta2 += np.matmul(dz, np.matmul(grad2, dz))
        grad2 = np.matmul(self.weights, np.matmul(delta2, np.transpose(self.weights)))
        return delta2, grad2

    def feed_plus_eps(self, x, i, j, qty):
        # Feed forward with weight (i, j) perturbed by qty (i == -1 perturbs bias j);
        # used by the finite-difference derivative checks.
        z = np.dot(x, self.weights) + self.biases
        if i == -1:
            bc = np.zeros(self.biases.shape)
            bc[j] = qty
            z += bc
        else:
            wc = np.zeros(self.weights.shape)
            wc[i, j] = qty
            z += np.dot(x, wc)
        return self.activation.fn(z)

    def feed_plus_eps2(self, x, i1, j1, i2, j2, qty):
        # Feed forward with two parameters perturbed at once (for second derivatives).
        z = np.dot(x, self.weights) + self.biases
        if i1 == -1:
            bc = np.zeros(self.biases.shape)
            bc[j1] = qty
            z += bc
        else:
            wc = np.zeros(self.weights.shape)
            wc[i1, j1] = qty
            z += np.dot(x, wc)
        if i2 == -1:
            bc = np.zeros(self.biases.shape)
            bc[j2] = qty
            z += bc
        else:
            wc = np.zeros(self.weights.shape)
            wc[i2, j2] = qty
            z += np.dot(x, wc)
        return self.activation.fn(z)
class Convolution(object):
    # A single-filter, valid 2-D convolution (one input channel, one output map).
    def __init__(self, size, activation=Relu, weights=None, bias=None):
        self.size = size
        if bias is None:
            self.bias = 0.
        else:
            self.bias = bias
        if weights is None:
            self.weights = np.random.normal(scale=2. / np.sqrt(float(size[0])), size=size)
        else:
            self.weights = weights
        self.activation = activation

    def feed_forward(self, x):
        z = np.zeros((x.shape[0], x.shape[1] - self.size[0] + 1, x.shape[2] - self.size[1] + 1))
        m = self.size[0] * self.size[1]
        w_col = np.reshape(self.weights, (m, 1))
        for i in range(z.shape[1]):
            for j in range(z.shape[2]):
                x1 = np.reshape(x[:, i:i + self.size[0], j:j + self.size[1]], (x.shape[0], m))
                z[:, i, j] = np.reshape(np.matmul(x1, w_col) + self.bias, x.shape[0])
        return (self.activation).fn(z)

    def feed_detail(self, x):
        z = np.zeros((x.shape[0], x.shape[1] - self.size[0] + 1, x.shape[2] - self.size[1] + 1))
        m = self.size[0] * self.size[1]
        w_col = np.reshape(self.weights, (m, 1))
        for i in range(z.shape[1]):
            for j in range(z.shape[2]):
                x1 = np.reshape(x[:, i:i + self.size[0], j:j + self.size[1]], (x.shape[0], m))
                z[:, i, j] = np.reshape(np.matmul(x1, w_col) + self.bias, x.shape[0])
        return (self.activation).fn(z), z

    def backprop(self, z, grad):
        tp = (self.activation).der(z)
        if len(tp.shape) == 2:
            delta = grad * tp
        else:
            delta = np.matmul(np.expand_dims(grad, axis=1), tp).reshape(grad.shape)
        return delta, np.dot(delta, self.weights.transpose())

    def feed_plus_eps(self, x, i, j, qty):
        # The original body was copied from Dense (it referenced self.biases and a
        # dense dot product, neither of which exists here); rewritten to perturb
        # filter weight (i, j) (or the scalar bias when i == -1) and reuse the
        # convolutional forward pass.
        weights = np.copy(self.weights)
        bias = self.bias
        if i == -1:
            bias += qty
        else:
            weights[i, j] += qty
        z = np.zeros((x.shape[0], x.shape[1] - self.size[0] + 1, x.shape[2] - self.size[1] + 1))
        m = self.size[0] * self.size[1]
        w_col = np.reshape(weights, (m, 1))
        for a in range(z.shape[1]):
            for b in range(z.shape[2]):
                x1 = np.reshape(x[:, a:a + self.size[0], b:b + self.size[1]], (x.shape[0], m))
                z[:, a, b] = np.reshape(np.matmul(x1, w_col) + bias, x.shape[0])
        return self.activation.fn(z)
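
# Model chains layers into a network: backprop returns per-layer gradients
# (nabla_b, nabla_w) averaged over the batch; backprop2 additionally returns
# second-order information for the experimental NSGD update.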
class Model(object):
    def __init__(self, layers, cost=Quadratic):
        self.layers = layers
        self.cost = cost

    def feed_forward(self, x):
        for i in range(0, len(self.layers)):
            x = self.layers[i].feed_forward(x)
        return x

    def backprop(self, x, y):
        activ = [x]
        vectorsz = []
        n = x.shape[0]
        for i in range(0, len(self.layers)):
            x, z = self.layers[i].feed_detail(x)
            vectorsz.append(z)
            activ.append(x)
        c = (self.cost).der(activ[-1], y)
        nabla_b = []
        nabla_w = []
        for i in range(1, len(self.layers) + 1):
            delta, c = self.layers[-i].backprop(vectorsz[-i], c)
            nabla_b.insert(0, delta.mean(axis=0))
            nabla_w.insert(0, np.tensordot(activ[-i - 1], delta, axes=(0, 0)) / n)
        return activ[-1], nabla_b, nabla_w
    def backprop2(self, x, y, idx):
        activ = [x]
        vectorsz = []
        n = x.shape[0]
        for i in range(0, len(self.layers)):
            x, z = self.layers[i].feed_detail(x)
            vectorsz.append(z)
            activ.append(x)
        c = (self.cost).der(activ[-1], y)
        c2 = (self.cost).der_sec(activ[-1], y)
        nabla_b = []
        nabla_w = []
        nabla2_b = []
        nabla2_w = []
        for i in range(1, len(self.layers) + 1):
            delta2, c2 = self.layers[-i].backprop2(vectorsz[-i], c, c2)
            # nabla2_b.insert(0, (np.diagonal(delta2, axis1=1, axis2=2)).mean(axis=0))
            # nabla2_b.insert(0, delta2.mean(axis=0))
            nabla2_b.insert(0, [extract_sub(delta2, L).mean(axis=0) for L in idx[-i][0]])
            activs = np.matmul(np.expand_dims(activ[-i - 1], axis=2), np.expand_dims(activ[-i - 1], axis=1))
            DW = np.array([np.kron(activs[k, :, :], delta2[k, :, :]) for k in range(n)])
            # nabla2_w.insert(0, DW.mean(axis=0))
            nabla2_w.insert(0, [extract_sub(DW, L).mean(axis=0) for L in idx[-i][1]])
            delta, c = self.layers[-i].backprop(vectorsz[-i], c)
            nabla_b.insert(0, delta.mean(axis=0))
            nabla_w.insert(0, np.tensordot(activ[-i - 1], delta, axes=(0, 0)) / n)
        return activ[-1], nabla_b, nabla_w, nabla2_b, nabla2_w
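    # The idx argument of backprop2 appears to hold, for each layer, index groups
    # for biases (idx[layer][0]) and weights (idx[layer][1]); extract_sub pulls
    # the matching sub-blocks out of the full second-derivative matrices.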
    def fit(self, training_data, epochs, batch_size, lr, method="Adam", validation_data=None):
        n = len(training_data)
        t = 0
        if method == "Adam":
            old_grads = [[np.zeros(l.weights.shape) for l in self.layers], [np.zeros(l.biases.shape) for l in self.layers]]
            old_norms = [[np.zeros(l.weights.shape) for l in self.layers], [np.zeros(l.biases.shape) for l in self.layers]]
        for j in range(epochs):
            minibatches = create_minibatches(training_data, batch_size)
            num = 0
            cos = 0.
            numT = 0
            for i in range(0, len(minibatches)):
                t += 1
                if method == "SGD":
                    dn, dc = self.update_SGD(minibatches[i], lr)
                elif method == "NSGD":
                    dn, dc = self.update_NSGD(minibatches[i], lr)
                elif method == "Adam":
                    old_grads, old_norms, dn, dc = self.update_Adam(minibatches[i], lr, old_grads, old_norms, t)
                num += dn
                cos += dc
                numT += minibatches[i][0].shape[0]
                avg = float(num) / float(numT)
                printProgressBar(numT, n, "Epoch %i" % (j + 1), "{:.2f}".format(100. * avg) + "%, cost=" + "{:.6f}".format(cos / float(numT)))
            if validation_data:
                avg, cos = self.evaluate(validation_data)
                print(f"Epoch {j+1}: {avg}% cost = {cos}")
            else:
                print(f"Epoch {j+1} complete")
    def update_SGD(self, minibatch, lr):
        preds, nabla_b, nabla_w = self.backprop(minibatch[0], minibatch[1])
        for i in range(len(self.layers)):
            self.layers[i].weights -= lr * nabla_w[i]
            self.layers[i].biases -= lr * nabla_b[i]
        num = np.equal(preds.argmax(axis=1), minibatch[1].argmax(axis=1)).sum()
        cos = (self.cost).fn(preds, minibatch[1]) * minibatch[0].shape[0]
        return num, cos

    def update_Adam(self, minibatch, lr, old_grads, old_norms, t):
        # Adam with beta1 = 0.9, beta2 = 0.999, eps = 1e-8 and bias correction.
        preds, nabla_b, nabla_w = self.backprop(minibatch[0], minibatch[1])
        old_grads = [[0.9 * onw + 0.1 * nw for (onw, nw) in zip(old_grads[0], nabla_w)],
                     [0.9 * onb + 0.1 * nb for (onb, nb) in zip(old_grads[1], nabla_b)]]
        old_norms = [[0.999 * onw + 0.001 * nw ** 2 for (onw, nw) in zip(old_norms[0], nabla_w)],
                     [0.999 * onb + 0.001 * nb ** 2 for (onb, nb) in zip(old_norms[1], nabla_b)]]
        for i in range(len(self.layers)):
            self.layers[i].weights -= lr * (old_grads[0][i] / (1 - 0.9 ** t)) / (np.sqrt(old_norms[0][i] / (1 - 0.999 ** t)) + 1e-8)
            self.layers[i].biases -= lr * (old_grads[1][i] / (1 - 0.9 ** t)) / (np.sqrt(old_norms[1][i] / (1 - 0.999 ** t)) + 1e-8)
        num = np.equal(preds.argmax(axis=1), minibatch[1].argmax(axis=1)).sum()
        cos = (self.cost).fn(preds, minibatch[1]) * minibatch[0].shape[0]
        return old_grads, old_norms, num, cos
    def update_NSGD(self, minibatch, lr):
        # Newton-style update: scale each gradient by its (clipped) second derivative.
        # Note: backprop2 takes an idx argument that this call does not supply; the
        # call must be adapted before NSGD can actually run.
        eps = 1e-4
        preds, nabla_b, nabla_w, na2_b, na2_w = self.backprop2(minibatch[0], minibatch[1])
        na2_b = [clip(nb2, eps) for nb2 in na2_b]
        na2_w = [clip(nw2, eps) for nw2 in na2_w]
        for i in range(len(self.layers)):
            self.layers[i].weights -= lr * nabla_w[i] / na2_w[i]
            self.layers[i].biases -= lr * nabla_b[i] / na2_b[i]
        num = np.equal(preds.argmax(axis=1), minibatch[1].argmax(axis=1)).sum()
        cos = (self.cost).fn(preds, minibatch[1]) * minibatch[0].shape[0]
        return num, cos
    def evaluate(self, tst_data, batch_size=64):
        mbs = create_minibatches(tst_data, batch_size, False)
        num_correct = 0
        coste = 0.
        for mb in mbs:
            z = self.feed_forward(mb[0])
            preds = z.argmax(axis=1)
            num_correct += np.equal(preds, mb[1].argmax(axis=1)).sum()
            coste += (self.cost).fn(z, mb[1]) * z.shape[0]
        return 100. * float(num_correct) / float(len(tst_data)), coste / float(len(tst_data))

    def save(self, filename):
        data = {"cost": str(self.cost.__name__)}
        c = 0
        for l in self.layers:
            c += 1
            data_l = {"name": str(l.__class__.__name__),
                      "activation": str(l.activation.__name__),
                      "size": list(l.size),
                      "weights": l.weights.tolist(),
                      "biases": l.biases.tolist()}
            data["layer" + str(c)] = data_l
        f = open(filename, "w")
        json.dump(data, f)
        f.close()
    def eval_der(self, x, y, l, i, j):
        # Finite-difference check of a single first derivative: perturb parameter
        # (i, j) of layer l by eps and compare costs.
        eps = 1e-5
        z1 = self.feed_forward(x)
        for cl in range(0, len(self.layers)):
            if cl == l:
                x = self.layers[cl].feed_plus_eps(x, i, j, eps)
            else:
                x = self.layers[cl].feed_forward(x)
        return ((self.cost).fn(x, y) - (self.cost).fn(z1, y)) / eps

    def eval_der_sec(self, x, y, l1, i1, j1, l2, i2, j2):
        # Finite-difference check of a second derivative, using the four-point
        # formula (f(+e1+e2) + f() - f(+e1) - f(+e2)) / eps^2.
        eps = 1e-5
        z = self.feed_forward(x)
        x1 = np.copy(x)
        x2 = np.copy(x)
        x3 = np.copy(x)
        for cl in range(0, len(self.layers)):
            if cl == l1:
                x1 = self.layers[cl].feed_plus_eps(x1, i1, j1, eps)
            else:
                x1 = self.layers[cl].feed_forward(x1)
            if cl == l2:
                x2 = self.layers[cl].feed_plus_eps(x2, i2, j2, eps)
            else:
                x2 = self.layers[cl].feed_forward(x2)
            if cl == l1 and cl == l2:
                x3 = self.layers[cl].feed_plus_eps2(x3, i1, j1, i2, j2, eps)
            elif cl == l1:
                x3 = self.layers[cl].feed_plus_eps(x3, i1, j1, eps)
            elif cl == l2:
                x3 = self.layers[cl].feed_plus_eps(x3, i2, j2, eps)
            else:
                x3 = self.layers[cl].feed_forward(x3)
        return ((self.cost).fn(z, y) + (self.cost).fn(x3, y) - (self.cost).fn(x1, y) - (self.cost).fn(x2, y)) / (eps ** 2)
    def eval_with_grad(self, tst_data, batch_size=64):
        mbs = create_minibatches(tst_data, batch_size, False)
        num_correct = 0
        coste = 0.
        gradb = [0.] * len(self.layers)
        gradw = [0.] * len(self.layers)
        for mb in mbs:
            z = self.feed_forward(mb[0])
            preds = z.argmax(axis=1)
            num_correct += np.equal(preds, mb[1].argmax(axis=1)).sum()
            coste += (self.cost).fn(z, mb[1]) * z.shape[0]
            L = self.backprop(mb[0], mb[1])
            for i in range(len(self.layers)):
                gradb[i] += np.linalg.norm(L[1][i])
                gradw[i] += np.linalg.norm(L[2][i])
        acc = 100. * float(num_correct) / len(tst_data)
        coste /= len(tst_data)
        for i in range(len(self.layers)):
            gradb[i] /= len(tst_data)
            gradw[i] /= len(tst_data)
        return acc, coste, gradb, gradw
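
# Data loading and miscellaneous helpers. The MNIST pickle comes from the
# neural-networks-and-deep-learning repository; the hard-coded paths below are
# specific to the author's machine and will need adjusting.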
def load_data():
    f = gzip.open("D:/Temp/Deeplearning/neural-networks-and-deep-learning/data/mnist.pkl.gz", 'rb')
    tr_d, va_d, te_d = cPickle.load(f, encoding='latin1')
    f.close()
    training_results = [vectorized_result(y) for y in tr_d[1]]
    training_data = list(zip(tr_d[0], training_results))
    val_results = [vectorized_result(y) for y in va_d[1]]
    validation_data = list(zip(va_d[0], val_results))
    test_results = [vectorized_result(y) for y in te_d[1]]
    test_data = list(zip(te_d[0], test_results))
    return (training_data, validation_data, test_data)

def load_databis():
    f = gzip.open("D:/Temp/Deeplearning/neural-networks-and-deep-learning/data/mnist.pkl.gz", 'rb')
    # encoding='latin1' added: the Python-2 pickle fails to decode without it.
    tr_d, va_d, te_d = cPickle.load(f, encoding='latin1')
    f.close()
    return tr_d
def create_minibatches(data, size, shuffle=True):
    n = len(data)
    L = list(range(0, n))
    if shuffle:
        L = np.random.permutation(L)
    # Build the last (possibly partial) batch from the tail of the permutation.
    # (The original indexed it with L[n//size + j], which picks the wrong elements.)
    if n % size != 0:
        last_X = np.array([data[L[(n // size) * size + j]][0] for j in range(0, n % size)])
        last_Y = np.array([data[L[(n // size) * size + j]][1] for j in range(0, n % size)])
    else:
        last_X = np.array([data[L[n - size + j]][0] for j in range(0, size)])
        last_Y = np.array([data[L[n - size + j]][1] for j in range(0, size)])
    minibatches = [(np.array([data[L[i + j]][0] for j in range(0, size)]), np.array([data[L[i + j]][1] for j in range(0, size)])) for i in range(0, n - size, size)] + [(last_X, last_Y)]
    return minibatches
def get_one():
    # Dump the first training image as a 28x28 semicolon-separated CSV.
    tr_d = load_databis()
    f = open("D:/Temp/Deeplearning/digit.csv", 'w')
    L = tr_d[0][0]
    for i in range(0, 28):
        line = ""
        for j in range(0, 28):
            line += str(L[j + 28 * i]) + ";"
        line = line[:-1] + "\n"
        f.write(line)
    f.close()

def vectorized_result(j):
    # One-hot encode a digit label.
    e = np.zeros(10)
    e[j] = 1.0
    return e
def load_model(filename):
    f = open(filename, "r")
    data = json.load(f)
    f.close()
    cost = getattr(sys.modules[__name__], data["cost"])
    c = 1
    L = []
    while "layer" + str(c) in data:
        data_l = data["layer" + str(c)]
        size = data_l["size"]
        layer = getattr(sys.modules[__name__], data_l["name"])(size[0], size[1],
                activation=getattr(sys.modules[__name__], data_l["activation"]),
                weights=np.array(data_l["weights"]),
                biases=np.array(data_l["biases"]))
        L.append(layer)
        c += 1
    model = Model(L, cost=cost)
    return model
def add_dim_diag(X):
    # Turn each row of X into a diagonal matrix (a batch of diagonals).
    return np.array([np.diag(X[i, :]) for i in range(X.shape[0])])

def printProgressBar(iteration, total, prefix='', suffix='', length=50, fill='█'):
    percent = "%i/%i" % (iteration, total)
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '-' * (length - filledLength)
    print(f"\r{prefix} {percent} |{bar}| {suffix}", end="")
    if iteration >= total:
        print()
def gen_rd_data(n_in, n_out, n_samp):
    # Random synthetic classification data: thresholded noisy linear targets.
    X = np.random.normal(size=(n_samp, n_in))
    A = np.random.normal(size=(n_in, n_out))
    noise = np.random.normal(size=(n_samp, n_out), scale=0.01)
    Y = np.dot(X, A) + noise
    Y = np.where(Y > 0., np.zeros(Y.shape), np.ones(Y.shape))
    return [(X[i, :], Y[i, :]) for i in range(n_samp)]
def check_zeros(L1, L2):
    # Report the first layer in each list whose array contains a zero entry.
    L1 = [np.where(X == 0., np.ones(X.shape), np.zeros(X.shape)) for X in L1]
    L2 = [np.where(X == 0., np.ones(X.shape), np.zeros(X.shape)) for X in L2]
    L1 = [np.sum(X) for X in L1]
    L2 = [np.sum(X) for X in L2]
    if sum(L1) != 0.:
        l = 0
        while L1[l] == 0.:
            l += 1
        print("Zero in L1 layer " + str(l))
    if sum(L2) != 0.:
        l = 0
        while L2[l] == 0.:  # fixed: the original tested L1[2] != 0., which never advances
            l += 1
        print("Zero in L2 layer " + str(l))

def clip(x, val):
    # Push entries of x away from zero so they are safe to divide by (used by NSGD).
    y = np.where(np.abs(x) >= val, x, np.sign(x) * val)
    z = np.where(x == 0, val * np.ones(x.shape), np.zeros(x.shape))
    return y + z
def extract_sub(x, idx):
    # Extract the square sub-matrix of x indexed by idx (batched when x is 3-D).
    L = [[i] for i in idx]
    if len(x.shape) == 2:
        return x[L, idx]
    elif len(x.shape) == 3:
        return x[:, L, idx]

def create_bag(n_in):
    models = []
    for i in range(n_in):
        mod = Model([Dense(784, 64, Relu), Dense(64, 10, Softmax)], cost=CrossEntropy)
        models.append(mod)
    return models
def try_bag(models, n_dat, n_epochs, lr, trn_dat, val_dat):
    # Train each model of the bag on a random 1/n_dat fraction of the training set.
    for mod in models:
        L = list(range(len(trn_dat)))
        L = np.random.permutation(L)
        trn_dat_b = [trn_dat[L[i]] for i in range(0, len(trn_dat) // n_dat)]
        mod.fit(trn_dat_b, n_epochs, 64, lr, "Adam", val_dat)

def eval_try_bag(models, tst_data, batch_size=64):
    # Evaluate the bag by averaging the models' predictions.
    mbs = create_minibatches(tst_data, batch_size, False)
    num_correct = 0
    coste = 0.
    for mb in mbs:
        L = []
        for mod in models:
            L.append(mod.feed_forward(mb[0]))
        zm = np.stack(L).mean(axis=0)
        preds = zm.argmax(axis=1)
        num_correct += np.equal(preds, mb[1].argmax(axis=1)).sum()
        # fixed: the cost is computed on the averaged prediction and scaled by the
        # batch size (the original used the stacked 3-D array and the model count)
        coste += (models[0].cost).fn(zm, mb[1]) * zm.shape[0]
    return 100. * float(num_correct) / float(len(tst_data)), coste / float(len(tst_data))
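
# Example usage — a minimal sketch, not part of the original gist: build a
# two-layer MNIST classifier from the pieces above and train it with Adam
# (assumes the mnist.pkl.gz path in load_data() is valid on your machine).
if __name__ == "__main__":
    training_data, validation_data, test_data = load_data()
    model = Model([Dense(784, 64, Relu), Dense(64, 10, Softmax)], cost=CrossEntropy)
    model.fit(training_data, epochs=3, batch_size=64, lr=0.001,
              method="Adam", validation_data=validation_data)
    acc, cost = model.evaluate(test_data)
    print(f"Test accuracy: {acc:.2f}%, cost = {cost:.6f}")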