@ndamulelonemakh
Last active May 13, 2021 08:28
Steepest Descent example implementation(s) in Python
"""Example implementation of the conjugate gradient descent algorithm using a hard coded objective function
F(X1, X2) = 5X1**2 + X2**2 + 4X1X2 - 14X1 - 6X2 + 20
- At each step the value of X will be updated using
X_k+1 = X_k + alpha * Pk
Where pk is the conjugate direction and alpha is the step length
"""
import math
import logging
import numpy as np
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
class ConjugateGradientDescent:
def __init__(self, max_iterations=10, hessian_matrix=None, linear_terms=None):
        # Compare against None explicitly -- `x or default` raises "truth value of an array
        # is ambiguous" when an ndarray is passed in
        self.Hessian: np.ndarray = hessian_matrix if hessian_matrix is not None else np.array([[10, 4], [4, 2]])
        self.LinearCoefficients = linear_terms if linear_terms is not None else np.array([-14, -6])
self.ConstantTerm = 20
self.Minimiser = np.array([0, 0])
self.CurrentIteration = 0
self.MaxIterations = max_iterations
self.Epsilon = math.pow(10, -3) # Stop if magnitude of gradient is less than this value
        self._LastConjugate = None  # Previous conjugate direction p_k-1
        self._LastGradient = None  # Previous gradient g_k-1, needed for the Fletcher-Reeves beta
@property
def __current_gradient_value(self):
gradient = self.del_f(self.Minimiser)
return np.sqrt(gradient.dot(gradient))
def f(self, xk: np.ndarray, decimal_places=3):
"""Returns the function value calcultaed from the equivalent quadratic form"""
squared_terms = 0.5 * xk.dot(self.Hessian).dot(xk)
linear_terms = self.LinearCoefficients.dot(xk)
total_cost = squared_terms + linear_terms + self.ConstantTerm
return round(total_cost, decimal_places)
def del_f(self, xk: np.ndarray) -> np.ndarray:
gradient_vector = np.matmul(self.Hessian, xk)
if isinstance(self.LinearCoefficients, np.ndarray):
gradient_vector = np.add(gradient_vector, self.LinearCoefficients)
return gradient_vector
    def conjugate_direction(self, gk: np.ndarray):
        if self.CurrentIteration == 0:
            self._LastGradient = gk
            self._LastConjugate = -1 * gk
            return self._LastConjugate
        # Fletcher-Reeves: beta_k = (g_k . g_k) / (g_k-1 . g_k-1), using the previous *gradient*,
        # not the previous direction
        bk = gk.dot(gk) / self._LastGradient.dot(self._LastGradient)
        pk = (-1 * gk) + bk * self._LastConjugate
        self._LastGradient = gk  # We will use this on the next iteration as g_k-1
        self._LastConjugate = pk  # We will use this on the next iteration as p_k-1
        return pk
def exact_step_length(self, gk: np.ndarray, pk: np.ndarray, decimal_places=3) -> np.float64:
"""Get the step lengh by minimising f(x + ad_k) wrt to a
alpha(or lambda) = -gk * pk / pk * A * pk
"""
numerator = gk.dot(pk)
denominator = pk.dot(self.Hessian).dot(pk)
step_length = numerator/denominator
return round(step_length, decimal_places) * -1
def _find_next_candidate(self):
"""Get next potential minimum using X_k+1 = X_k + alpha * pk"""
xk = self.Minimiser
gk = self.del_f(xk)
pk = self.conjugate_direction(gk)
step_length = self.exact_step_length(gk, pk)
x_k_plus1 = xk + (step_length * pk)
log.debug(f"X**{self.CurrentIteration} = {xk} - {step_length} * {pk}")
return x_k_plus1
def execute(self):
log.info('Conjugate gradient descent iteration started')
for k in range(self.MaxIterations):
log.debug(f"-----------Iteration {k}--------------")
self.CurrentIteration = k
self.Minimiser = self._find_next_candidate()
log.debug(f"------Minimiser = {self.Minimiser}. Gradient = {self.__current_gradient_value}--------\n")
if self.__current_gradient_value <= self.Epsilon:
log.warning(f"Iteration stopped at k={k}. Stopping condition reached!")
break
log.info(f"------Minimiser = {self.Minimiser}. Gradient = {self.__current_gradient_value}--------")
log.info('Conjugate gradient descent completed successfully')
return self.Minimiser, self.f(self.Minimiser), round(self.__current_gradient_value, 3)
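# Usage sketch (an assumption -- the original gist does not ship a driver for this class):
if __name__ == '__main__':
    cg = ConjugateGradientDescent()
    minimiser, min_value, gradient_norm = cg.execute()
    log.info(f"F(X1, X2) has a local minimum at {minimiser}: value={min_value}, ||gradient||={gradient_norm}")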
"""Naive implementation of the Coordinate Descent algorithm in Python
- The objective function is
F(X1, X2) = X1 - X2 + 2X1**2 + 2X1X2 + X2**2 [ofcourse this could passed as an argument, but i was too lazy..]
- The minimum value of F(x) is computed iteratively by minimizing individual coordinates in a cyclic manner
- On each iteration the active coordinate is updated as follows
X_k+1 = X_k - alpha * [del_F(xk)] * e
Where
* alpha : step length,
* e : the active co-ordinate vector(required to make vector addition to Xk possible),
* del_F(xk) : component of the gradient corresponding to the active coordinate
- This is a very simplistic(i.e.naive) implementation, but I belive it should be enough to give an idea of how the alorithm works
"""
import logging
import numpy as np
import math
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
__author__ = "@NdamuleloNemakh"
__version__ = 0.01
class CoordinateDescent:
def __init__(self, max_iterations=500, no_of_dimensions=2, step_length=0.5, fixed_step_length=True):
self.Dimensions = no_of_dimensions
self.StepLength = step_length
self.FixedStepLength = fixed_step_length
self.MaxIterations = max_iterations
self.CurrentIteration = 0
self.Minimiser: np.ndarray = np.array([0, 0])
        self.ConvergenceThreshold = 1e-3  # Stop if magnitude of gradient is less than this value
def __preview_config(self):
configs = f'''
Configuration Preview
========================
n = {self.Dimensions}
Step Length = {self.StepLength}
Fixed Step Length = {self.FixedStepLength}
Max Iterations = {self.MaxIterations}
'''
log.debug(configs)
return configs
@property
def _current_coordinate_idx(self):
        # Choose the next coordinate to optimize, in a CYCLIC manner (1-based index)
        next_idx = lambda i: i % self.Dimensions + 1
        return next_idx(self.CurrentIteration)
@property
def _ith_coordinate_vector(self):
v = np.zeros(self.Dimensions)
active_idx = self._current_coordinate_idx - 1
v[active_idx] = 1
return v
@staticmethod
def __current_gradient(xk) -> np.ndarray:
# Example: F(X1, X2) = X1 - X2 + 2X1**2 + 2X1X2 + X2**2
x1 = xk[0]
x2 = xk[1]
del_fx1 = 1 + 4 * x1 + 2 * x2
del_fx2 = 2 * x1 + 2 * x2 - 1
del_f = np.array([del_fx1, del_fx2])
return del_f
@staticmethod
def f(xk, decimal_places=3):
x1 = xk[0]
x2 = xk[1]
        fn_value = x1 - x2 + 2 * math.pow(x1, 2) + 2 * x1 * x2 + math.pow(x2, 2)
return round(fn_value, decimal_places)
@property
def __current_gradient_value(self):
del_f = self.__current_gradient(self.Minimiser)
return np.sqrt(del_f.dot(del_f))
@property
def __current_coordinate_gradient(self):
del_f = self.__current_gradient(self.Minimiser)
return del_f[self._current_coordinate_idx - 1]
def __find_next_candidate(self):
"""Compute the next potential minimiser point, i.e. Xk+1"""
xk = self.Minimiser
del_f = self.__current_coordinate_gradient
xk_plus_1 = xk - self.StepLength * del_f * self._ith_coordinate_vector
log.debug(f"X**{self.CurrentIteration} = {xk} - {self.StepLength} * {del_f} * {self._ith_coordinate_vector}")
return xk_plus_1
def execute(self):
log.info('Coordinate descent iteration started')
self.__preview_config()
for k in range(self.MaxIterations):
log.debug(f"-----------Iteration {k}--------------")
self.CurrentIteration = k
self.Minimiser = self.__find_next_candidate()
log.debug(f"------Minimiser = {self.Minimiser}. Gradient = {self.__current_gradient(self.Minimiser)} or "
f"{self.__current_gradient_value}--------\n")
if self.__current_gradient_value <= self.ConvergenceThreshold:
log.warning('Convergence condition satisfied, STOP!')
break
log.info(f"------Minimiser = {self.Minimiser}. Gradient = {self.__current_gradient_value}--------")
log.info(f'Coordinate descent completed successfully. Total Iterations={self.CurrentIteration}')
return self.Minimiser, self.f(self.Minimiser), round(self.__current_gradient_value, 3)
def main():
optimizer = CoordinateDescent()
minimiser, min_value, gradient = optimizer.execute()
result = f'''
=====Function F(X1, X2) has a local minimum at {minimiser}=========
- Min Value = {min_value}
- Slope = {gradient}
'''
log.info(result.strip())
if __name__ == '__main__':
main()
"""Example implementation of the steepest descent algorithm using a hard coded obbjective function
F(X1, X2) = 5X1**2 + X2**2 + 4X1X2 - 14X1 - 6X2 + 20
- At each step the value of X will be updated using
X_k+1 = X_k + alpha * del_F(Xk)
- Initial guess of the minimiser is a point X_0 = [0, 0]
- The value of alpha will be calculated using an exact line search
alpha = || gk || ** 2 / gk_T * A * gk
where gk is the gradient vector at point Xk and A is the hessian matrix
"""
import math
import logging
import numpy as np
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
class GradientDescent:
def __init__(self, max_iterations=100, hessian_matrix=None, linear_terms=None):
        # Compare against None explicitly -- `x or default` fails for ndarray arguments
        self.Hessian: np.ndarray = hessian_matrix if hessian_matrix is not None else np.array([[10, 4], [4, 2]])
        self.LinearCoefficients = linear_terms if linear_terms is not None else np.array([-14, -6])
self.Minimiser = np.array([0, 0])
self.CurrentIteration = 0
self.MaxIterations = max_iterations
self.Epsilon = 0.0001 # Stop if magnitude of gradient is less than this value
@property
def __current_gradient_value(self):
gradient = self.del_f(self.Minimiser)
return np.sqrt(gradient.dot(gradient))
@staticmethod
def f(xk, decimal_places=3):
x1 = xk[0]
x2 = xk[1]
fn_value = 5*math.pow(x1, 2) + math.pow(x2, 2) + 4*x1*x2 - 14*x1 - 6*x2 + 20
return round(fn_value, decimal_places)
def del_f(self, xk: np.ndarray) -> np.ndarray:
gradient_vector = np.matmul(self.Hessian, xk)
if isinstance(self.LinearCoefficients, np.ndarray):
gradient_vector = np.add(gradient_vector, self.LinearCoefficients)
return gradient_vector
def exact_step_length(self, xk: np.ndarray, decimal_places=3) -> np.float64:
"""Get the step lengh by minimising f(x + ad_k) wrt to a
alpha(or lambda) = || gk || ** 2 / gk_T * A * gk
"""
gk = self.del_f(xk)
numerator = gk.dot(gk)
denominator = gk.dot(self.Hessian).dot(gk)
step_len = numerator/denominator
return round(step_len, decimal_places)
def _find_next_candidate(self):
"""Get next potential minimum using X_k+1 = X_k + alpha * del_F(Xk)"""
xk = self.Minimiser
dk = -1 * self.del_f(xk)
step_length = self.exact_step_length(xk)
x_k_plus1 = xk + (step_length * dk)
log.debug(f"X**{self.CurrentIteration} = {xk} - {step_length} * {dk}")
return x_k_plus1
def execute(self):
log.info('Gradient descent iteration started')
for k in range(self.MaxIterations):
log.debug(f"-----------Iteration {k}--------------")
self.CurrentIteration = k
self.Minimiser = self._find_next_candidate()
log.debug(f"------Minimiser = {self.Minimiser}. Gradient = {self.__current_gradient_value}--------\n")
if self.__current_gradient_value <= self.Epsilon:
log.warning(f"Iteration stopped at k={k}. Stopping condition reached!")
break
log.info(f"------Minimiser = {self.Minimiser}. Gradient = {self.__current_gradient_value}--------")
log.info('Gradient descent completed successfully')
return self.Minimiser, self.f(self.Minimiser), round(self.__current_gradient_value, 3)
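# Usage sketch (an assumption -- the original gist does not ship a driver for this class):
if __name__ == '__main__':
    gd = GradientDescent()
    minimiser, min_value, gradient_norm = gd.execute()
    log.info(f"F(X1, X2) has a local minimum at {minimiser}: value={min_value}, ||gradient||={gradient_norm}")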