-
-
Save hunan-rostomyan/0aa5200cdf7f80b2d74f8eee1a36b766 to your computer and use it in GitHub Desktop.
Automatic Differentiation from scratch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from contextlib import contextmanager | |
| import numpy as np | |
| import ops | |
| from graph import Graph | |
# -------
# Globals
# -------
# Module-level mutable state. Each value is wrapped in a one-element list so
# that functions (and Session) can rebind the "current" value in place via
# index assignment (e.g. env[0] = {}).
env = [{}]             # env[0]: node name -> computed value (the evaluation environment)
graph = [Graph(), {}]  # graph[0]: the active Graph; graph[1]: gradient-signal table used by backward()
last_name_index = [0]  # used to generate unique names for nodes
@contextmanager
def Session():
    """Context manager providing a fresh graph/environment/name counter.

    Yields:
        (env, graph): the new session's environment dict and Graph.

    The previous global state is restored when the `with` block exits.
    """
    # save
    _graph = graph[0]
    _env = env[0]
    _last_name_index = last_name_index[0]
    # create
    graph[0] = Graph()
    env[0] = {}
    last_name_index[0] = 0
    try:
        yield env[0], graph[0]
    finally:
        # BUGFIX: restore in a `finally` so an exception raised inside the
        # `with` body no longer leaks session state into the outer scope.
        graph[0] = _graph
        env[0] = _env
        last_name_index[0] = _last_name_index
| # ------- | |
| # Helpers | |
| # ------- | |
def namegen(prefix='n'):
    """Return a fresh unique name '<prefix>_<k>' and advance the counter.

    Example:
        >>> namegen()
        n_0
        >>> namegen()
        n_1
    """
    index = last_name_index[0]
    last_name_index[0] = index + 1
    return '{}_{}'.format(prefix, index)
def to_numpy(value, dtype=float, preserve_scalars=True):
    """Convert a Python list to an ndarray; pass everything else through.

    Args:
        value: an ndarray (returned as-is), a list (converted), or any
            other object such as a scalar (returned unchanged).
        dtype: unused; kept for backward compatibility. BUGFIX: the default
            was `np.float`, which was removed in NumPy 1.24 and raised
            AttributeError when this function was defined.
        preserve_scalars: unused; kept for backward compatibility.
    """
    if isinstance(value, np.ndarray):
        return value
    # Non-list, non-array values (scalars, tuples, ...) pass through untouched.
    if not isinstance(value, (list,)):
        return value
    return np.array(value)
def L(name, G):
    """Return the first node in graph *G* whose name equals *name*."""
    matching = [node for node in G.nodes if node.name == name]
    return matching[0]
def is_leaf(node):
    """Return True if *node* is a graph leaf.

    Leaves are Variables (Placeholder op) or Constants (Immediate op)
    rather than computed expressions; backward() stops recursing at them.
    """
    # Idiom: a single isinstance with a tuple of types replaces the
    # original `isinstance(...) or isinstance(...)` chain.
    return isinstance(node.op, (ops.Placeholder, ops.Immediate))
def show_env(E):
    """Pretty-print the environment mapping *E* to stdout."""
    rule = '-' * 11
    print('\n' + rule)
    print('Environment')
    print(rule)
    for key, value in E.items():
        print(f'{key}: {value}')
| # ------------------- | |
| # Represent the graph | |
| # ------------------- | |
class Node:
    """A node in the active computation graph.

    A node pairs a name with an op (Placeholder/Immediate for leaves, or a
    compound op such as Add/Matmul), the names of its input nodes, the name
    of its output slot, and (for leaves) a concrete value. Operator overloads
    build new compound nodes and register them in the global graph.
    """

    def __init__(self, name, op, inputs=None, output=None, value=None):
        # BUGFIX: the original default was a shared mutable list (inputs=[]);
        # a None sentinel avoids aliasing one list across all instances.
        self.name = name
        self.op = op
        self.inputs = [] if inputs is None else inputs
        self.output = output
        self.value = value
        # Register an edge input -> output in the active graph for each input.
        for _input in self.inputs:
            graph[0].add_edge(_input, output, label='')

    def __repr__(self):
        return 'Node[name={}, op={}]'.format(self.name, self.op.__class__.__name__)

    def __str__(self):
        return self.__repr__()

    def __len__(self):
        return len(self.value)

    def _binary(self, other, symbol, op, reverse=False):
        # Build and register a compound node computing `self <op> other`
        # (operands swapped when reverse=True, for reflected operators).
        name = namegen(symbol)
        operands = [other.name, self.name] if reverse else [self.name, other.name]
        node = Node(name=name, op=op, inputs=operands, output=name)
        graph[0].add(node)
        return node

    def __add__(self, other):
        return self._binary(other, '+', ops.Add())

    def __sub__(self, other):
        return self._binary(other, '-', ops.Sub())

    def __mul__(self, other):
        return self._binary(other, '*', ops.Mult())

    def __matmul__(self, other):
        return self._binary(other, '@', ops.Matmul())

    def __rmatmul__(self, other):
        # BUGFIX: reflected matmul is `other @ self`; the original returned
        # `self @ other`, silently swapping the operands.
        # NOTE(review): like the original, this assumes `other` is a Node
        # with a .name attribute — confirm callers never pass raw arrays.
        return self._binary(other, '@', ops.Matmul(), reverse=True)

    def __pow__(self, other):
        # The exponent is treated as a constant downstream (Pow.grad has a
        # single entry), so no gradient flows to `other`.
        return self._binary(other, '^', ops.Pow())
