#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Source: GitHub gist Tony363/SGD_sample.py (secret gist, created Jul 16, 2021).
"""
Created on Wed Jul 7 20:33:26 2021
@author: pysolver33
"""
import numpy as np
from simulation_wrapper import *
def sgd(
    gradient, x, y, n_vars=None, start=None, learn_rate=0.1,
    decay_rate=0.0, batch_size=1, n_iter=50, tolerance=1e-06,
    dtype="float64", random_state=None
):
    """Minimize a function with minibatch stochastic gradient descent.

    Each update is ``diff = decay_rate * diff - learn_rate * grad`` (a
    momentum step), applied in place to the decision vector.

    Parameters
    ----------
    gradient : callable
        Called as ``gradient(x_batch, y_batch, vector)``. ``x_batch`` is a
        2-D slice of the shuffled observations, ``y_batch`` the matching
        flattened 1-D slice of targets and ``vector`` the current values.
        Its result is converted with ``np.array(..., dtype)``.
    x, y : array_like
        Observations and targets; their first dimensions must match.
    n_vars : int, optional
        Number of decision variables. Used only when ``start`` is None, in
        which case the start values are drawn from a standard normal.
    start : array_like, optional
        Initial values. Copied, so the caller's object is not modified.
    learn_rate : array_like
        Step size(s); every element must be greater than zero.
    decay_rate : array_like
        Momentum factor(s); every element must lie in ``[0, 1]``.
    batch_size : int
        Observations per minibatch, ``0 < batch_size <= len(x)``.
    n_iter : int
        Maximum number of passes (epochs) over the shuffled data.
    tolerance : array_like
        The search stops as soon as every ``abs(diff)`` is at or below it.
    dtype : data-type
        NumPy dtype used for all internal arrays.
    random_state : int, optional
        Seed for the shuffling / start-value generator.

    Returns
    -------
    numpy.ndarray or scalar
        The final vector, or a plain Python scalar when it is 0-dimensional.

    Raises
    ------
    TypeError
        If ``gradient`` is not callable, or neither ``n_vars`` nor
        ``start`` is given.
    ValueError
        If the lengths of ``x`` and ``y`` differ or a hyper-parameter is
        out of range.
    """
    # Checking if the gradient is callable
    if not callable(gradient):
        raise TypeError("'gradient' must be callable")

    # Setting up the data type for NumPy arrays
    dtype_ = np.dtype(dtype)

    # Converting x and y to NumPy arrays
    x, y = np.array(x, dtype=dtype_), np.array(y, dtype=dtype_)
    n_obs = x.shape[0]
    if n_obs != y.shape[0]:
        raise ValueError("'x' and 'y' lengths do not match")
    # One row per observation; the target is appended as the last column so
    # that a single shuffle keeps x and y aligned.
    xy = np.c_[x.reshape(n_obs, -1), y.reshape(n_obs, 1)]

    # Initializing the random number generator
    seed = None if random_state is None else int(random_state)
    rng = np.random.default_rng(seed=seed)

    # Initializing the values of the variables
    if start is not None:
        vector = np.array(start, dtype=dtype_)
    elif n_vars is not None:
        vector = rng.normal(size=int(n_vars)).astype(dtype_)
    else:
        # Previously this surfaced as an opaque ``int(None)`` TypeError.
        raise TypeError("either 'n_vars' or 'start' must be provided")

    # Setting up and checking the learning rate
    learn_rate = np.array(learn_rate, dtype=dtype_)
    if np.any(learn_rate <= 0):
        raise ValueError("'learn_rate' must be greater than zero")

    # Setting up and checking the decay rate
    decay_rate = np.array(decay_rate, dtype=dtype_)
    if np.any(decay_rate < 0) or np.any(decay_rate > 1):
        raise ValueError("'decay_rate' must be between zero and one")

    # Setting up and checking the size of minibatches
    batch_size = int(batch_size)
    if not 0 < batch_size <= n_obs:
        raise ValueError(
            "'batch_size' must be greater than zero and less than "
            "or equal to the number of observations"
        )

    # Setting up and checking the maximal number of iterations
    n_iter = int(n_iter)
    if n_iter <= 0:
        raise ValueError("'n_iter' must be greater than zero")

    # Setting up and checking the tolerance
    tolerance = np.array(tolerance, dtype=dtype_)
    if np.any(tolerance <= 0):
        raise ValueError("'tolerance' must be greater than zero")

    # Setting the difference to zero for the first iteration
    diff = 0
    converged = False

    # Performing the gradient descent loop
    for _ in range(n_iter):
        # Shuffle x and y together (rows of the combined array)
        rng.shuffle(xy)

        # Performing minibatch moves; ``lo``/``hi`` deliberately do not
        # reuse the name of the ``start`` parameter.
        for lo in range(0, n_obs, batch_size):
            hi = lo + batch_size
            x_batch, y_batch = xy[lo:hi, :-1], xy[lo:hi, -1:]

            # Recalculating the difference; the targets are flattened to 1-D
            # before being handed to the gradient callable.
            grad = np.array(
                gradient(x_batch, y_batch.flatten(), vector), dtype_
            )
            diff = decay_rate * diff - learn_rate * grad

            # Checking if the absolute difference is small enough
            if np.all(np.abs(diff) <= tolerance):
                converged = True
                break

            # Updating the values of the variables
            vector += diff

        # A bare ``break`` above only leaves the minibatch loop; without
        # this the epochs would keep running after convergence.
        if converged:
            break

    return vector if vector.shape else vector.item()
def gradient_descent(
    gradient, x, y, start, learn_rate=0.1, n_iter=50, tolerance=1e-06,
    dtype="float64"
):
    """Run plain (full-batch) gradient descent from ``start``.

    On every step ``gradient(x, y, vector)`` is evaluated on the complete
    data and the vector is moved by ``-learn_rate * gradient`` in place.
    The loop ends after ``n_iter`` steps, or earlier as soon as every
    component of the proposed move is no larger than ``tolerance`` in
    absolute value (that final move is not applied).

    Parameters
    ----------
    gradient : callable
        Called as ``gradient(x, y, vector)``.
    x, y : array_like
        Data passed through to ``gradient``; first dimensions must match.
    start : array_like
        Initial values; copied into a new array of ``dtype``.
    learn_rate : array_like
        Step size(s), strictly positive.
    n_iter : int
        Maximum number of steps, strictly positive.
    tolerance : array_like
        Stopping threshold(s), strictly positive.
    dtype : data-type
        NumPy dtype used for all internal arrays.

    Returns
    -------
    numpy.ndarray or scalar
        The final vector, or a plain Python scalar when it is 0-dimensional.

    Raises
    ------
    TypeError
        If ``gradient`` is not callable.
    ValueError
        If ``x``/``y`` lengths differ or a hyper-parameter is out of range.
    """
    if not callable(gradient):
        raise TypeError("'gradient' must be callable")

    # Every array below shares this element type.
    kind = np.dtype(dtype)

    x = np.array(x, dtype=kind)
    y = np.array(y, dtype=kind)
    if x.shape[0] != y.shape[0]:
        raise ValueError("'x' and 'y' lengths do not match")

    # Working copy of the decision variables.
    vector = np.array(start, dtype=kind)

    step_size = np.array(learn_rate, dtype=kind)
    if np.any(step_size <= 0):
        raise ValueError("'learn_rate' must be greater than zero")

    steps_left = int(n_iter)
    if steps_left <= 0:
        raise ValueError("'n_iter' must be greater than zero")

    threshold = np.array(tolerance, dtype=kind)
    if np.any(threshold <= 0):
        raise ValueError("'tolerance' must be greater than zero")

    while steps_left > 0:
        steps_left -= 1
        move = -step_size * np.array(gradient(x, y, vector), kind)
        # Stop before applying a move that is already negligible.
        if np.all(np.abs(move) <= threshold):
            break
        vector += move

    if vector.shape:
        return vector
    return vector.item()
def dline(X):
    """Build the six-entry design row for the point ``X``.

    The entries are, in order: ``1``, ``X[0]``, ``X[1]``,
    ``2/3 - X[0]**2``, ``2/3 - X[1]**2`` and ``-(X[0] * X[1])``, each a
    ``Decimal`` created from the value's string form. The constant 2/3 is
    evaluated as a float first, so it enters as the float's printed digits
    rather than as an exact fraction.

    Returns a 1-D NumPy array with ``dtype=object``.
    """
    two_thirds = Decimal(f"{2/3}")
    x0 = Decimal(f"{X[0]}")
    x1 = Decimal(f"{X[1]}")
    row = [
        Decimal("1"),
        x0,
        x1,
        # Squares are taken in the input's own arithmetic before conversion.
        two_thirds - Decimal(f"{X[0]**2}"),
        two_thirds - Decimal(f"{X[1]**2}"),
        x0 * x1 * Decimal("-1"),
    ]
    return np.asarray(row, dtype=object)
def gradient(B,Y,X):
    """Return the two residuals ``B[i].dot(step_4a2(X)) - Y[i]`` for i = 0, 1.

    ``Y[i]`` is converted to ``Decimal`` through its string form and the
    pair is returned as a NumPy array with ``dtype=object``.

    NOTE(review): ``step_4a2`` is not defined in this file; presumably it
    comes from the ``simulation_wrapper`` star import -- confirm.
    """
    residuals = [
        B[row].dot(step_4a2(X)) - Decimal(f"{Y[row]}")
        for row in (0, 1)
    ]
    return np.asarray(residuals, dtype=object)
if __name__ == "__main__":
    # Demo: run the simulation steps, then apply both optimizers to its
    # estimated betas and print the value of `gradient` at each result.
    # NOTE(review): np.random.seed() returns None, so simulation() receives
    # seed=None here; only the global NumPy RNG is seeded. Confirm whether
    # seed=200 was intended.
    sim = simulation(xi=2,seed=np.random.seed(200),status="standard",scale=0.0005)
    sim.step_0()
    sim.step_1()
    sim.step_2()
    sim.step_3()
    sim.step_4()
    sim_var = sim.variables()
    Y = np.asarray((Decimal(f"{sim.new_ye0}"),Decimal(f"{sim.new_ye1}")))
    B0 = np.asarray([Decimal(f'{beta}') for beta in sim.beta0_est])
    B1 = np.asarray([Decimal(f'{beta}') for beta in sim.beta1_est])
    # Stack both beta rows once. The sgd call and the final print used an
    # undefined name `B`, which raised NameError.
    B = np.asarray((B0, B1))
    print("testing")
    gd_x = gradient_descent(
        gradient, B, Y, (Decimal("1"), Decimal("1")),
        learn_rate=Decimal("0.001"), n_iter=100_000, dtype=object,
    )
    sgd_x = sgd(
        gradient, B, Y, n_vars=2, learn_rate=0.006, decay_rate=0.8,
        batch_size=2, n_iter=100_000, random_state=0,
    )
    print(f"GD for zero: {gradient(B,Y,gd_x)}")
    print(f"SGD for zero: {gradient(B,Y,sgd_x)}")
# (End of gist; GitHub page footer omitted.)