Deep-Learning-ANN
# CODE BASED ON http://www.deepideas.net/about-me/
import numpy as np
import matplotlib.pyplot as plt
from queue import Queue
import time

# A dictionary that will map operations to gradient functions
_gradient_registry = {}

class RegisterGradient:
    """A decorator for registering the gradient function for an op type."""
    def __init__(self, op_type):
        """Creates a new decorator with `op_type` as the Operation type.
        Args:
            op_type: The name of an operation
        """
        self._op_type = eval(op_type)
    def __call__(self, f):
        """Registers the function `f` as gradient function for `op_type`."""
        _gradient_registry[self._op_type] = f
        return f
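# The gradient functions themselves are registered further below (e.g. via
# @RegisterGradient("add")). Since `eval(op_type)` resolves the op name to its
# class, each registration must appear after the corresponding op class is defined.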
class Operation(object):
    """Represents a graph node that performs a computation.
    An `Operation` is a node in a `Graph` that takes zero or
    more objects as input, and produces zero or more objects
    as output.
    """
    def __init__(self, input_nodes=[]):
        """Construct Operation"""
        self.input_nodes = input_nodes
        # Initialize list of consumers (i.e. nodes that receive this operation's output as input)
        self.consumers = []
        # Append this operation to the list of consumers of all input nodes
        for input_node in input_nodes:
            input_node.consumers.append(self)
        # Append this operation to the list of operations in the currently active default graph
        _default_graph.operations.append(self)
    def compute(self):
        """Computes the output of this operation.
        Must be implemented by the particular operation.
        """
        pass
class add(Operation):
    """Returns x + y element-wise."""
    def __init__(self, x, y):
        """Construct add
        Args:
            x: First summand node
            y: Second summand node
        """
        super().__init__([x, y])
    def compute(self, x_value, y_value):
        """Compute the output of the add operation
        Args:
            x_value: First summand value
            y_value: Second summand value
        """
        self.inputs = [x_value, y_value]
        return x_value + y_value

class matmul(Operation):
    """Multiplies matrix a by matrix b, producing a * b."""
    def __init__(self, a, b):
        """Construct matmul
        Args:
            a: First matrix
            b: Second matrix
        """
        super().__init__([a, b])
    def compute(self, a_value, b_value):
        """Compute the output of the matmul operation
        Args:
            a_value: First matrix value
            b_value: Second matrix value
        """
        self.inputs = [a_value, b_value]
        return a_value.dot(b_value)

class sigmoid(Operation):
    """Returns the sigmoid of x element-wise."""
    def __init__(self, a):
        """Construct sigmoid
        Args:
            a: Input node
        """
        super().__init__([a])
    def compute(self, a_value):
        """Compute the output of the sigmoid operation
        Args:
            a_value: Input value
        """
        return 1 / (1 + np.exp(-a_value))

class softmax(Operation):
    """Returns the softmax of a."""
    def __init__(self, a):
        """Construct softmax
        Args:
            a: Input node
        """
        super().__init__([a])
    def compute(self, a_value):
        """Compute the output of the softmax operation
        Args:
            a_value: Input value
        """
        return np.exp(a_value) / np.sum(np.exp(a_value), axis=1)[:, None]
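# Note: for large activations np.exp can overflow. A numerically safer variant
# (a sketch, not used in this file) subtracts the row-wise maximum first, which
# leaves the softmax value unchanged:
#   shifted = a_value - np.max(a_value, axis=1, keepdims=True)
#   return np.exp(shifted) / np.sum(np.exp(shifted), axis=1, keepdims=True)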
class log(Operation):
    """Computes the natural logarithm of x element-wise."""
    def __init__(self, x):
        """Construct log
        Args:
            x: Input node
        """
        super().__init__([x])
    def compute(self, x_value):
        """Compute the output of the log operation
        Args:
            x_value: Input value
        """
        return np.log(x_value)

class multiply(Operation):
    """Returns x * y element-wise."""
    def __init__(self, x, y):
        """Construct multiply
        Args:
            x: First multiplicand node
            y: Second multiplicand node
        """
        super().__init__([x, y])
    def compute(self, x_value, y_value):
        """Compute the output of the multiply operation
        Args:
            x_value: First multiplicand value
            y_value: Second multiplicand value
        """
        return x_value * y_value

class reduce_sum(Operation):
    """Computes the sum of elements across dimensions of a tensor."""
    def __init__(self, A, axis=None):
        """Construct reduce_sum
        Args:
            A: The tensor to reduce.
            axis: The dimensions to reduce. If `None` (the default), reduces all dimensions.
        """
        super().__init__([A])
        self.axis = axis
    def compute(self, A_value):
        """Compute the output of the reduce_sum operation
        Args:
            A_value: Input tensor value
        """
        return np.sum(A_value, self.axis)

class negative(Operation):
    """Computes the negative of x element-wise."""
    def __init__(self, x):
        """Construct negative
        Args:
            x: Input node
        """
        super().__init__([x])
    def compute(self, x_value):
        """Compute the output of the negative operation
        Args:
            x_value: Input value
        """
        return -x_value
@RegisterGradient("add") | |
def _add_gradient(op, grad): | |
"""Computes the gradients for `add`. | |
Args: | |
op: The `add` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `add` op. | |
Returns: | |
Gradients with respect to the input of `add`. | |
""" | |
a = op.inputs[0] | |
b = op.inputs[1] | |
grad_wrt_a = grad | |
while np.ndim(grad_wrt_a) > len(a.shape): | |
grad_wrt_a = np.sum(grad_wrt_a, axis=0) | |
for axis, size in enumerate(a.shape): | |
if size == 1: | |
grad_wrt_a = np.sum(grad_wrt_a, axis=axis, keepdims=True) | |
grad_wrt_b = grad | |
while np.ndim(grad_wrt_b) > len(b.shape): | |
grad_wrt_b = np.sum(grad_wrt_b, axis=0) | |
for axis, size in enumerate(b.shape): | |
if size == 1: | |
grad_wrt_b = np.sum(grad_wrt_b, axis=axis, keepdims=True) | |
return [grad_wrt_a, grad_wrt_b] | |
@RegisterGradient("matmul") | |
def _matmul_gradient(op, grad): | |
"""Computes the gradients for `matmul`. | |
Args: | |
op: The `matmul` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `matmul` op. | |
Returns: | |
Gradients with respect to the input of `matmul`. | |
""" | |
A = op.inputs[0] | |
B = op.inputs[1] | |
return [grad.dot(B.T), A.T.dot(grad)] | |
@RegisterGradient("sigmoid") | |
def _sigmoid_gradient(op, grad): | |
"""Computes the gradients for `sigmoid`. | |
Args: | |
op: The `sigmoid` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `sigmoid` op. | |
Returns: | |
Gradients with respect to the input of `sigmoid`. | |
""" | |
sigmoid = op.output | |
return grad * sigmoid * (1-sigmoid) | |
@RegisterGradient("softmax") | |
def _softmax_gradient(op, grad): | |
"""Computes the gradients for `softmax`. | |
Args: | |
op: The `softmax` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `softmax` op. | |
Returns: | |
Gradients with respect to the input of `softmax`. | |
""" | |
softmax = op.output | |
return (grad - np.reshape( | |
np.sum(grad * softmax, 1), | |
[-1, 1] | |
)) * softmax | |
@RegisterGradient("log") | |
def _log_gradient(op, grad): | |
"""Computes the gradients for `log`. | |
Args: | |
op: The `log` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `log` op. | |
Returns: | |
Gradients with respect to the input of `log`. | |
""" | |
x = op.inputs[0] | |
return grad/x | |
@RegisterGradient("multiply") | |
def _multiply_gradient(op, grad): | |
"""Computes the gradients for `multiply`. | |
Args: | |
op: The `multiply` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `multiply` op. | |
Returns: | |
Gradients with respect to the input of `multiply`. | |
""" | |
A = op.inputs[0] | |
B = op.inputs[1] | |
return [grad * B, grad * A] | |
@RegisterGradient("reduce_sum") | |
def _reduce_sum_gradient(op, grad): | |
"""Computes the gradients for `reduce_sum`. | |
Args: | |
op: The `reduce_sum` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `reduce_sum` op. | |
Returns: | |
Gradients with respect to the input of `reduce_sum`. | |
""" | |
A = op.inputs[0] | |
output_shape = np.array(A.shape) | |
output_shape[op.axis] = 1 | |
tile_scaling = A.shape // output_shape | |
grad = np.reshape(grad, output_shape) | |
return np.tile(grad, tile_scaling) | |
@RegisterGradient("negative") | |
def _negative_gradient(op, grad): | |
"""Computes the gradients for `negative`. | |
Args: | |
op: The `negative` `Operation` that we are differentiating | |
grad: Gradient with respect to the output of the `negative` op. | |
Returns: | |
Gradients with respect to the input of `negative`. | |
""" | |
return -grad | |
class placeholder:
    """Represents a placeholder node that has to be provided with a value
    when computing the output of a computational graph
    """
    def __init__(self):
        """Construct placeholder"""
        self.consumers = []
        # Append this placeholder to the list of placeholders in the currently active default graph
        _default_graph.placeholders.append(self)

class Variable:
    """Represents a variable (i.e. an intrinsic, changeable parameter of a computational graph)."""
    def __init__(self, initial_value=None):
        """Construct Variable
        Args:
            initial_value: The initial value of this variable
        """
        self.value = initial_value
        self.consumers = []
        # Append this variable to the list of variables in the currently active default graph
        _default_graph.variables.append(self)

class Graph:
    """Represents a computational graph"""
    def __init__(self):
        """Construct Graph"""
        self.operations = []
        self.placeholders = []
        self.variables = []
    def as_default(self):
        global _default_graph
        _default_graph = self
class Session:
    """Represents a particular execution of a computational graph."""
    def run(self, operation, feed_dict={}):
        """Computes the output of an operation
        Args:
            operation: The operation whose output we'd like to compute.
            feed_dict: A dictionary that maps placeholders to values for this session
        """
        # Perform a post-order traversal of the graph to bring the nodes into the right order
        nodes_postorder = traverse_postorder(operation)
        # Iterate all nodes to determine their value
        for node in nodes_postorder:
            if type(node) == placeholder:
                # Set the node value to the placeholder value from feed_dict
                node.output = feed_dict[node]
            elif type(node) == Variable:
                # Set the node value to the variable's value attribute
                node.output = node.value
            else:  # Operation
                # Get the input values for this operation from the outputs of its input nodes
                node.inputs = [input_node.output for input_node in node.input_nodes]
                # Compute the output of this operation
                node.output = node.compute(*node.inputs)
            # Convert lists to numpy arrays
            if type(node.output) == list:
                node.output = np.array(node.output)
        # Return the requested node value
        return operation.output
def traverse_postorder(operation):
    """Performs a post-order traversal, returning a list of nodes
    in the order in which they have to be computed
    Args:
        operation: The operation to start traversal at
    """
    nodes_postorder = []
    def recurse(node):
        if isinstance(node, Operation):
            for input_node in node.input_nodes:
                recurse(input_node)
        nodes_postorder.append(node)
    recurse(operation)
    return nodes_postorder
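# For the commented example below (z = add(matmul(A, x), b)), traverse_postorder(z)
# returns [A, x, matmul, b, add]: every node appears after all of its inputs, so a
# single forward pass over this list can compute every output.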
class GradientDescentOptimizer:
    def __init__(self, learning_rate):
        self.learning_rate = learning_rate
    def minimize(self, loss):
        learning_rate = self.learning_rate
        class MinimizationOperation(Operation):
            def compute(self):
                # Compute gradients
                grad_table = compute_gradients(loss)
                # Iterate all variables
                for node in grad_table:
                    if type(node) == Variable:
                        # Retrieve gradient for this variable
                        grad = grad_table[node]
                        # Take a step along the direction of the negative gradient
                        node.value -= learning_rate * grad
        return MinimizationOperation()
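# Note: running the returned MinimizationOperation inside Session.run() calls
# compute_gradients(loss), which reads the `output`/`inputs` values cached by the
# most recent forward pass. The training loop below therefore evaluates the loss
# (session.run(J, feed_dict)) before running the minimization op.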
def compute_gradients(loss):
    # grad_table[node] will contain the gradient of the loss w.r.t. the node's output
    grad_table = {}
    # The gradient of the loss with respect to the loss is just 1
    grad_table[loss] = 1
    # Perform a breadth-first search, backwards from the loss
    visited = set()
    queue = Queue()
    visited.add(loss)
    queue.put(loss)
    while not queue.empty():
        node = queue.get()
        #print("CurrNode: " + str(node))
        # If this node is not the loss
        if node != loss:
            #
            # Compute the gradient of the loss with respect to this node's output
            #
            grad_table[node] = 0
            # Iterate all consumers
            for consumer in node.consumers:
                #print("\t Consumer: " + str(consumer))
                #print("\t GradTable: " + str(grad_table))
                # Retrieve the gradient of the loss w.r.t. consumer's output
                lossgrad_wrt_consumer_output = grad_table[consumer]
                # Retrieve the function which computes gradients with respect to
                # consumer's inputs given gradients with respect to consumer's output.
                consumer_op_type = consumer.__class__
                bprop = _gradient_registry[consumer_op_type]
                # Get the gradient of the loss with respect to all of consumer's inputs
                lossgrads_wrt_consumer_inputs = bprop(consumer, lossgrad_wrt_consumer_output)
                if len(consumer.input_nodes) == 1:
                    # If the consumer has a single input node, lossgrads_wrt_consumer_inputs
                    # is that input's gradient itself (not a list)
                    grad_table[node] += lossgrads_wrt_consumer_inputs
                else:
                    # Otherwise, lossgrads_wrt_consumer_inputs is a list of gradients, one per input node
                    # Retrieve the index of node in consumer's inputs
                    node_index_in_consumer_inputs = consumer.input_nodes.index(node)
                    # Get the gradient of the loss with respect to node
                    lossgrad_wrt_node = lossgrads_wrt_consumer_inputs[node_index_in_consumer_inputs]
                    # Add to total gradient
                    grad_table[node] += lossgrad_wrt_node
        #
        # Append each input node to the queue
        #
        if hasattr(node, "input_nodes"):
            for input_node in node.input_nodes:
                if input_node not in visited:
                    visited.add(input_node)
                    queue.put(input_node)
    # Return gradients for each visited node
    return grad_table
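# A minimal usage sketch (illustrative; assumes a forward pass has populated every
# node's cached `output`, as Session.run does):
#   session.run(J, feed_dict)        # forward pass caches node outputs
#   grads = compute_gradients(J)     # dict mapping node -> dJ/d(node output)
#   dW_hidden = grads[W_hidden]      # gradient w.r.t. the hidden-layer weights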
# # Create a new graph
# Graph().as_default()
# # Create variables
# A = Variable([[1, 0], [0, -1]])
# b = Variable([1, 1])
# # Create placeholder
# x = placeholder()
# # Create hidden node y
# y = matmul(A, x)
# # Create output node z
# z = add(y, b)
# session = Session()
# output = session.run(z, {
#     x: [1, 2]
# })
# print(output)
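# # Expected output for this example: [ 2 -1]  (i.e. A.dot(x) + b)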
# Clicker: collect training points with the mouse
fig = plt.figure()
ax = fig.add_subplot(111)
axes = plt.gca()
axes.set_xlim([-2, 2])
axes.set_ylim([-2, 2])
reds = []
blues = []
click_done = False
plt.title('Right click for blue points. Left click for red points. Middle click to optimize')
plt.ion()
def onclick(event):
    global click_done
    if event.button == 2:
        # Middle click: stop collecting points and start optimizing
        fig.canvas.mpl_disconnect(cid)
        click_done = True
        return
    ix, iy = event.xdata, event.ydata
    coords = []
    coords.append((ix, iy))
    npv = np.asarray(coords)
    if event.button == 1:
        reds.append(coords)
        plt.scatter(npv[:, 0], npv[:, 1], color='red', edgecolor='black', linewidth=1, zorder=1)
    else:
        blues.append(coords)
        plt.scatter(npv[:, 0], npv[:, 1], color='blue', edgecolor='black', linewidth=1, zorder=1)
    plt.draw()
    return coords
cid = fig.canvas.mpl_connect('button_press_event', onclick)
while not click_done:
    time.sleep(0.05)
    plt.pause(0.01)
red_points = np.asarray(reds).reshape(len(reds), 2)
blue_points = np.asarray(blues).reshape(len(blues), 2)
# Create a new graph
Graph().as_default()
# Create training input placeholder
X = placeholder()
# Create placeholder for the training classes
c = placeholder()
DIM = 3
# Build a hidden layer
W_hidden = Variable(np.random.randn(2, DIM))
b_hidden = Variable(np.random.randn(DIM))
p_hidden = sigmoid(add(matmul(X, W_hidden), b_hidden))
# Build the output layer
W_output = Variable(np.random.randn(DIM, 2))
b_output = Variable(np.random.randn(2))
p_output = softmax(add(matmul(p_hidden, W_output), b_output))
# Build cross-entropy loss
J = negative(reduce_sum(reduce_sum(multiply(c, log(p_output)), axis=1)))
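# i.e. J = -sum_i sum_k c[i, k] * log(p_output[i, k]): the inner reduce_sum sums the
# cross-entropy over the two classes, the outer one over all training examples.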
# Build minimization op
minimization_op = GradientDescentOptimizer(learning_rate=0.05).minimize(J)
# Build placeholder inputs
feed_dict = {
    X: np.concatenate((blue_points, red_points)),
    c:
        [[1, 0]] * len(blue_points)
        + [[0, 1]] * len(red_points)
}
# Create session
session = Session()
# Perform 30000 gradient descent steps, redrawing the decision boundary every `step_val` steps
step_val = 400
for step in range(30000):
    J_value = session.run(J, feed_dict)
    if step % step_val == 0:
        print("Step:", step, " Loss:", J_value)
    session.run(minimization_op, feed_dict)
    if step % step_val == 0:
        # Fetch the current parameter values
        W_hidden_value = session.run(W_hidden)
        #print("Hidden layer weight matrix:\n", W_hidden_value)
        b_hidden_value = session.run(b_hidden)
        #print("Hidden layer bias:\n", b_hidden_value)
        W_output_value = session.run(W_output)
        #print("Output layer weight matrix:\n", W_output_value)
        b_output_value = session.run(b_output)
        #print("Output layer bias:\n", b_output_value)
        # Visualize classification boundary
        xs = np.linspace(-2, 2)
        ys = np.linspace(-2, 2)
        pred_classes = []
        for x in xs:
            for y in ys:
                pred_class = session.run(p_output,
                                         feed_dict={X: [[x, y]]})[0]
                pred_classes.append((x, y, pred_class.argmax()))
        xs_p, ys_p = [], []
        xs_n, ys_n = [], []
        # Use `label` for the predicted class so the placeholder `c` is not shadowed
        for x, y, label in pred_classes:
            if label == 0:
                xs_n.append(x)
                ys_n.append(y)
            else:
                xs_p.append(x)
                ys_p.append(y)
        plt.clf()
        plt.plot(xs_p, ys_p, 'ro', xs_n, ys_n, 'bo', zorder=0)
        plt.scatter(red_points[:, 0], red_points[:, 1], color='red', edgecolor='black', linewidth=1, zorder=1)
        plt.scatter(blue_points[:, 0], blue_points[:, 1], color='blue', edgecolor='black', linewidth=1, zorder=1)
        plt.pause(0.01)
        plt.show()
# Keep the final plot window open
while True:
    time.sleep(1)