| # -------------- | |
| # Node factories | |
| # -------------- | |
def Variable(name, value):
    """Create a named leaf node whose value is fed through the environment."""
    numeric = to_numpy(value)
    # Publish the value under the variable's name, e.g. E['x'] = array([1, 2, 3]).
    env[0][name] = numeric
    node = Node(name=name, op=ops.Placeholder(), inputs=[], output=None, value=numeric)
    graph[0].add(node)
    return node
def Constant(value):
    """Create an anonymous constant leaf node carrying a fixed value."""
    constant = Node(name=namegen('c'), op=ops.Immediate(), output=None,
                    value=to_numpy(value))
    graph[0].add(constant)
    return constant
| # ------------------ | |
| # Evaluate the graph | |
| # ------------------ | |
def forward(_nodes):
    """Evaluate the active graph and return the value(s) of `_nodes`.

    Accepts either a single Node or a list of Nodes, and returns a single
    value or a list of values correspondingly.
    """
    # Normalize the request to a list, remembering the original shape.
    single = not isinstance(_nodes, (list,))
    requested = [_nodes] if single else _nodes
    E, G = env[0], graph[0]
    # One pass over the graph computes every node's value into E.
    for node in G.nodes:
        op = node.op
        if isinstance(op, ops.Placeholder):
            # Variables were published to the environment at creation time.
            continue
        if isinstance(op, ops.Immediate):
            # Constants carry their own value; publish it under their name.
            E[node.name] = node.value
            continue
        # Compound node. Skip if already evaluated; otherwise apply the op
        # to its input values, which — by graph insertion order — have
        # already been computed by the time we get here.
        if node.output not in E:
            E[node.output] = op(*(E[i] for i in node.inputs))
    # Look up the values of the requested nodes.
    results = [E[node.name] for node in requested]
    return results[0] if single else results
| # --------------------- | |
| # Compute the gradients | |
| # --------------------- | |
def backward(node, _wrt):
    """Backpropagate from `node`; return gradient(s) w.r.t. `_wrt`.

    `_wrt` may be a single Node or a list of Nodes. Gradient signals are
    written into the global signal table `graph[1]`, keyed by node name.
    """
    # Normalize `_wrt` to a list, remembering whether a single node was given.
    if not isinstance(_wrt, (list,)):
        atomic = True
        wrt = [_wrt]
    else:
        atomic = False
        wrt = _wrt
    E, G, S = env[0], graph[0], graph[1]
    def aux(node, signal):
        # Recursively push the incoming gradient `signal` to the parents
        # (the node's inputs), stopping at leaves.
        if is_leaf(node): return
        op = node.op
        parent_names = node.inputs
        parent_nodes = [L(n, G) for n in parent_names]
        parent_values = [E[n] for n in parent_names]
        # op.grad may have fewer entries than there are inputs (e.g. Pow
        # exposes one gradient); zip then silently skips the extras.
        for grad, parent_node, parent_name in zip(op.grad, parent_nodes, parent_names):
            new_signal = grad(signal, *parent_values)
            # NOTE(review): this overwrites rather than accumulates — a node
            # feeding multiple consumers keeps only the last signal. Confirm
            # whether `S[parent_name] += new_signal` was intended.
            S[parent_name] = new_signal
            aux(parent_node, new_signal)
    # Seed the root gradient with 1 (d node / d node).
    aux(node, 1)
    grads = [S[node.name] for node in wrt]
    # NOTE(review): `atomic` is computed above but never used; `not grads`
    # looks like a typo for `not atomic`, so any non-empty request returns
    # grads[0] even when a list was passed. Callers (e.g. the training
    # script's `backward(cost, [W])`) currently rely on this — confirm
    # before changing.
    return grads if not grads else grads[0]
