# -*- coding: utf-8 -*-
"""
First lab of the Statistical Learning course at Sorbonne University.
(Originally a temporary script created in the Spyder editor.)
"""
import torch
from torchvision import datasets, transforms
from math import sqrt
import pandas as pd
from random import randint
import matplotlib.pyplot as plt
import sklearn.linear_model
import numpy as np
from scipy.interpolate import UnivariateSpline
## Once the dataset has been downloaded, set download=False!
## For the test set, use train=False
## transform applies a preprocessing to the data (here: ToTensor + normalization)
batch_size=64
nb_digits=10
#train_loader = torch.utils.data.DataLoader(datasets.MNIST('../data', train=True, download=False, transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,),(0.3081,))])), batch_size=batch_size, shuffle=True)
#print(train_loader.dataset.train_data.size())
class Loss:
    """Abstract loss: forward returns the loss value, backward its gradient
    with respect to the prediction y_pred."""
    def forward(self, y, y_pred):
        pass

    def backward(self, y, y_pred):
        pass


class MSE(Loss):
    def forward(self, y, y_pred):
        # Squared L2 norm of the residual: ||y - y_pred||^2
        return (y - y_pred).norm() ** 2

    def backward(self, y, y_pred):
        # Gradient of ||y - y_pred||^2 with respect to y_pred
        return 2 * (y_pred - y)
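# Added sanity check (illustrative, not part of the original lab): the analytic
# gradient returned by MSE.backward should match torch.autograd on a small
# random example. The _chk_* names below are ours.
_chk_y = torch.randn(4, 1)
_chk_pred = torch.randn(4, 1, requires_grad=True)
MSE().forward(_chk_y, _chk_pred).backward()
print(torch.allclose(_chk_pred.grad, MSE().backward(_chk_y, _chk_pred.detach())))  # expect True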
class Hinge(Loss):
    # Perceptron-style hinge loss; assumes labels y in {-1, +1}.
    def forward(self, y, y_pred):
        return torch.max(torch.zeros(y.shape), -y * y_pred)

    def backward(self, y, y_pred):
        # Subgradient: -y on misclassified points (y * y_pred < 0), 0 elsewhere
        t = y * y_pred
        t[t >= 0] = 0
        t[t < 0] = -y[t < 0]
        return t
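# Added toy example (ours, not in the original lab): with labels in {-1, +1},
# the hinge penalizes only the points whose score disagrees in sign with the
# label, and the subgradient is -y exactly on those points.
_hy = torch.Tensor([1.0, -1.0, 1.0])
_hs = torch.Tensor([0.5, 0.3, -0.2])
print(Hinge().forward(_hy, _hs))   # tensor([0.0, 0.3, 0.2])
print(Hinge().backward(_hy, _hs))  # tensor([0.0, 1.0, -1.0])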
class Module:
    def forward(self, x):
        pass

    def backward_update_gradient(self, x, delta):
        pass

    def update_parameters(self, epsilon):
        pass

    def backward_delta(self, x, delta):
        pass

    def zero_grad(self):
        pass

    def initialize_parameters(self):
        pass
class Lineaire(Module):
    def __init__(self, in_dim, out_dim):
        self.in_dim = in_dim
        self.out_dim = out_dim
        # Row 0 of w_tensor is the bias; inputs get a leading column of ones.
        self.w_tensor = torch.randn(in_dim + 1, out_dim)
        self.zero_grad()

    def forward(self, x):
        return torch.cat((torch.ones((x.shape[0], 1)), x), 1).matmul(self.w_tensor)

    def backward_update_gradient(self, x_data, delta):
        # print(x_data.shape); print(delta.shape)  # debug
        # Accumulate the per-sample gradient x_aug^T . delta, shaped like w_tensor
        x_aug = torch.cat((torch.ones((1, 1)), x_data.view(1, self.in_dim)), 1)
        self.grad_list.append(x_aug.transpose(0, 1).matmul(delta.view(1, self.out_dim)))

    def update_parameters(self, epsilon):
        final_grad = sum(self.grad_list)
        self.w_tensor -= epsilon * final_grad

    def zero_grad(self):
        self.grad_list = []
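# Added illustration (ours): the leading column of ones in forward folds the
# bias into w_tensor, so forward(x) computes b + x @ W in a single matmul.
_lin = Lineaire(2, 1)
_xs = torch.randn(3, 2)
_manual = _lin.w_tensor[0] + _xs.matmul(_lin.w_tensor[1:])
print(torch.allclose(_lin.forward(_xs), _manual))  # expect True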
class Optimizer:
    def optimize(self, module, loss, x_train, y_train, epochs):
        pass
class BatchGradientOptimizer(Optimizer):
    # Full-batch gradient descent: every sample contributes at each step.
    def __init__(self, lr):
        self.lr = lr

    def optimize(self, module, loss, x_train, y_train, epochs=1000):
        err_list = []
        for _ in range(epochs):
            module.zero_grad()
            y_pred = module.forward(x_train)
            err = loss.forward(y_train, y_pred)
            delta = loss.backward(y_train, y_pred)
            # print(y_pred); print(delta)  # debug
            for d, x in zip(delta, x_train):
                module.backward_update_gradient(x, d)
            module.update_parameters(self.lr)
            err_list.append(err.mean().item())
        return err_list
class MiniBatchGradientOptimizer(Optimizer):
    # Stochastic mini-batch gradient descent on batches sampled with replacement.
    def __init__(self, lr, batch_size=64):
        self.lr = lr
        self.batch_size = batch_size

    def optimize(self, module, loss, x_train, y_train, epochs=1000):
        err_list = []
        for _ in range(epochs):
            module.zero_grad()
            idx = [np.random.randint(0, x_train.shape[0]) for _ in range(self.batch_size)]
            x = x_train[idx]
            y = y_train[idx]
            y_pred = module.forward(x)
            err = loss.forward(y, y_pred)
            delta = loss.backward(y, y_pred)
            for d, xs in zip(delta, x):
                module.backward_update_gradient(xs, d)
            module.update_parameters(self.lr)
            err_list.append(err.mean().item())
        return err_list
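# Added smoke test (ours): on synthetic 1-D data y = 2x + 1, a few hundred
# mini-batch steps should drive a Lineaire(1, 1) close to bias ~1, weight ~2.
_demo_x = torch.randn(100, 1)
_demo_y = 2 * _demo_x + 1
_demo_model = Lineaire(1, 1)
MiniBatchGradientOptimizer(0.01, batch_size=16).optimize(_demo_model, MSE(), _demo_x, _demo_y, epochs=200)
print(_demo_model.w_tensor)  # roughly [[1.], [2.]]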
# Quick smoke tests of the linear layer on random and fixed inputs
lel = Lineaire(3, 4)
print(lel.forward(torch.randn(5, 3)))
print(lel.forward(torch.Tensor([[1, 2, 3], [4, 5, 6]])))
print(lel.forward(torch.Tensor([[1, 2, 3]])))
data = pd.read_csv("housing.csv", sep=r'\s+')
data.columns = ["crime_rate", "residential_area", "industry", "river_bound", "nitric_oxides_concentration", "average_room", "old_homes",
"job", "highways", "education", "taxes", "black_pop", "lower_class", "median_home_value"]
print(data)
#x_train = data[["crime_rate", "industry", "education", "job", "average_room", "nitric_oxides_concentration"]]
x_train = data[["average_room"]]
y_train = data["median_home_value"]
model = Lineaire(1, 1)
loss = MSE()
epsilon = 0.00015
optimizer = MiniBatchGradientOptimizer(epsilon)
err_list = optimizer.optimize(model, loss, torch.Tensor(x_train.values),
                              torch.Tensor(y_train.values).view(-1, 1))
'''err_list = []
for i in range(5000):
    model.zero_grad()
    err_sum = 0
    for _ in range(128):
        idx = randint(0, len(data.index) - 1)
        x = torch.Tensor([x_train.loc[idx]])
        y = torch.Tensor([y_train.loc[idx]])
        y_pred = model.forward(x)
        err = loss.forward(y, y_pred)
        err_sum += err
        delta = loss.backward(y, y_pred)
        model.backward_update_gradient(x[0], delta)
    model.update_parameters(epsilon)
    print("Loss : ", err_sum / 128)
    err_list.append(err_sum / 128)
    print("y_pred = ", y_pred, ", y = ", y)'''
plt.figure()
err_list_spline = UnivariateSpline(range(len(err_list)), err_list, s = 1)
plt.plot(range(len(err_list)), err_list_spline(range(len(err_list))))
def linear_regression(X, y, m_current=0, b_current=0, epochs=20, learning_rate=0.0001):
    # Plain gradient descent on the MSE of the 1-D model y = m * X + b
    N = float(len(y))
    m_gradient = 0
    b_gradient = 0
    for i in range(epochs):
        y_current = (m_current * X) + b_current
        cost = sum([data**2 for data in (y - y_current)]) / N
        m_gradient = -(2/N) * sum(X * (y - y_current))
        b_gradient = -(2/N) * sum(y - y_current)
        m_current = m_current - (learning_rate * m_gradient)
        b_current = b_current - (learning_rate * b_gradient)
    return m_current, b_current, cost
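# Added usage sketch (ours, synthetic data): the true slope is 3 and the
# intercept 1; with a larger learning rate and more iterations than the
# defaults, the fit gets close to both.
_lx = np.linspace(0, 10, 50)
_ly = 3.0 * _lx + 1.0 + 0.1 * np.random.randn(50)
_m, _b, _c = linear_regression(_lx, _ly, epochs=2000, learning_rate=0.01)
print("slope ~ 3, intercept ~ 1:", _m, _b)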
plt.figure()
print(model.w_tensor[0])
x_mat = x_train.values
plt.scatter(x_mat, y_train.values)
plt.plot(range(int(x_mat.max())),
         [model.w_tensor[0].item() + model.w_tensor[1].item() * x for x in range(int(x_mat.max()))])
#reg = sklearn.linear_model.LinearRegression().fit(x_train.values, y_train.values)
#plt.plot(range(int(x_mat.max())), [reg.intercept_ + reg.coef_[0] * x for x in range(int(x_mat.max()))])
#a, b, c = linear_regression(x_mat[:, 0], y_train.values)
#print(a, b)
#plt.plot(range(int(x_mat.max())), [b + a * x for x in range(int(x_mat.max()))])
plt.show()
'''y_onehot = torch.FloatTensor(batch_size, nb_digits)
model = Lineaire(28 * 28, 10)
loss = Hinge()
epsilon = 0.01
loss_list = []
for i, (data, target) in enumerate(train_loader):
    y_onehot.zero_()
    # NB: with 0/1 one-hot targets, classes where y == 0 get zero hinge
    # gradient; a ±1 encoding is the more standard choice for this loss.
    y_onehot.scatter_(1, target.view(-1, 1), 1)
    model.zero_grad()
    avg_err = 0
    if target.shape[0] == batch_size:  # skip the last, smaller batch
        batch = torch.stack([elem.flatten() for elem in data])
        y_pred = model.forward(batch)
        # print("Pred : ", y_pred)
        for j in range(target.shape[0]):
            err = loss.forward(y_onehot[j], y_pred[j])
            loss_list.append(err)
            avg_err += err
            delta = loss.backward(y_onehot[j], y_pred[j])
            model.backward_update_gradient(batch[j], delta)
        model.update_parameters(epsilon)
        # print("Loss : ", (avg_err / data.shape[0])[0])
plt.figure()
plt.plot(range(len(loss_list)), loss_list)'''