Automatic Differentiation from scratch
@hunan-rostomyan · Created November 14, 2024
# -----------
# autodiff.py
# -----------
from contextlib import contextmanager

import numpy as np

import ops
from graph import Graph
# -------
# Globals
# -------
env = [{}]              # maps node names to values
graph = [Graph(), {}]   # graph[0]: the active Graph; graph[1]: gradient signals
last_name_index = [0]   # used to generate unique names for nodes
@contextmanager
def Session():
    # save
    _graph = graph[0]
    _env = env[0]
    _last_name_index = last_name_index[0]
    # create
    graph[0] = Graph()
    env[0] = {}
    last_name_index[0] = 0
    yield env[0], graph[0]
    # restore
    graph[0] = _graph
    env[0] = _env
    last_name_index[0] = _last_name_index
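
# A minimal usage sketch (not part of the original gist): `Session`
# yields a fresh environment and graph and restores the previous ones
# on exit, so sessions can be nested safely.
#
#   >>> with Session() as (E, G):
#   ...     x = Variable('x', [1.0, 2.0, 3.0])
#   ...     'x' in E
#   True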
# -------
# Helpers
# -------
def namegen(prefix='n'):
    """Generate a unique name of the form '{prefix}_{last_name_index[0]}'
    and increment `last_name_index[0]`.

    Example:
    >>> namegen()
    'n_0'
    >>> namegen()
    'n_1'
    """
    name = last_name_index[0]
    last_name_index[0] += 1
    return f'{prefix}_{name}'
def to_numpy(value, dtype=np.float64, preserve_scalars=True):
    # NB: `np.float` was removed in NumPy 1.24; use `np.float64`.
    if isinstance(value, np.ndarray):
        return value
    # Scalars (anything that is not a list) pass through unchanged.
    if not isinstance(value, list):
        return value
    return np.array(value, dtype=dtype)
def L(name, G):
    """Look up a node in graph `G` by name."""
    return [n for n in G.nodes if n.name == name][0]

def is_leaf(node):
    return isinstance(node.op, (ops.Placeholder, ops.Immediate))
def show_env(E):
    print('\n' + ('-' * 11))
    print('Environment')
    print('-' * 11)
    for k, v in E.items():
        print(f'{k}: {v}')
# -------------------
# Represent the graph
# -------------------
class Node:
    def __init__(self, name, op, inputs=None, output=None, value=None):
        # Avoid the mutable-default-argument pitfall.
        inputs = inputs if inputs is not None else []
        # initialize
        self.name = name
        self.op = op
        self.inputs = inputs
        self.output = output
        self.value = value
        # add the edges to the graph
        for _input in inputs:
            graph[0].add_edge(_input, output, label='')

    def __repr__(self):
        return 'Node[name={}, op={}]'.format(self.name, self.op.__class__.__name__)

    def __str__(self):
        return self.__repr__()

    def __len__(self):
        return len(self.value)

    def __add__(self, other):
        name = namegen('+')
        node = Node(name=name, op=ops.Add(), inputs=[self.name, other.name], output=name)
        graph[0].add(node)
        return node

    def __sub__(self, other):
        name = namegen('-')
        node = Node(name=name, op=ops.Sub(), inputs=[self.name, other.name], output=name)
        graph[0].add(node)
        return node

    def __mul__(self, other):
        name = namegen('*')
        node = Node(name=name, op=ops.Mult(), inputs=[self.name, other.name], output=name)
        graph[0].add(node)
        return node

    def __matmul__(self, other):
        name = namegen('@')
        node = Node(name=name, op=ops.Matmul(), inputs=[self.name, other.name], output=name)
        graph[0].add(node)
        return node

    def __rmatmul__(self, other):
        # NB: the operand order is reversed for the right-hand variant.
        name = namegen('@')
        node = Node(name=name, op=ops.Matmul(), inputs=[other.name, self.name], output=name)
        graph[0].add(node)
        return node

    def __pow__(self, other):
        name = namegen('^')
        node = Node(name=name, op=ops.Pow(), inputs=[self.name, other.name], output=name)
        graph[0].add(node)
        return node
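
# A small sketch (not in the original gist) of how the operator
# overloads build the graph: every arithmetic expression over `Node`s
# adds a fresh, uniquely named compound node.
#
#   >>> with Session() as (E, G):
#   ...     a = Variable('a', [1., 2.])
#   ...     b = Variable('b', [3., 4.])
#   ...     c = a + b     # adds a Node named '+_0' wrapping ops.Add
#   ...     len(G.nodes)
#   3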
# --------------
# Node factories
# --------------
def Variable(name, value):
    value = to_numpy(value)
    # e.g. E['x'] = array([1, 2, 3])
    env[0][name] = value
    node = Node(name=name, op=ops.Placeholder(), inputs=[], output=None, value=value)
    graph[0].add(node)
    return node

def Constant(value):
    value = to_numpy(value)
    node = Node(name=namegen('c'), op=ops.Immediate(), inputs=[], output=None, value=value)
    graph[0].add(node)
    return node
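
# Illustrative sketch (not in the original gist): `Variable` registers
# its value in the environment under the given name, while `Constant`
# keeps its value on the node and gets an auto-generated name 'c_<i>'.
#
#   >>> with Session() as (E, G):
#   ...     x = Variable('x', [1., 2., 3.])
#   ...     two = Constant(2)
#   ...     E['x'], two.name
#   (array([1., 2., 3.]), 'c_0')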
# ------------------
# Evaluate the graph
# ------------------
def forward(_nodes):
    # We assume `_nodes` is a list of `Node`s, so if it's not,
    # we shove it into a list manually.
    if not isinstance(_nodes, list):
        atomic = True
        nodes = [_nodes]
    else:
        atomic = False
        nodes = _nodes
    E, G = env[0], graph[0]
    # Make a forward pass to compute all values.
    for node in G.nodes:
        # Variable: its value is already in the environment.
        if isinstance(node.op, ops.Placeholder):
            pass
        # Constant
        elif isinstance(node.op, ops.Immediate):
            # The value is the output, so add it to the environment.
            E[node.name] = node.value
        # Compound
        else:
            # If the output (name) is in the environment, then this node
            # already has its value, so our work is done. NB: cached values
            # are never invalidated; after a parameter update, fresh nodes
            # must be built (as the training loop below does).
            if node.output in E:
                continue
            # During the forward pass, by the time we reach a node, we can
            # be sure that its inputs have been evaluated. To evaluate a
            # compound expression, we look up all of its input values and
            # pass them to the op.
            E[node.output] = node.op(*[E[_input] for _input in node.inputs])
    # Look up the values of the requested nodes.
    values = [E[node.name] for node in nodes]
    return values if not atomic else values[0]
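
# Forward-pass sketch (not in the original gist):
#
#   >>> with Session() as (E, G):
#   ...     x = Variable('x', [1., 2., 3.])
#   ...     y = (x * x) + x     # builds the '*_0' and '+_1' nodes
#   ...     forward(y)
#   array([ 2.,  6., 12.])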
# ---------------------
# Compute the gradients
# ---------------------
def backward(node, _wrt):
    if not isinstance(_wrt, list):
        atomic = True
        wrt = [_wrt]
    else:
        atomic = False
        wrt = _wrt
    E, G, S = env[0], graph[0], graph[1]
    def aux(node, signal):
        if is_leaf(node):
            return
        op = node.op
        parent_names = node.inputs
        parent_nodes = [L(n, G) for n in parent_names]
        parent_values = [E[n] for n in parent_names]
        for grad, parent_node, parent_name in zip(op.grad, parent_nodes, parent_names):
            new_signal = grad(signal, *parent_values)
            # NB: signals overwrite rather than accumulate, so a node
            # feeding several consumers only keeps the last signal.
            S[parent_name] = new_signal
            aux(parent_node, new_signal)
    # Seed the output gradient with 1 and propagate backwards.
    aux(node, 1)
    grads = [S[node.name] for node in wrt]
    return grads if not atomic else grads[0]
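
# Gradient-check sketch (not in the original gist): for y = x * c with
# c = [3, 3], dy/dx should be [3, 3] (Mult's grad passes `signal * y`).
#
#   >>> with Session() as (E, G):
#   ...     x = Variable('x', [1., 2.])
#   ...     c = Constant([3., 3.])
#   ...     y = x * c
#   ...     _ = forward(y)   # populate the environment first
#   ...     backward(y, x)
#   array([3., 3.])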
# ----------
# Optimizers
# ----------
class GradientDescentOptimizer:
    def __init__(self, alpha):
        self.alpha = alpha

    def step(self, node, grad):
        # w <- w - alpha * grad, applied in place in the environment
        env[0][node.name] -= self.alpha * grad
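
# Update-rule sketch (not in the original gist): one step subtracts
# `alpha * grad` from the value stored under the node's name.
#
#   >>> with Session() as (E, G):
#   ...     w = Variable('w', [1., 1.])
#   ...     opt = GradientDescentOptimizer(alpha=0.1)
#   ...     opt.step(w, np.array([1., 2.]))
#   ...     E['w']
#   array([0.9, 0.8])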
# --------------------------------
# Training demo: linear regression
# --------------------------------
import numpy as np
import matplotlib.pyplot as plt

import autodiff as ad

def model(X, W):
    return X @ W

def loss(Y, Y_hat):
    # Half squared error: (Y - Y_hat)^2 / 2.
    return ((Y - Y_hat) ** ad.Constant(2)) * ad.Constant(1 / 2)

def compute_cost(X, W):
    # NB: `Y` is read from the enclosing scope at call time.
    Y_hat = model(X, W)
    shape = ad.forward(Y_hat).shape
    # A (1, n) row of 1/n's: matmul with it averages the losses.
    ones = ad.Variable('ones', np.ones(shape).T * (1 / shape[0]))
    diff = loss(Y, Y_hat)
    return ones @ diff
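
# The cost above is the (half) mean squared error; as a sketch:
#
#   J(W) = (1/n) * sum_i (y_i - x_i . W)^2 / 2
#        = ones_row @ ((Y - Y_hat) ** 2 / 2),   ones_row = [1/n, ..., 1/n]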
with ad.Session():
    # --------------
    # Synthetic Data
    # --------------
    X = ad.Variable('X', np.array([
        [1.,  1.],
        [1.,  2.],
        [1.,  3.],
        [1.,  4.],
        [1.,  5.],
        [1.,  6.],
        [1.,  7.],
        [1.,  8.],
        [1.,  9.],
        [1., 10.]
    ]))
    noise = np.random.normal(0, 0.5, len(X))
    Y = ad.Variable('Y', (X.value[:, 1] + noise).reshape(-1, 1))
    # ---------
    # Parameter
    # ---------
    W = ad.Variable('W', np.zeros((2, 1)))

    # ---------------
    # Hyperparameters
    # ---------------
    alpha, eta = 0.01, 10   # learning rate, number of epochs

    # ---------
    # Optimizer
    # ---------
    optimizer = ad.GradientDescentOptimizer(alpha=alpha)

    # --------
    # Training
    # --------
    losses = []
    for _ in range(eta):
        # compute the cost
        cost = compute_cost(X, W)
        cost_val = ad.forward(cost).item()
        losses.append(cost_val)
        print('cost: ', cost_val)
        # backprop (a single node, so `backward` returns a bare gradient)
        w_grad = ad.backward(cost, W)
        # optimize
        optimizer.step(W, w_grad)
    # ---------
    # Visualize
    # ---------
    # learning curve
    plt.title(r'Cost ($\mathcal{J}$) over epoch')
    plt.plot(range(1, len(losses) + 1), losses, 'r')
    plt.xticks(range(1, len(losses) + 1))
    plt.xlabel('epoch'); plt.ylabel(r'$\mathcal{J}$')
    plt.show()

    # prediction vs target
    Y_hat_final = model(X, W)
    Y_hat_final_val = ad.forward(Y_hat_final)
    Y_val = ad.forward(Y)
    plt.title(r'Prediction ($\hat{Y}$) vs Target ($Y$)')
    plt.plot(ad.forward(X)[:, 1], Y_hat_final_val[:, 0], label=r'$\hat{Y}$')
    plt.scatter(ad.forward(X)[:, 1], Y_val[:, 0], label='$Y$')
    plt.xlabel('X'); plt.ylabel('Y')
    plt.legend()
    plt.show()
# --------
# graph.py
# --------
from graphviz import Digraph

class Graph:
    def __init__(self):
        self.nodes = []
        self.vis = Digraph(format='png')
        self.vis.attr(rankdir='LR')
        self.render()

    def add(self, node):
        self.nodes.append(node)
        self.render()

    def add_edge(self, fro, to, label=''):
        self.vis.edge(fro, to, label)

    def render(self):
        self.vis.render()

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return 'Graph[size={}]'.format(len(self.nodes))
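
# Usage note (not in the original gist): the graph re-renders its
# graphviz PNG on every `add`, which is handy for watching the
# expression graph grow. Note that `add_edge` only touches the
# visualization; the node list is kept separately.
#
#   >>> g = Graph()
#   >>> g.add_edge('x', '+_0')
#   >>> repr(g)
#   'Graph[size=0]'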
# ------
# ops.py
# ------
import numpy as np

# Leaf markers: Placeholder backs a Variable, Immediate backs a Constant.
class Placeholder: pass
class Immediate: pass

def to_numpy(value, dtype=np.float64, preserve_scalars=True):
    # NB: `np.float` was removed in NumPy 1.24; use `np.float64`.
    if isinstance(value, np.ndarray):
        return value
    if not isinstance(value, list):
        return value
    return np.array(value, dtype=dtype)

def is_scalar(x):
    if isinstance(x, list):
        return False
    if isinstance(x, np.ndarray) and x.ndim != 0:
        return False
    return True
class Add:
    def __call__(self, x, y):
        return x + y

    @property
    def grad(self):
        # d(x + y)/dx = 1 and d(x + y)/dy = 1: pass the signal through.
        return [
            lambda signal, x, y: signal,
            lambda signal, x, y: signal
        ]

class Sub:
    def __call__(self, x, y):
        return x - y

    @property
    def grad(self):
        # d(x - y)/dx = 1, d(x - y)/dy = -1.
        return [
            lambda signal, x, y: signal,
            lambda signal, x, y: -signal
        ]

class Mult:
    def __call__(self, x, y):
        if is_scalar(x) and is_scalar(y):
            return x * y
        else:
            # element-wise product
            return (np.array(x) * np.array(y)).tolist()

    @property
    def grad(self):
        # d(x * y)/dx = y, d(x * y)/dy = x.
        return [
            lambda signal, x, y: signal * y,
            lambda signal, x, y: signal * x
        ]
class Pow:
    def __call__(self, x, c):
        if is_scalar(x):
            return x ** c
        else:
            return np.power(x, c)

    @property
    def grad(self):
        # d(x^c)/dx = c * x^(c-1); the exponent `c` is treated as a
        # constant, so no gradient is propagated to it.
        return [lambda signal, x, c: signal * c * np.power(x, c - 1)]
class Matmul:
    def __call__(self, xs, ys):
        return np.dot(np.array(xs), np.array(ys))

    @property
    def grad(self):
        # For Z = X @ Y: dJ/dX = signal @ Y^T and dJ/dY = X^T @ signal.
        def xs_grad(signal, xs, ys):
            xs, ys = to_numpy(xs), to_numpy(ys)
            return np.dot(signal, ys.T)
        def ys_grad(signal, xs, ys):
            xs, ys = to_numpy(xs), to_numpy(ys)
            return np.dot(xs.T, signal)
        return [xs_grad, ys_grad]
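
# Finite-difference sanity check (not in the original gist): perturb one
# entry of X and compare the change in sum(X @ Y) against the analytic
# gradient dJ/dX = signal @ Y^T with signal = ones.
#
#   >>> X = np.array([[1., 2.], [3., 4.]])
#   >>> Y = np.array([[5.], [6.]])
#   >>> signal = np.ones((2, 1))
#   >>> analytic = np.dot(signal, Y.T)   # == xs_grad(signal, X, Y)
#   >>> eps = 1e-6
#   >>> Xp = X.copy(); Xp[0, 0] += eps
#   >>> numeric = (np.dot(Xp, Y).sum() - np.dot(X, Y).sum()) / eps
#   >>> bool(np.allclose(numeric, analytic[0, 0]))
#   True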