| # ---------- | |
| # Optimizers | |
| # ---------- | |
class GradientDescentOptimizer:
    """Vanilla gradient descent: w <- w - alpha * grad."""
    def __init__(self, alpha):
        # alpha: learning rate (step size).
        self.alpha = alpha
    def step(self, node, grad):
        """Update `node`'s stored value in the global environment.

        Uses `-=` so the update mutates the existing array in place; for
        nodes created via Variable(), `node.value` aliases the same array
        and therefore stays in sync.
        """
        env[0][node.name] -= self.alpha * grad
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import autodiff as ad | |
def model(X, W):
    """Predict targets as the linear map X @ W."""
    prediction = X @ W
    return prediction
def loss(Y, Y_hat):
    """Elementwise half squared error, (Y - Y_hat)^2 / 2, built as graph ops."""
    squared_error = (Y - Y_hat) ** ad.Constant(2)
    return squared_error * ad.Constant(1 / 2)
def compute_cost(X, W):
    """Return a graph node computing the mean per-example loss.

    NOTE(review): reads the target `Y` from the enclosing scope rather than
    taking it as a parameter — confirm this is intentional.
    """
    Y_hat = model(X, W)
    # A forward pass is run here only to learn the prediction's shape.
    shape = ad.forward(Y_hat).shape
    # Row vector with entries 1/m: averaging expressed as a matrix product.
    ones = ad.Variable('ones', np.ones(shape).T * (1 / shape[0]))
    diff = loss(Y, Y_hat)
    return ones @ diff
with ad.Session():
    # --------------
    # Synthetic Data
    # --------------
    # Design matrix: a bias column of ones and a feature column 1..10.
    X = ad.Variable('X', np.array([
        [ 1., 1.], 
        [ 1., 2.], 
        [ 1., 3.], 
        [ 1., 4.], 
        [ 1., 5.], 
        [ 1., 6.], 
        [ 1., 7.], 
        [ 1., 8.], 
        [ 1., 9.], 
        [ 1., 10.]
    ]))
    # Targets: the feature column plus Gaussian noise, as a column vector.
    noise = np.random.normal(0, 0.5, len(X))
    Y = ad.Variable('Y', (X.value[:, 1] + noise).reshape(-1, 1))
    # ---------
    # Parameter
    # ---------
    # Weight vector (bias, slope), initialized to zero.
    W = ad.Variable('W', np.zeros((2, 1)))
    # ---------------
    # Hyperparameters
    # ---------------
    # alpha: learning rate; eta: number of training epochs.
    alpha, eta = 0.01, 10
    # ---------
    # Optimizer
    # ---------
    optimizer = ad.GradientDescentOptimizer(alpha=alpha)
    # --------
    # Training
    # --------
    losses = []
    for _ in range(eta):
        # compute the cost
        cost = compute_cost(X, W)
        cost_val = ad.forward(cost).item()
        losses.append(cost_val)
        print('cost: ', cost_val)
        # backprop
        # NOTE(review): backward() here returns the bare gradient array,
        # not a one-element list (see its return expression) — step() below
        # relies on that.
        w_grad = ad.backward(cost, [W])
        # optimize
        optimizer.step(W, w_grad)
    # ---------
    # Visualize
    # ---------
    # learning curve
    plt.title('Cost ($\mathcal{J}$) over epoch')
    plt.plot(range(1, len(losses) + 1), losses, 'r')
    plt.xticks(range(1, len(losses) + 1))
    plt.xlabel('epoch'); plt.ylabel('$\mathcal{J}$')
    plt.show()
    # prediction vs target
    Y_hat_final = model(X, W)
    Y_hat_final_val = ad.forward(Y_hat_final)
    Y_val = ad.forward(Y)
    plt.title('Prediction ($\hat{Y}$) vs Target ($Y$) ')
    plt.plot(ad.forward(X)[:, 1], Y_hat_final_val[:, 0], label='$\hat{Y}$')
    plt.scatter(ad.forward(X)[:, 1], Y_val[:, 0], label='$Y$')
    plt.xlabel('X'); plt.ylabel('Y')
    plt.legend()
    plt.show()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from graphviz import Digraph | |
