@rcdilorenzo
Last active October 29, 2021 13:25
import numpy as np
from typing import Callable, Optional, List
from pydantic import BaseModel
from tqdm.notebook import tqdm
from math import ceil, isnan, inf

class Activation(BaseModel):
    forward: Callable[[np.ndarray], np.ndarray]   # input -> output
    backward: Callable[[np.ndarray, np.ndarray], np.ndarray]  # (dA, X) -> gradients

linear = Activation(
    forward=lambda X: X,
    backward=lambda dA, X: (
        dA  # identity
        if isinstance(dA, np.ndarray)
        else np.dot(dA, np.ones(X.shape))  # broadcast to proper size if scalar
    )
)

relu = Activation(
    forward=lambda X: (X > 0) * X,
    backward=lambda dA, X: dA * (X > 0)
)

class Loss(BaseModel):
    forward: Callable[[np.ndarray, np.ndarray], np.ndarray]   # (y_true, y_pred) -> cost
    backward: Callable[[np.ndarray, np.ndarray], np.ndarray]  # (y_true, y_pred) -> gradient

class Layer(BaseModel):
    W: np.ndarray
    B: np.ndarray
    activation: Activation = linear
    Z: Optional[np.ndarray] = None
    A: Optional[np.ndarray] = None

    class Config:
        allow_mutation = True
        arbitrary_types_allowed = True

class Model(BaseModel):
    layers: List[Layer]
    loss: Loss

def mse_forward(y_true, y_pred):
    return np.sum(np.square(y_pred - y_true)) / len(y_true)

def mse_grad(y_true, y_pred):
    # 1/N ∑ (y_pred - y_true); np.mean handles both the summation and the 1/N factor
    return np.mean(y_pred - y_true)


mse = Loss(forward=mse_forward, backward=mse_grad)
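
# Illustrative sketch (not part of the original gist): spot-check the loss on a
# toy pair of vectors. The squared errors are 1 and 4, so the MSE is 2.5.
assert np.isclose(mse.forward(np.array([1.0, 2.0]), np.array([2.0, 4.0])), 2.5)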

def layer(n_in, n_out, activation="linear"):
    # He-style scaling of the random weights; bias starts at zero
    W = np.random.rand(n_out, n_in) * np.sqrt(2 / n_in)
    B = np.zeros((n_out, 1))

    activation_f = linear
    if activation == "relu":
        activation_f = relu

    return Layer(W=W, B=B, activation=activation_f)
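
# Illustrative sketch (not part of the original gist): the factory yields a
# weight matrix of shape (n_out, n_in) and a zero bias column of shape (n_out, 1).
_demo_layer = layer(2, 4, activation="relu")
assert _demo_layer.W.shape == (4, 2)
assert _demo_layer.B.shape == (4, 1)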

def forward(model: Model, X: np.ndarray):
    # X holds one sample per row; activations flow through the network as columns
    A = X.T

    for layer in model.layers:
        layer.A = A  # cache the layer input for the backward pass
        layer.Z = np.dot(layer.W, A) + layer.B
        A = layer.activation.forward(layer.Z)

    return np.squeeze(A)
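
# Illustrative sketch (not part of the original gist): a single identity layer
# makes the data layout visible. A two-feature sample in a row of X comes back
# unchanged, squeezed to one value per output unit. The _demo_model name is
# made up for this check.
_demo_model = Model(layers=[Layer(W=np.eye(2), B=np.zeros((2, 1)))], loss=mse)
assert np.allclose(forward(_demo_model, np.array([[1.0, 3.0]])), np.array([1.0, 3.0]))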

def backward(model: Model, y_true: np.ndarray, y_pred: np.ndarray, alpha=0.01, inspect=False):
    assert len(y_pred.shape) > 1, "Please reshape the y_pred matrix to be one per row"
    assert y_pred.shape == y_true.shape, "y_pred matrix must match the size of the y matrix"

    dA = model.loss.backward(y_true, y_pred)
    N = y_true.shape[1]

    for idx, layer in reversed(list(enumerate(model.layers))):
        dZ = layer.activation.backward(dA, layer.Z)

        # The gradient is multiplied by the layer input (x, cached here as A) because of
        # the chain rule. Division by N takes the mean since the dot product already sums.
        dW = np.dot(dZ, layer.A.T) / N

        # The bias has no x term, so it is just multiplied by 1 under the chain rule
        # (when taking the partial derivative with respect to the bias)
        dB = np.sum(dZ.reshape((1, -1)), axis=1, keepdims=True) / N

        # Update weights and biases from the gradients
        layer.W -= dW * alpha
        layer.B -= dB * alpha

        # Gradient for the next (earlier) layer: the derivative of the linear
        # equation (mx + b) with respect to x is m (a.k.a. the slope or weight)
        dA = np.dot(layer.W.T, dZ)

        if inspect:
            print(f"Layer {idx}\n=> W:\n{layer.W}\n=> B:\n{layer.B}\n\n")

    return model
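
# Illustrative sketch (not part of the original gist): one gradient step on a
# single linear layer should reduce the MSE on a toy batch drawn from y = 2x.
# All _gc_* names are made up for this check.
_gc_model = Model(layers=[Layer(W=np.array([[0.5]]), B=np.zeros((1, 1)))], loss=mse)
_gc_X = np.array([[1.0], [2.0], [3.0]])
_gc_y = np.array([2.0, 4.0, 6.0])
_gc_pred = forward(_gc_model, _gc_X)
_loss_before = mse.forward(_gc_y, _gc_pred)
backward(_gc_model, y_true=_gc_y.reshape((1, -1)), y_pred=_gc_pred.reshape((1, -1)), alpha=0.1)
_loss_after = mse.forward(_gc_y, forward(_gc_model, _gc_X))
assert _loss_after < _loss_before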

def train(model: Model, X_train: np.ndarray, y_train: np.ndarray, epochs=1,
          early_stopping_n=3, batch_size=16, learning_rate=0.01):
    N = X_train.shape[0]
    losses = []
    min_loss = inf

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}")

        for batch_i in tqdm(range(ceil(N / batch_size))):
            i_start, i_end = batch_i * batch_size, (batch_i + 1) * batch_size
            X_batch, y_batch = X_train[i_start:i_end], y_train[i_start:i_end]

            y_batch_pred = forward(model, X_batch)

            loss = model.loss.forward(y_batch, y_batch_pred)
            losses.append(loss)
            min_loss = min(min_loss, loss)

            if isnan(loss):
                print("Stopping due to nan loss")
                return

            # Stop early if the loss has risen for early_stopping_n consecutive batches
            if len(losses) > early_stopping_n and (
                np.diff(np.array(losses[-(early_stopping_n + 1):])) > 0
            ).all():
                print("Early stopping now.")
                return

            print(f"MSE: {loss:.1f}")

            backward(
                model,
                y_true=y_batch.reshape((1, -1)),
                y_pred=y_batch_pred.reshape((1, -1)),
                alpha=learning_rate
            )

    # Clear cached activations once training finishes
    for layer in model.layers:
        layer.Z = None
        layer.A = None

def real_func(a, b):
    return a * b + 3

X = np.random.randint(20, size=(1000, 2))
y = np.array([real_func(x[0], x[1]) for x in X])
N = len(X)
split = 0.8
split_index = int(split * N)
X_train, X_test = X[:split_index], X[split_index:]
y_train, y_test = y[:split_index], y[split_index:]

model = Model(
    layers=[
        layer(2, 4, activation="relu"),
        layer(4, 4, activation="relu"),
        layer(4, 4, activation="relu"),
        layer(4, 1)
    ],
    loss=mse
)

train(
    model,
    X_train=X_train,
    y_train=y_train,
    epochs=100,
    learning_rate=0.000000001,
    batch_size=16,
    early_stopping_n=3
)

forward(model, np.array([[1, 3], [2, 3]]))
# => array([6.8428677, 8.3032102])
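# For comparison, the true values from real_func are real_func(1, 3) = 6 and
# real_func(2, 3) = 9.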