#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 7 20:33:26 2021

@author: pysolver33
"""
from decimal import Decimal

import numpy as np

from simulation_wrapper import *  # provides simulation and step_4a2
def sgd(
    gradient, x, y, n_vars=None, start=None, learn_rate=0.1,
    decay_rate=0.0, batch_size=1, n_iter=50, tolerance=1e-06,
    dtype="float64", random_state=None
):
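    """Minimize a cost with minibatch stochastic gradient descent.

    `gradient(x_batch, y_batch, vector)` must return the gradient of the
    cost with respect to `vector`. `decay_rate` weights the previous
    update (a momentum term), `batch_size` sets how many observations
    feed each update, and iteration stops early once every component of
    the update is within `tolerance`. Returns the final parameter
    vector, or a scalar if the vector is zero-dimensional.
    """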
    # Checking if the gradient is callable
    if not callable(gradient):
        raise TypeError("'gradient' must be callable")
    # Setting up the data type for NumPy arrays
    dtype_ = np.dtype(dtype)
    # Converting x and y to NumPy arrays
    x, y = np.array(x, dtype=dtype_), np.array(y, dtype=dtype_)
    n_obs = x.shape[0]
    if n_obs != y.shape[0]:
        raise ValueError("'x' and 'y' lengths do not match")
    # Stack y as the last column of x so observations stay paired when shuffled
    xy = np.c_[x.reshape(n_obs, -1), y.reshape(n_obs, 1)]
    # Initializing the random number generator
    seed = None if random_state is None else int(random_state)
    rng = np.random.default_rng(seed=seed)
    # Initializing the values of the variables
    vector = (
        rng.normal(size=int(n_vars)).astype(dtype_)
        if start is None else
        np.array(start, dtype=dtype_)
    )
    # Setting up and checking the learning rate
    learn_rate = np.array(learn_rate, dtype=dtype_)
    if np.any(learn_rate <= 0):
        raise ValueError("'learn_rate' must be greater than zero")
    # Setting up and checking the decay rate
    decay_rate = np.array(decay_rate, dtype=dtype_)
    if np.any(decay_rate < 0) or np.any(decay_rate > 1):
        raise ValueError("'decay_rate' must be between zero and one")
    # Setting up and checking the size of minibatches
    batch_size = int(batch_size)
    if not 0 < batch_size <= n_obs:
        raise ValueError(
            "'batch_size' must be greater than zero and less than "
            "or equal to the number of observations"
        )
    # Setting up and checking the maximal number of iterations
    n_iter = int(n_iter)
    if n_iter <= 0:
        raise ValueError("'n_iter' must be greater than zero")
    # Setting up and checking the tolerance
    tolerance = np.array(tolerance, dtype=dtype_)
    if np.any(tolerance <= 0):
        raise ValueError("'tolerance' must be greater than zero")
    # Setting the difference to zero for the first iteration
    diff = 0
    # Performing the gradient descent loop
    for _ in range(n_iter):
        # Shuffle x and y together (they share the rows of xy)
        rng.shuffle(xy)
        # Performing minibatch moves
        for start in range(0, n_obs, batch_size):
            stop = start + batch_size
            x_batch, y_batch = xy[start:stop, :-1], xy[start:stop, -1:]
            # Recalculating the difference; y_batch is flattened because
            # `gradient` back-solves for X given Y rather than predicting Y
            grad = np.array(gradient(x_batch, y_batch.flatten(), vector), dtype=dtype_)
            diff = decay_rate * diff - learn_rate * grad
            # Checking if the absolute difference is small enough
            if np.all(np.abs(diff) <= tolerance):
                break
            # Updating the values of the variables
            vector += diff
    return vector if vector.shape else vector.item()
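# A minimal sketch of how `sgd` can be exercised on a toy least-squares
# problem (the data and the `ssr_grad` helper here are illustrative only,
# not part of the simulation pipeline below):
#
#     def ssr_grad(x, y, b):
#         res = b[0] + b[1] * x[:, 0] - y  # residuals of y = b0 + b1 * x
#         return np.array([res.mean(), (res * x[:, 0]).mean()])
#
#     sgd(ssr_grad, x=[[5.0], [15.0], [25.0], [35.0], [45.0], [55.0]],
#         y=[5.0, 20.0, 14.0, 32.0, 22.0, 38.0], n_vars=2,
#         learn_rate=0.0008, decay_rate=0.8, batch_size=3,
#         n_iter=100_000, random_state=0)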
def gradient_descent(
    gradient, x, y, start, learn_rate=0.1, n_iter=50, tolerance=1e-06,
    dtype="float64"
):
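    """Full-batch gradient descent: the same update rule as `sgd`, but
    every step uses all of x and y and there is no momentum term.
    Returns the final parameter vector, or a scalar if zero-dimensional.
    """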
    # Checking if the gradient is callable
    if not callable(gradient):
        raise TypeError("'gradient' must be callable")
    # Setting up the data type for NumPy arrays
    dtype_ = np.dtype(dtype)
    # Converting x and y to NumPy arrays
    x, y = np.array(x, dtype=dtype_), np.array(y, dtype=dtype_)
    if x.shape[0] != y.shape[0]:
        raise ValueError("'x' and 'y' lengths do not match")
    # Initializing the values of the variables
    vector = np.array(start, dtype=dtype_)
    # Setting up and checking the learning rate
    learn_rate = np.array(learn_rate, dtype=dtype_)
    if np.any(learn_rate <= 0):
        raise ValueError("'learn_rate' must be greater than zero")
    # Setting up and checking the maximal number of iterations
    n_iter = int(n_iter)
    if n_iter <= 0:
        raise ValueError("'n_iter' must be greater than zero")
    # Setting up and checking the tolerance
    tolerance = np.array(tolerance, dtype=dtype_)
    if np.any(tolerance <= 0):
        raise ValueError("'tolerance' must be greater than zero")
    # Performing the gradient descent loop
    for _ in range(n_iter):
        # Recalculating the difference
        diff = -learn_rate * np.array(gradient(x, y, vector), dtype=dtype_)
        # Checking if the absolute difference is small enough
        if np.all(np.abs(diff) <= tolerance):
            break
        # Updating the values of the variables
        vector += diff
    return vector if vector.shape else vector.item()
def dline(X):
    """Generate the design vector for a point X (unused in this script)."""
    return np.asarray((
        Decimal("1"), Decimal(f"{X[0]}"), Decimal(f"{X[1]}"),
        Decimal(f"{2/3}") - Decimal(f"{X[0]**2}"),
        Decimal(f"{2/3}") - Decimal(f"{X[1]**2}"),
        Decimal(f"{X[0]}") * Decimal(f"{X[1]}") * Decimal("-1")
    ), dtype=object)

def gradient(B, Y, X):
    """Residuals of the two fitted surfaces at X against the targets
    (new_Y0, new_Y1); driving these to zero back-solves for X."""
    return np.asarray((
        B[0].dot(step_4a2(X)) - Decimal(f"{Y[0]}"),
        B[1].dot(step_4a2(X)) - Decimal(f"{Y[1]}")
    ), dtype=object)
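# Note: `step_4a2` is imported from simulation_wrapper; it is assumed to
# build the same kind of design vector as `dline` above (an intercept,
# the coordinates, and the quadratic/interaction terms).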
if __name__ == "__main__":
    # np.random.seed returns None, so it cannot be passed as a seed value;
    # seed the global RNG separately and hand the integer to the simulation
    np.random.seed(200)
    sim = simulation(xi=2, seed=200, status="standard", scale=0.0005)
    sim.step_0()
    sim.step_1()
    sim.step_2()
    sim.step_3()
    sim.step_4()
    sim_var = sim.variables()
    Y = np.asarray((Decimal(f"{sim.new_ye0}"), Decimal(f"{sim.new_ye1}")))
    B0 = np.asarray([Decimal(f"{beta}") for beta in sim.beta0_est])
    B1 = np.asarray([Decimal(f"{beta}") for beta in sim.beta1_est])
    B = np.asarray((B0, B1))
    print("testing")
    gd_x = gradient_descent(
        gradient, B, Y, (Decimal("1"), Decimal("1")),
        learn_rate=Decimal("0.001"), n_iter=100_000, dtype=object
    )
    # Decimal hyperparameters and dtype=object keep the arithmetic inside
    # sgd compatible with the Decimal-valued gradient
    sgd_x = sgd(
        gradient, B, Y, n_vars=2, start=(Decimal("1"), Decimal("1")),
        learn_rate=Decimal("0.006"), decay_rate=Decimal("0.8"),
        batch_size=2, n_iter=100_000, random_state=0, dtype=object
    )
    print(f"GD for zero: {gradient(B, Y, gd_x)}")
    print(f"SGD for zero: {gradient(B, Y, sgd_x)}")