class Graph:
    """A list of nodes plus a graphviz Digraph visualization.

    The visualization is re-rendered to disk after every mutation.
    """

    def __init__(self):
        self.nodes = []
        self.vis = Digraph(format='png')
        self.vis.attr(rankdir='LR')
        self.render()

    def add(self, node):
        """Append *node* to the graph and refresh the rendered image."""
        self.nodes.append(node)
        self.render()

    def add_edge(self, fro, to, label=''):
        """Record a visual edge from *fro* to *to* in the Digraph."""
        self.vis.edge(fro, to, label)

    def render(self):
        # Side effect: writes the current visualization to disk.
        self.vis.render()

    def __repr__(self):
        return 'Graph[size={}]'.format(len(self.nodes))

    def __str__(self):
        return self.__repr__()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
class Placeholder: pass  # Marker op for Variables: forward() skips them and reads their value from the environment; backward() treats them as leaves.
class Immediate: pass  # Marker op for Constants: forward() publishes node.value into the environment; backward() treats them as leaves.
def to_numpy(value, dtype=float, preserve_scalars=True):
    """Convert a Python list to an ndarray; pass everything else through.

    Args:
        value: an ndarray (returned as-is), a list (converted), or any
            other object such as a scalar (returned unchanged).
        dtype: unused; kept for backward compatibility. BUGFIX: the default
            was `np.float`, which was removed in NumPy 1.24 and raised
            AttributeError when this function was defined.
        preserve_scalars: unused; kept for backward compatibility.
    """
    if isinstance(value, np.ndarray):
        return value
    # Non-list, non-array values (scalars, tuples, ...) pass through untouched.
    if not isinstance(value, (list,)):
        return value
    return np.array(value)
def is_scalar(x):
    """Return True for scalar-like values.

    Lists and ndarrays with one or more dimensions are not scalars;
    everything else (numbers, 0-d arrays, tuples, ...) is.
    """
    if isinstance(x, list):
        return False
    return not (isinstance(x, np.ndarray) and x.ndim != 0)
class Add:
    """Elementwise addition op."""

    def __call__(self, x, y):
        return x + y

    @property
    def grad(self):
        # d(x+y)/dx = d(x+y)/dy = 1: the signal passes through unchanged.
        passthrough = lambda signal, x, y: signal
        return [passthrough, passthrough]
class Sub:
    """Elementwise subtraction op."""

    def __call__(self, x, y):
        return x - y

    @property
    def grad(self):
        # d(x-y)/dx = 1; d(x-y)/dy = -1.
        return [
            lambda signal, *operands: signal,
            lambda signal, *operands: -signal,
        ]
class Mult:
    """Elementwise multiplication op.

    Two scalars multiply directly; otherwise operands are converted to
    arrays, multiplied, and the product returned as a (nested) list.
    """

    def __call__(self, x, y):
        if is_scalar(x) and is_scalar(y):
            return x * y
        product = np.array(x) * np.array(y)
        return product.tolist()

    @property
    def grad(self):
        # d(xy)/dx = y and d(xy)/dy = x: scale the signal by the other operand.
        def x_grad(signal, x, y):
            return signal * y
        def y_grad(signal, x, y):
            return signal * x
        return [x_grad, y_grad]
class Pow:
    """Raise x to a constant power c.

    The exponent is treated as a constant: `grad` exposes a single entry,
    so no gradient flows to c.
    """

    def __call__(self, x, c):
        if is_scalar(x):
            return x ** c
        else:
            return np.power(x, c)

    @property
    def grad(self):
        # BUGFIX: d(x^c)/dx = c * x^(c-1). The original computed
        # signal * c * x, which only coincides with the correct derivative
        # when c == 2 (the case exercised by the demo's squared-error loss).
        return [lambda signal, x, c: signal * c * np.power(x, c - 1)]
class Matmul:
    """Matrix multiplication op."""

    def __call__(self, xs, ys):
        return np.dot(np.array(xs), np.array(ys))

    @property
    def grad(self):
        # For Z = X @ Y: dL/dX = dL/dZ @ Y^T and dL/dY = X^T @ dL/dZ.
        def xs_grad(signal, xs, ys):
            ys = to_numpy(ys)
            return np.dot(signal, ys.T)
        def ys_grad(signal, xs, ys):
            xs = to_numpy(xs)
            return np.dot(xs.T, signal)
        return [xs_grad, ys_grad]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment