Implementation of some common DL algorithms in Python.
from __future__ import annotations
import traceback
from numbers import Number
from typing import Union, Callable, Tuple
import beepy
import matplotlib.pyplot as plt
import numpy as np
from numpy import ndarray
DimsPrim = int;
Dims1D = Tuple[int];
Dims2D = Tuple[int, int];
Dims3D = Tuple[int, int, int];
Dims4D = Tuple[int, int, int, int];
Dims5D = Tuple[int, int, int, int, int];
DimsND = Union[Dims1D, Dims2D, Dims3D, Dims4D, Dims5D];
Dims = Union[DimsPrim, DimsND, 'Layer'];
def get_windows_for_pooling(A: ndarray, window_shape: Dims2D) -> ndarray:
assert len(A.shape) == 4, A.shape;
m, h, w, c = A.shape;
fh, fw = window_shape;
assert h % fh == 0 and w % fw == 0;
oh, ow = h // fh, w // fw;
assert oh > 0 and ow > 0;
mI, mJ, mK, mL = A.strides;
strides = (mI, fh * mJ, fw * mK, mJ, mK, mL);
windows = np.lib.stride_tricks.as_strided(A, (m, oh, ow, fh, fw, c), strides);
return windows;
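# A minimal sketch, for illustration: the window layout produced by
# get_windows_for_pooling. The demo function name and sizes are arbitrary.
def _demo_get_windows_for_pooling():
    A = np.arange(1 * 4 * 4 * 2, dtype=float).reshape((1, 4, 4, 2));
    windows = get_windows_for_pooling(A, (2, 2));
    # One view per non-overlapping 2x2 patch: (m, oh, ow, fh, fw, c) == (1, 2, 2, 2, 2, 2).
    assert windows.shape == (1, 2, 2, 2, 2, 2);
    # Max pooling then reduces over the window axes.
    assert np.max(windows, axis=(3, 4)).shape == (1, 2, 2, 2);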
def get_windows_for_mini_batch(A: ndarray, number_of_batches=-1, size_of_batches=-1) -> ndarray:
    assert number_of_batches * size_of_batches != 1, "Please provide a value for either number_of_batches or size_of_batches!";
m = A.shape[0];
shape = A.shape[1:];
strides = A.strides;
if number_of_batches == -1:
# assert m % size_of_batches == 0;
number_of_batches = m // size_of_batches;
elif size_of_batches == -1:
# assert m % number_of_batches == 0:
size_of_batches = m // number_of_batches;
# strides = (A.dtype.alignment * np.product(shape),) + strides;
# shape = (size_of_batches,) + shape;
# strides = (A.dtype.alignment * np.product(shape),) + strides;
# shape = (number_of_batches,) + shape;
return np.lib.stride_tricks.as_strided(A, (number_of_batches, size_of_batches) + shape,
                                           (size_of_batches * A.strides[0],) + strides);
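# A minimal sketch, for illustration: mini-batches are strided views over the
# sample axis, so no data is copied. The demo function name and sizes are arbitrary.
def _demo_get_windows_for_mini_batch():
    A = np.arange(6 * 3, dtype=float).reshape((6, 3));
    batches = get_windows_for_mini_batch(A, size_of_batches=2);
    # 6 samples split into 3 batches of 2: (number_of_batches, size_of_batches) + features.
    assert batches.shape == (3, 2, 3);
    assert np.array_equal(batches[1], A[2:4]);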
class Layer:
@staticmethod
def simplify_dims(value: Dims) -> DimsND:
if isinstance(value, Number):
value = (value,)
elif isinstance(value, Layer):
value = value.s_out;
return value;
@staticmethod
def set_parameters_helper(parameters, *args):
return args;
def __init__(this, s_in: Dims, s_out: Dims, lr: float):
this.s_in = Layer.simplify_dims(s_in);
this.s_out = Layer.simplify_dims(s_out);
this.lr = lr;
def predict(this, a_in: ndarray) -> ndarray:
raise NotImplementedError();
def forward(this, a_in: ndarray) -> ndarray:
return this.predict(a_in);
def un_predict(this, da_out: ndarray) -> ndarray:
raise NotImplementedError();
def backward(this, da_out: ndarray) -> ndarray:
return this.un_predict(da_out);
def get_parameters(this) -> list[ndarray]:
return [];
def set_parameters(this, parameters: list[ndarray]):
pass;
def set_lr(this, lr):
this.lr = lr;
def are_input_dims_compliant(this, x: ndarray) -> bool:
return x.shape[1:] == this.s_in;
def are_output_dims_compliant(this, y: ndarray) -> bool:
return y.shape[1:] == this.s_out;
# Just a dummy layer, so that I won't have to change layer[-1] with n every time.
class InputLayer(Layer):
def __init__(this, s_in: Dims):
super().__init__(s_in, s_in, 0)
def predict(this, a_in: ndarray) -> ndarray:
return a_in;
def un_predict(this, da_out: ndarray) -> ndarray:
return da_out;
class AdditiveParametersLayer(Layer):
def __init__(this, s_in: Dims, lr: float):
super().__init__(s_in, s_in, lr);
this.b = np.random.random((1,) + this.s_out) - 0.5;
def predict(this, a_in: ndarray) -> ndarray:
assert a_in.shape[1:] == this.s_in;
return a_in + this.b;
def un_predict(this, da_out: ndarray) -> ndarray:
assert da_out.shape[1:] == this.s_out;
return da_out;
def backward(this, da_out: ndarray) -> ndarray:
assert da_out.shape[1:] == this.s_out;
db = np.mean(da_out, axis=0, keepdims=True);
this.b -= this.lr * db;
return this.un_predict(da_out);
def get_parameters(this) -> list[ndarray]:
return [this.b];
def set_parameters(this, parameters: list[ndarray]):
assert this.b.shape == parameters[0].shape;
this.b = parameters[0];
class MultiplicativeParametersLayer(Layer):
LinearDims = Union[DimsPrim, Dims1D];
def __init__(this, s_in: LinearDims, s_out: LinearDims, lr: float):
super().__init__(s_in, s_out, lr);
s_in = this.s_in[0];
s_out = this.s_out[0];
this.w = np.random.standard_normal((s_in, s_out)) * np.sqrt(2 / s_in);
def predict(this, a_in: ndarray) -> ndarray:
assert a_in.shape[1:] == this.s_in, a_in.shape;
return a_in @ this.w;
def forward(this, a_in: ndarray) -> ndarray:
this.a_in = a_in;
return this.predict(a_in)
def un_predict(this, da_out: ndarray) -> ndarray:
assert da_out.shape[1:] == this.s_out;
da_in = da_out @ this.w.T;
return da_in;
def backward(this, da_out: ndarray) -> ndarray:
assert da_out.shape[1:] == this.s_out;
dw = (this.a_in.T @ da_out) / this.a_in.shape[0];
this.w = this.w - this.lr * dw;
return this.un_predict(da_out);
def get_parameters(this) -> list[ndarray]:
return [this.w];
def set_parameters(this, parameters: list[ndarray]):
assert this.w.shape == parameters[0].shape;
this.w = parameters[0];
class SuperMultiplicativeParametersLayer(Layer):
def __init__(this, s_in: Dims, lr: float, weights_height: int):
super().__init__(s_in, weights_height, lr)
contract = "".join([chr(x + 97) for x in range(len(this.s_in))]);
assert 'y' not in contract;
# y stands for m or #samples, z stands for c or classes.
this.forward_operation = "y%s,%sz->yz" % (contract, contract);
this.un_predict_operation = "yz,z%s->y%s" % (contract[::-1], contract);
this.backward_operation = "%sy,yz->%sz" % (contract[::-1], contract);
this.w = np.random.standard_normal(this.s_in + this.s_out) * np.sqrt(2 / np.product(this.s_in));
def predict(this, a_in: ndarray) -> ndarray:
assert a_in.shape[1:] == this.s_in;
return np.einsum(this.forward_operation, a_in, this.w);
def forward(this, a_in: ndarray) -> ndarray:
this.a_in = a_in;
return this.predict(a_in)
def un_predict(this, da_out: ndarray) -> ndarray:
assert da_out.shape[1:] == this.s_out;
da_in = np.einsum(this.un_predict_operation, da_out, this.w.T);
return da_in;
def backward(this, da_out: ndarray) -> ndarray:
assert da_out.shape[1:] == this.s_out;
dw = np.einsum(this.backward_operation, this.a_in.T, da_out) / this.a_in.shape[0];
this.w = this.w - this.lr * dw;
return this.un_predict(da_out);
def get_parameters(this) -> list[ndarray]:
return [this.w];
def set_parameters(this, parameters: list[ndarray]):
assert this.w.shape == parameters[0].shape;
this.w = parameters[0];
class ConvolutiveParametersLayer(Layer):
# If forward is a(l) = w(l) (*) a(l-1)
# then backward goes like
# del(l-1) = Σ<k=1,n> del(l,k) (*)f Rot<180>(w(l)) * g'(z(l-1))
# df(l,k) = a(l-1) (*) d(l,k)
@staticmethod
def get_windows_for_a_fully_featured_convolution(
a: ndarray, shape: Dims2D, strides: Dims2D = (1, 1)) -> ndarray:
"""
:param a: A 4D array of shape [m, h, w, c] where m is #samples, h is height of image, w is width of image and c is #channels in image.
:param shape: A 2D array of shape [fH, fW] where fH is filter height and fW is filter width.
:param strides: A 2D array of shape [sH, sW] where sH is the strides along the height and sW is the strides along the width.
:return: A 6D array of shape [m, fR, fC, fH, fW, c] where fR is number of windows row wise and fC is number of windows column wise.
"""
assert len(a.shape) == 4;
m, h, w, c = a.shape;
fH, fW = shape;
sH, sW = strides;
fR, fC = h // sH - fH + 1, w // sW - fW + 1;
        assert fR > 0 and fC > 0, "Can't make windows of size [%d, %d] for an image of size [%d, %d]." % (fH, fW, h, w);
shape = (m, fR, fC, fH, fW, c);
mI, mJ, mK, mL = a.strides;
strides = (mI, mJ * sH, mK * sW, mJ, mK, mL);
return np.lib.stride_tricks.as_strided(a, shape, strides);
@staticmethod
def get_output(i: Dims2D, f: Dims2D, p: Dims2D = (0, 0), s: Dims2D = (1, 1)) -> Dims2D:
h, w = i;
fh, fw = f;
ph, pw = p;
sh, sw = s;
return (h + 2 * ph) // sh - fh + 1, (w + 2 * pw) // sw - fw + 1;
@staticmethod
def get_pads(i: Dims2D, f: Dims2D, s: Dims2D = (1, 1), o: Dims2D = None) -> Dims2D:
if o is None: o = i;
h, w = i;
fh, fw = f;
sh, sw = s;
oh, ow = o;
return ((oh + fh - 1) * sh - h) // 2, ((ow + fw - 1) * sw - w) // 2;
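    # Worked example, for illustration: with i=(28, 28), f=(3, 3), p=(0, 0), s=(1, 1),
    # get_output gives (28 // 1 - 3 + 1, 28 // 1 - 3 + 1) == (26, 26), i.e. the usual
    # "valid" convolution output size used by __init__ below.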
def __init__(this, s_in: Dims, lr: float,
filter_shape: Dims2D, number_of_filters: int = 1,
padding: Dims2D = (0, 0), striding: Dims2D = (1, 1)):
s_in = Layer.simplify_dims(s_in);
assert len(s_in) == 3;
h, w, c = s_in;
fh, fw = filter_shape;
fn = number_of_filters;
ph, pw = padding;
sh, sw = striding;
oh, ow = ConvolutiveParametersLayer.get_output((h, w), filter_shape, padding, striding);
s_out = (oh, ow, fn);
super().__init__(s_in, s_out, lr);
this.filter_shape = (fh, fw, c, fn);
this.filter_weight = np.product(s_out);
this.h, this.w, this.c, this.fh, this.fw, this.fn, this.ph, this.pw, this.sh, this.sw, this.oh, this.ow = \
h, w, c, fh, fw, fn, ph, pw, sh, sw, oh, ow;
this.dph, this.dpw = ConvolutiveParametersLayer.get_pads((oh, ow), filter_shape, striding);
        this.do_we_pad = ph != 0 or pw != 0;
this.f = np.random.standard_normal(this.filter_shape) * np.sqrt(2 / h / w / c);
def convolve(this, a: ndarray, b: ndarray):
assert a.ndim == b.ndim == 4;
m = a.shape[0];
h, w, fh, fw, ph, pw, sh, sw = this.h, this.w, this.fh, this.fw, this.ph, this.pw, this.sh, this.sw;
a = np.pad(a, ((0, 0), (ph, ph), (pw, pw), (0, 0)));
windows = ConvolutiveParametersLayer.get_windows_for_a_fully_featured_convolution(a, (fh, fw), (sh, sw));
if b.shape[0] == m: # b is da_out
return np.einsum('mhwfgc,mhwn->mfgcn', windows, b, optimize=True);
# return np.sum(windows[:, :, :, :, :, :, None] * b[:, :, :, None, None, None, :], axis=(1, 2));
else:
return np.einsum('mhwfgc,fgcn->mhwn', windows, b, optimize=True);
# return np.sum(windows[:, :, :, :, :, :, None] * b[None, None, None, :, :, :, :], axis=(3, 4, 5));
    # Not really an inverse in the mathematical sense, despite the name.
def convolve_inverse(this, o: ndarray, b: ndarray):
assert o.ndim == b.ndim == 4;
m = o.shape[0];
h, w, c, fh, fw, ph, pw, sh, sw, oh, ow = this.h, this.w, this.c, this.fh, this.fw, this.ph, this.pw, this.sh, this.sw, this.oh, this.ow;
a = np.ones((m, ph + h + ph, pw + w + pw, c));
windows = ConvolutiveParametersLayer.get_windows_for_a_fully_featured_convolution(a, (oh, ow), (sh, sw));
windows += np.einsum('mhwfgc,mfgn,hwcn->mhwfgc', windows, o, b, optimize=True) - 1;
# windows += (windows[:, :, :, :, :, :, None] *
# o[:, None, None, :, :, None, :] *
# b[None, :, :, None, None, :, :]) - 1;
return a;
def predict(this, a_in: ndarray) -> ndarray:
assert a_in.shape[1:] == this.s_in, a_in.shape;
return this.convolve(a_in, this.f);
def forward(this, a_in: ndarray) -> ndarray:
this.a_in = a_in;
return this.convolve(a_in, this.f);
def un_predict(this, da_out: ndarray) -> ndarray:
assert da_out.shape[1:] == this.s_out;
return this.convolve_inverse(da_out, this.f);
def backward(this, da_out: ndarray) -> ndarray:
df = np.mean(this.convolve(this.a_in, da_out), axis=0) / this.filter_weight;
this.f -= this.lr * df;
return this.un_predict(da_out);
def get_parameters(this) -> list[ndarray]:
return [this.f];
def set_parameters(this, parameters: list[ndarray]):
assert this.f.shape == parameters[0].shape;
this.f = parameters[0];
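# A minimal shape check, for illustration; the demo function name, layer size and
# filter configuration are arbitrary.
def _demo_convolutive_layer_shapes():
    layer = ConvolutiveParametersLayer((6, 6, 3), lr=0.01, filter_shape=(3, 3), number_of_filters=2);
    assert layer.s_out == (4, 4, 2);  # "valid" convolution: 6 - 3 + 1 = 4 per spatial axis
    a_in = np.random.random((5, 6, 6, 3));
    assert layer.predict(a_in).shape == (5, 4, 4, 2);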
class DeprecatedMaxPoolingLayer(Layer):
def __init__(this, s_in: Dims, filter_size: Dims2D, lr: float = 0):
s_in = Layer.simplify_dims(s_in);
assert np.sum(np.mod(s_in, (filter_size + (1,)))) == 0;
super().__init__(s_in, tuple(np.divide(s_in, (filter_size + (1,))).astype(int)), lr)
this.filter_size = filter_size;
def predict(this, a_in: ndarray) -> ndarray:
assert a_in.shape[1:] == this.s_in;
windows = get_windows_for_pooling(a_in, this.filter_size);
return np.max(windows, axis=(3, 4));
def forward(this, a_in: ndarray) -> ndarray:
assert a_in.shape[1:] == this.s_in;
windows = get_windows_for_pooling(a_in, this.filter_size);
a_out = np.max(windows, axis=(3, 4), keepdims=True);
this.indices = windows == a_out;
# this.indices = np.where(windows == a_out, 1, 0);
return a_out.squeeze((3, 4));
def un_predict(this, da_out: ndarray) -> ndarray:
da_in = np.zeros((da_out.shape[0],) + this.s_in);
da_in[:, 0:this.s_in[0]:this.filter_size[0], 0:this.s_in[1]:this.filter_size[1]] = da_out;
da_in[:, 1:this.s_in[0]:this.filter_size[0], 0:this.s_in[1]:this.filter_size[1]] = da_out;
da_in[:, 0:this.s_in[0]:this.filter_size[0], 1:this.s_in[1]:this.filter_size[1]] = da_out;
da_in[:, 1:this.s_in[0]:this.filter_size[0], 1:this.s_in[1]:this.filter_size[1]] = da_out;
return da_in;
def backward(this, da_out: ndarray) -> ndarray:
da_in = np.zeros((da_out.shape[0],) + this.s_in);
m, oh, ow, fh, fw, c = np.where(this.indices == 1);
# m, oh, ow, fh, fw, c = this.indices;
rb, cb = oh * this.filter_size[0] + fh, ow * this.filter_size[1] + fw;
da_in[m, rb, cb, c] = da_out[m, oh, ow, c].flatten();
return da_in;
# ... Please make separate layers for padding and striding ...?
class ValueUpdaters:
def __init__(this, alpha: float, beta: float):
this.alpha = alpha;
this.beta = beta;
def update(this, v_old: ndarray, dv_incoming: ndarray) -> ndarray:
raise NotImplementedError();
class ValueWithBiasFixing(ValueUpdaters):
def __init__(this, beta: float):
super().__init__(0, beta)
this.time = 1;
def update(this, v_old, dv_incoming):
v_new = v_old / (1 - np.power(this.beta, this.time));
this.time += 1;
return v_new;
class ValueWithMomentum(ValueUpdaters):
def __init__(this, alpha: float, beta: float = 0.9):
super().__init__(alpha, beta)
this.vdv = 0;
def calculatVDV(this, v_old, dv_incoming):
return this.beta * this.vdv + (1 - this.beta) * dv_incoming;
def update(this, v_old, dv_incoming):
this.vdv = this.calculatVDV(v_old, dv_incoming);
v_new = v_old - this.alpha * this.vdv;
return v_new;
class BiasFixedValueWithMomentum(ValueWithMomentum):
def __init__(this, alpha: float, beta: float = 0.9):
super().__init__(alpha, beta);
this.fixer = ValueWithBiasFixing(beta);
def calculatVDV(this, v_old, dv_incoming):
return this.fixer.update(super().calculatVDV(v_old, dv_incoming), None);
class ValueWithRMSProp(ValueUpdaters):
def __init__(this, alpha: float, beta: float = 0.999, epsilon=1e-8):
super().__init__(alpha, beta)
this.epsilon = epsilon;
this.sdv = 0;
def calculatSDV(this, v_old, dv_incoming):
return this.beta * this.sdv + (1 - this.beta) * (dv_incoming * dv_incoming);
def update(this, v_old, dv_incoming):
this.sdv = this.calculatSDV(v_old, dv_incoming);
v_new = v_old - this.alpha * dv_incoming / np.sqrt(this.sdv + this.epsilon)
return v_new;
class BiasFixedValueWithRMSProp(ValueWithRMSProp):
def __init__(this, alpha: float, beta: float = 0.999, epsilon=1e-8):
super().__init__(alpha, beta, epsilon);
this.fixer = ValueWithBiasFixing(beta);
def calculatSDV(this, v_old, dv_incoming):
return this.fixer.update(super().calculatSDV(v_old, dv_incoming), None);
class ValueWithAdam(ValueUpdaters):
def __init__(this, alpha: float, beta1: float = 0.9, beta2: float = 0.999, epsilon: float = 1e-8):
super().__init__(alpha, beta1);
this.epsilon = epsilon;
this.momentum = BiasFixedValueWithMomentum(alpha, beta1);
this.rmsprop = BiasFixedValueWithRMSProp(alpha, beta2, epsilon);
def update(this, v_old: ndarray, dv_incoming: ndarray) -> ndarray:
vdv = this.momentum.calculatVDV(v_old, dv_incoming);
sdv = this.rmsprop.calculatSDV(v_old, dv_incoming);
        return v_old - this.alpha * vdv / np.sqrt(np.abs(sdv) + this.epsilon);
def set_alpha(this, new_alpha: float):
this.alpha = new_alpha;
this.momentum.alpha = new_alpha;
this.rmsprop.alpha = new_alpha;
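# A minimal sketch, for illustration, of how a ValueUpdaters subclass is driven: the
# same updater instance is reused so its internal state (e.g. the bias-correction step
# counter) persists across steps. The demo function name, alpha and toy gradient are arbitrary.
def _demo_value_with_adam():
    updater = ValueWithAdam(alpha=0.1);
    w = np.array([[1.0, -1.0]]);
    for _ in range(3):
        dw = 2 * w;  # gradient of the toy objective sum(w ** 2)
        w = updater.update(w, dw);
    return w;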
class LayerModulator(Layer):
def __init__(this, layer: Layer):
super().__init__(layer.s_in, layer.s_out, layer.lr);
this.layer = layer;
def predict(this, a_in: ndarray) -> ndarray:
return this.process(this.layer.predict(a_in));
def forward(this, a_in: ndarray) -> ndarray:
return this.process(this.layer.forward(a_in));
def un_predict(this, da_out: ndarray) -> ndarray:
return this.deprocess(this.layer.un_predict(da_out));
def backward(this, da_out: ndarray) -> ndarray:
return this.deprocess(this.layer.backward(da_out));
def get_parameters(this) -> list[ndarray]:
return this.layer.get_parameters();
def set_parameters(this, parameters: list[ndarray]):
this.layer.set_parameters(parameters);
def set_lr(this, lr):
this.layer.set_lr(lr);
def process(this, result: ndarray) -> ndarray:
raise NotImplementedError();
def deprocess(this, result: ndarray) -> ndarray:
raise NotImplementedError();
class NormalizeLayer(LayerModulator):
def process(this, result: ndarray) -> ndarray:
this.norm = np.linalg.norm(result);
return result / this.norm;
def deprocess(this, result: ndarray) -> ndarray:
return result * this.norm;
class ClipLayer(LayerModulator):
def process(this, result: ndarray) -> ndarray:
return np.clip(result, -1.57, 1.57);
def deprocess(this, result: ndarray) -> ndarray:
return np.clip(result, -1.57, 1.57);
class LayerChain(Layer):
def __init__(this, layers: list[Layer], lr: float):
super().__init__(layers[0].s_in, layers[-1].s_out, lr);
this.layers = layers;
def get_parameters(this) -> list[ndarray]:
parameters = [];
for layer in this.layers:
parameters += layer.get_parameters();
return parameters;
def set_parameters(this, parameters_list: list[ndarray]):
for layer in reversed(this.layers):
parameters = [];
for i in range(len(layer.get_parameters())):
parameters.insert(0, parameters_list.pop());
layer.set_parameters(parameters);
assert len(parameters_list) == 0;
def set_lr(this, lr):
super().set_lr(lr)
for layer in this.layers:
layer.set_lr(lr);
def predict(this, a_in: ndarray) -> ndarray:
for layer in this.layers:
a_in = layer.predict(a_in);
return a_in;
def forward(this, a_in: ndarray) -> ndarray:
for layer in this.layers:
a_in = layer.forward(a_in);
return a_in;
def un_predict(this, da_out: ndarray) -> ndarray:
for layer in reversed(this.layers):
da_out = layer.un_predict(da_out);
return da_out;
def backward(this, da_out: ndarray) -> ndarray:
for layer in reversed(this.layers):
da_out = layer.backward(da_out);
return da_out;
class MultiLayeredModel(LayerChain):
def __init__(this, sequence: Union[list[int]], lr: float, layer_supplier: Callable[[int, int, float], Layer]):
layers = [];
for i in range(1, len(sequence)):
layers.append(layer_supplier(sequence[i - 1], sequence[i], lr));
            layers.append(LeakyReluLayer(sequence[i], sequence[i], 0.01));
layers.pop() # remove last relu
layers.append(SigmoidLayer(sequence[-1], sequence[-1], lr));
super().__init__(layers, lr);
class Perceptron(LayerChain):
def __init__(this, s_in: Dims, s_out: int, lr: float):
this.weights = MultiplicativeParametersLayer(s_in, s_out, lr);
this.bias = AdditiveParametersLayer(s_out, lr);
super().__init__([this.weights, this.bias], lr);
this.w = this.weights.w;
this.b = this.bias.b;
def forward(this, a_in: ndarray) -> ndarray:
a_out = super().forward(a_in);
this.a_in = this.weights.a_in;
return a_out;
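# A minimal sketch, for illustration: one gradient step on a Perceptron using MSELoss
# (defined further below; the name is only resolved when the demo is called). The demo
# function name, sizes and learning rate are arbitrary.
def _demo_perceptron_step():
    np.random.seed(0);
    layer = Perceptron(3, 2, lr=0.1);
    x_demo = np.random.random((8, 3));
    y_demo = np.random.random((8, 2));
    loss = MSELoss();
    yp = layer.forward(x_demo);
    cost_before = loss.cost(y_demo, yp)[0];
    layer.backward(loss.dcost(y_demo, yp)[2]);  # updates the chained weight and bias layers
    cost_after = loss.cost(y_demo, layer.predict(x_demo))[0];
    return cost_before, cost_after;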
class ADAMLayer(Perceptron):
def __init__(this, s_in: int, s_out: int, lr: float):
super().__init__(s_in, s_out, lr)
this.dw_fixer = ValueWithAdam(lr);
this.db_fixer = ValueWithAdam(lr);
    def backward(this, da_out: ndarray) -> ndarray:
        assert da_out.shape[1:] == this.s_out;
        dw = (this.a_in.T @ da_out) / this.a_in.shape[0];
        this.w = this.weights.w = this.dw_fixer.update(this.weights.w, dw);
        da_in = da_out @ this.weights.w.T;
        db = np.mean(da_out, axis=0, keepdims=True);
        this.b = this.bias.b = this.db_fixer.update(this.bias.b, db);
        return da_in;
    def set_lr(this, lr):
        super().set_lr(lr);
        this.dw_fixer.set_alpha(lr);
        this.db_fixer.set_alpha(lr);
class MonoicBiModularComparatorModel(LayerChain):
def __init__(this, input: Layer, output_0: Layer, output_1: Layer, lr: float):
assert output_0.s_in == output_1.s_in == input.s_out and output_0.s_out == output_1.s_out;
super().__init__([input, output_0, output_1], lr);
this.input = input;
this.output_0 = output_0;
this.output_1 = output_1;
this.stop = 0.5;
        this.desc = np.random.random(this.s_in + (1,));
this.connection = MultiplicativeParametersLayer(this.input.s_out, 1, lr);
this.connection.w = np.ones((this.connection.s_in, this.connection.s_out)) / this.connection.s_in;
this.activation = SigmoidLayer(1, 1, lr);
def predict(this, a_in: ndarray) -> ndarray:
a_in = this.input.forward(a_in);
desc = this.connection.forward(a_in);
desc = this.activation.forward(desc).reshape((-1,));
stop = np.mean(desc);
        a_out = np.zeros((a_in.shape[0],) + this.output_0.s_out);
a_out[desc < stop] = this.output_0.forward(a_in[desc < stop]);
a_out[desc >= stop] = this.output_1.forward(a_in[desc >= stop]);
return a_out;
def forward(this, a_in: ndarray) -> ndarray:
a_in = this.input.forward(a_in);
this.desc = this.connection.forward(a_in);
this.desc = this.activation.forward(this.desc).reshape((-1,));
this.stop = np.mean(this.desc);
        a_out = np.zeros((a_in.shape[0],) + this.output_0.s_out);
a_out[this.desc < this.stop] = this.output_0.forward(a_in[this.desc < this.stop]);
a_out[this.desc >= this.stop] = this.output_1.forward(a_in[this.desc >= this.stop]);
return a_out;
# TODO: Implement un_predict
def backward(this, da_out: ndarray) -> ndarray:
        da_in = np.zeros((da_out.shape[0],) + this.output_0.s_in);
da_in[this.desc < this.stop] = this.output_0.backward(da_out[this.desc < this.stop]);
da_in[this.desc >= this.stop] = this.output_1.backward(da_out[this.desc >= this.stop]);
return this.input.backward(da_in);
def create_monitoring_callback(this):
class MBMCMMonitoringCallback(Callback):
def training_start(this_child):
this_child.p = [0, 0, 0, 0];
def consume_cost_features(this_child, cost_s: float, observer_cost_s: float, model: Layer):
avg = np.mean(this.desc);
jdx = int(avg * len(this_child.p));
this_child.p[jdx] += 1;
def training_end(this_child, model: Layer):
print("Parameters studied", this_child.p);
return MBMCMMonitoringCallback();
class LayerGrabber:
def __init__(this):
this.layer = None;
def grab(this, layer: Layer) -> Layer:
this.layer = layer;
return layer;
# Merge (i.e. sum) the received a_in and its processed
# form computed using some previous layer.
class BackDependenceLayer(Layer):
def __init__(this, prev: Layer):
if isinstance(prev, Perceptron):
this.prev = Perceptron(prev.s_in, prev.s_out, prev.lr);
this.prev.weights = prev.weights;
this.prev.bias = prev.bias;
this.prev.w = prev.w;
this.prev.b = prev.b;
super().__init__(this.prev.s_in, this.prev.s_in, 0)
def predict(this, a_in: ndarray) -> ndarray:
return a_in + this.prev.predict(a_in);
def forward(this, a_in: ndarray) -> ndarray:
return a_in + this.prev.forward(a_in);
def un_predict(this, da_out: ndarray) -> ndarray:
return da_out + this.prev.un_predict(da_out);
def backward(this, da_out: ndarray) -> ndarray:
return da_out + this.prev.backward(da_out);
class ResNetHelper:
def __init__(this):
this.layer = None;
def grab(this, layer: Layer) -> Layer:
this.layer = layer;
return layer;
class ResidualNetworkLayer(Layer):
def __init__(this, layer: Layer):
super().__init__(layer.s_out, layer.s_out, 0)
this.original_predict = layer.predict;
this.original_forward = layer.forward;
def hijacked_predict(a_in: ndarray) -> ndarray:
this.a_out = this.original_predict(a_in);
return this.a_out;
def hijacked_forward(a_in: ndarray) -> ndarray:
this.a_out = this.original_forward(a_in);
return this.a_out;
layer.predict = hijacked_predict;
layer.forward = hijacked_forward;
this.hijacked_layer = layer;
def predict(this, a_in: ndarray) -> ndarray:
# a_out = a_in + h_a_in
return a_in + this.a_out;
def un_predict(this, da_out: ndarray) -> ndarray:
this.hijacked_layer.un_predict(da_out);
return da_out;
def backward(this, da_out: ndarray) -> ndarray:
# da_in = da_out * 1
# dh_a_in = da_out * 1
this.hijacked_layer.backward(da_out);
return da_out;
class Flatten(Layer):
def __init__(this, s_in: Dims, s_out: int = -1, lr: float = 0):
s_in = Layer.simplify_dims(s_in);
if s_out == -1:
s_out = np.prod(s_in);
super().__init__(s_in, s_out, lr)
def predict(this, a_in: ndarray) -> ndarray:
return a_in.reshape((a_in.shape[0], -1));
def un_predict(this, da_out: ndarray) -> ndarray:
return da_out.reshape(da_out.shape[0:1] + this.s_in);
class ActivationLayer(Layer):
def __init__(this, s_in: Dims, s_out: Dims = None, lr: float = 0):
if s_out is None: s_out = s_in;
super().__init__(s_in, s_out, lr)
def predict(this, a_in: ndarray) -> ndarray:
return this.activate(a_in);
def forward(this, a_in: ndarray) -> ndarray:
this.a_in = a_in;
return this.activate(a_in);
def activate(this, a_in: ndarray) -> ndarray:
raise NotImplementedError();
def un_predict(this, da_out: ndarray) -> ndarray:
        # TODO: We don't have any a_in here to multiply -_-
return this.deactivate(da_out);
def backward(this, da_out: ndarray) -> ndarray:
return this.deactivate(this.a_in) * da_out;
def deactivate(this, a_in: ndarray) -> ndarray:
raise NotImplementedError();
class SigmoidLayer(ActivationLayer):
def activate(this, a_in: ndarray) -> ndarray:
return 1 / (1 + np.exp(-a_in));
def deactivate(this, da_out: ndarray) -> ndarray:
# return predict_prime(this.a_in) * da_out
v = np.exp(-da_out);
opv = 1 + v;
return v / (opv * opv);
class TanHLayer(ActivationLayer):
def activate(this, a_in: ndarray) -> ndarray:
return np.tanh(a_in);
def deactivate(this, da_out: ndarray) -> ndarray:
return 1 - np.power(np.tanh(da_out), 2);
class ReluLayer(ActivationLayer):
def activate(this, a_in: ndarray) -> ndarray:
return a_in * (a_in > 0);
def deactivate(this, da_out: ndarray) -> ndarray:
return da_out > 0;
class DivideLayer(ActivationLayer):
def __init__(this, s_in: Dims = -1, s_out: Dims = -1, lr: float = 0, factor=0.5):
super().__init__(s_in, s_out, lr)
        this.factor = factor;
def activate(this, a_in: ndarray) -> ndarray:
a_in[a_in < this.factor] = 0;
a_in[a_in >= this.factor] = 1;
return a_in;
def deactivate(this, da_out: ndarray) -> ndarray:
return np.zeros(da_out.shape);
class LeakyReluLayer(ActivationLayer):
def activate(this, a_in: ndarray) -> ndarray:
return a_in * (a_in > 0) + this.lr * a_in * (a_in < 0);
def deactivate(this, da_out: ndarray) -> ndarray:
return (da_out > 0) + this.lr * (da_out < 0);
class RecurrentLayer(Layer):
    # Expects an input of shape (m, n, o), where each sample is a sequence of length n,
    # one-hot encoded over o classes into an (n, o) matrix.
    # For example, if the data is letters in words, o is 26; for a word like "Happy", n is 5.
# In my implementation, input_height refers to o
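    # e.g. a batch of 3 such five-letter words would arrive as an array of shape (3, 5, 26).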
ActivationSupplier = Callable[[int], 'ActivationLayer'];
def __init__(this, s_in: Dims, lr: float, output_height: int, hidden_height: int,
hidden_activation: ActivationSupplier = None, output_activation: ActivationSupplier = None,
clip_value: tuple[float, float] = (0, 0)):
s_in = Layer.simplify_dims(s_in);
assert len(s_in) == 2, s_in;
# s_in = (-1, s_in[1]);
this.input_height = s_in[1];
this.hidden_height = hidden_height;
this.output_height = output_height;
factor = np.sqrt(hidden_height);
this.way = np.random.standard_normal((hidden_height, output_height)) / factor;
this.by = np.zeros((1, output_height));
this.wxa = np.random.standard_normal((this.input_height, hidden_height)) / factor;
this.waa = np.random.standard_normal((hidden_height, hidden_height)) / factor;
this.ba = np.zeros((1, hidden_height));
super().__init__(s_in, (s_in[0], output_height), lr);
this.hidden_activation_supplier = hidden_activation if hidden_activation is not None else SigmoidLayer;
this.output_activation_supplier = output_activation if output_activation is not None else SigmoidLayer;
        this.hidden_activations = [this.hidden_activation_supplier(hidden_height)];
        this.output_activations = [this.output_activation_supplier(output_height)];
this.list_of_hidden_activations = [];
this.clip_value = clip_value;
def predict(this, a_in: ndarray) -> ndarray:
raise NotImplementedError()
def forward(this, a_in: ndarray) -> ndarray:
assert a_in.ndim == 3, a_in.shape;
m, n, o = a_in.shape;
this.a_in = a_in;
hidden = np.zeros((m, this.hidden_height));
this.hidden_collection = np.zeros((n + 1, m, this.hidden_height));
# this.hidden_collection[0] = hidden;
this.output = np.zeros((m, n, this.output_height));
while len(this.hidden_activations) < n:
            this.hidden_activations.append(this.hidden_activation_supplier(this.hidden_height));
            this.output_activations.append(this.output_activation_supplier(this.output_height));
for i in range(n):
g1 = a_in[:, i] @ this.wxa + hidden @ this.waa + this.ba;
hidden = this.hidden_activations[i].forward(g1);
this.hidden_collection[i + 1] = hidden;
g2 = hidden @ this.way + this.by;
output = this.output_activations[i].forward(g2);
this.output[:, i] = output;
return this.output;
def un_predict(this, da_out: ndarray) -> ndarray:
raise NotImplementedError()
def backward(this, da_out: ndarray) -> ndarray:
m, n, p = this.a_in.shape;
dhidden_next = np.zeros_like(this.hidden_collection[0]);
dwxa = np.zeros_like(this.wxa);
dwaa = np.zeros_like(this.waa);
dba = np.zeros_like(this.ba);
dway = np.zeros_like(this.way);
dby = np.zeros_like(this.by);
for i in reversed(range(n)):
dg2 = this.output_activations[i].backward(da_out[:, i]);
hidden = this.hidden_collection[i + 1];
hidden_prev = this.hidden_collection[i];
dway += hidden.T @ dg2 / m;
dby += np.mean(dg2, axis=0, keepdims=True);
dhidden = dg2 @ this.way.T + dhidden_next;
dg1 = this.hidden_activations[i].backward(dhidden);
dba += np.mean(dg1, axis=0, keepdims=True);
dwxa += this.a_in[:, i].T @ dg1 / m;
dwaa += hidden_prev.T @ dg1 / m;
if this.clip_value != (0,0):
for gradient in [dwxa, dwaa, dway, dba, dby]:
np.clip(gradient, -this.clip_value[0], this.clip_value[1], out=gradient);
this.wxa = this.wxa - this.lr * dwxa;
this.waa = this.waa - this.lr * dwaa;
this.ba = this.ba - this.lr * dba;
this.way = this.way - this.lr * dway;
this.by = this.by - this.lr * dby;
# return
def get_parameters(this) -> list[ndarray]:
return [this.wxa, this.waa, this.ba, this.way, this.by];
def set_parameters(this, parameters: list[ndarray]):
this.wxa, this.waa, this.ba, this.way, this.by = \
Layer.set_parameters_helper(parameters, this.wxa, this.waa, this.ba, this.way, this.by);
def are_input_dims_compliant(this, x: ndarray) -> bool:
return x.shape[2:] == this.s_in[1:];
def are_output_dims_compliant(this, y: ndarray) -> bool:
return y.shape[2:] == this.s_out[1:];
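# A minimal shape check, for illustration; the demo function name, sequence length,
# vocabulary size and hidden size are arbitrary.
def _demo_recurrent_layer_shapes():
    rnn = RecurrentLayer((5, 26), lr=0.01, output_height=26, hidden_height=16);
    a_in = np.zeros((3, 5, 26));  # (m, n, o): 3 samples, 5 time steps, 26 one-hot classes
    assert rnn.forward(a_in).shape == (3, 5, 26);  # one output vector per time step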
class Loss:
def __init__(this):
pass;
# L = loss(y, y`)
def loss(this, y: ndarray, yp: ndarray) -> ndarray:
raise NotImplementedError();
# dL/dy = dloss()
def dloss(this, y: ndarray, yp: ndarray) -> ndarray:
raise NotImplementedError();
def cost(this, y: ndarray, yp: ndarray) -> Tuple[float, ndarray, ndarray]:
l = this.loss(y, yp);
c = np.mean(l, axis=0, keepdims=True);
return (np.mean(c, keepdims=True)[0][0], c, l);
def dcost(this, y: ndarray, yp: ndarray) -> Tuple[float, ndarray, ndarray]:
l = this.dloss(y, yp);
c = np.mean(l, axis=0, keepdims=True);
return (np.mean(c, keepdims=True)[0][0], c, l);
# Mean Absolute Error
class MAELoss(Loss):
def loss(this, y: ndarray, yp: ndarray) -> ndarray:
return np.abs(y - yp);
def dloss(this, y: ndarray, yp: ndarray) -> ndarray:
return - np.sign(y - yp);
# Mean Squared Error
class MSELoss(Loss):
def loss(this, y: ndarray, yp: ndarray) -> ndarray:
d = y - yp;
return d * d;
def dloss(this, y: ndarray, yp: ndarray) -> ndarray:
return - 2 * (y - yp);
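# A minimal sketch, for illustration: cost() reduces the element-wise loss to
# (scalar mean, per-feature mean, per-element loss). The demo function name and values are arbitrary.
def _demo_mse_cost():
    y = np.array([[0.0, 1.0]]);
    yp = np.array([[0.5, 0.5]]);
    cost_s, cost_v, loss_v = MSELoss().cost(y, yp);
    assert loss_v.shape == (1, 2) and cost_v.shape == (1, 2);
    assert abs(cost_s - 0.25) < 1e-12;  # mean of [0.25, 0.25]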
# Binary Cross Entropy
class BCELoss(Loss):
def loss(this, y: ndarray, yp: ndarray) -> ndarray:
return - (y * np.log(yp) + (1 - y) * np.log(1 - yp));
def dloss(this, y: ndarray, yp: ndarray) -> ndarray:
return - (y / yp - (1 - y) / (1 - yp));
# Mean Squared Logarithmic Error
class MSLELoss(Loss):
def loss(this, y: ndarray, yp: ndarray) -> ndarray:
return np.power(np.log(y + 1) - np.log(yp + 1), 2);
def dloss(this, y: ndarray, yp: ndarray) -> ndarray:
return -2 * (np.log(1 + y) - np.log(1 + yp)) / (1 + yp);
# 👀
class SimpleLoss(Loss):
def loss(this, y: ndarray, yp: ndarray) -> ndarray:
return (y != (yp > 0.5)).astype(float);
def dloss(this, y: ndarray, yp: ndarray) -> ndarray:
return np.ones(y.shape);
# Entropy
class EntropyLoss(Loss):
def __init__(this):
super().__init__()
this.helper = BCELoss();
    def loss(this, y: ndarray, yp: ndarray) -> ndarray:
        # Treat saturated predictions (yp <= 0 or yp >= 1) as zero loss to avoid log(0).
        valid = (yp > 0) & (yp < 1);
        result = np.zeros_like(yp);
        result[valid] = this.helper.loss(y[valid], yp[valid]);
        return result;
    def dloss(this, y: ndarray, yp: ndarray) -> ndarray:
        valid = (yp > 0) & (yp < 1);
        result = np.zeros_like(yp);
        result[valid] = this.helper.dloss(y[valid], yp[valid]);
        return result;
# @jit(nopython=True)
class ConfusionMatrix:
def __init__(this, y: ndarray, yp: ndarray):
assert y.shape == yp.shape and len(y.shape) == 2;
this.m, this.c = y.shape[0], y.shape[1];
y, yp = y.argmax(axis=1), yp.argmax(axis=1);
this.cm = np.zeros((this.c, this.c), int);
for i in range(this.m):
this.cm[y[i], yp[i]] += 1;
        this.actl_sums = np.sum(this.cm, axis=1);  # samples actually belonging to each class
        this.pred_sums = np.sum(this.cm, axis=0);  # samples predicted as each class
this.total = this.m;
this.true_values = np.diagonal(this.cm);
this.trues = np.sum(this.true_values);
# this.fn = np.sum(np.triu(this.cm, 1));
# this.fp = np.sum(np.tril(this.cm, -1));
print("Accuracy =", this.trues / this.total * 100);
def print_matrix(this):
row_width = 2 + len("%d" % this.total);
print("Predicted ", end="");
print(*["%%-%dd" % row_width % x for x in range(this.c)]);
print("Actual");
for i in range(this.c):
print(" %2d " % i, *["%%-%dd" % row_width % x for x in this.cm[i]], " %5d" % this.pred_sums[i]);
print(" ", *["%%-%dd" % row_width % x for x in this.actl_sums], " %5d" % this.total);
class Callback:
def __init__(this):
pass;
def training_start(this):
pass;
def consume_cost_features(this, cost_s: float, observer_cost_s: float, model: Layer):
pass;
def training_end(this, model: Layer):
pass;
def is_training_breakable(this, cost_s: float, observer_cost_s: float, model: Layer) -> bool:
return False;
def reset(this):
pass;
class CostRecorder(Callback):
def __init__(this):
super().__init__();
this.costs = [];
this.observer_costs = [];
this.current_costs = [];
this.current_observer_costs = [];
def training_start(this):
this.current_costs = [];
this.current_observer_costs = [];
def consume_cost_features(this, cost_s: float, observer_cost_s, model: Layer):
this.costs.append(cost_s);
this.observer_costs.append(observer_cost_s);
this.current_costs.append(cost_s);
this.current_observer_costs.append(observer_cost_s);
def reset(this):
this.costs.clear();
this.observer_costs.clear();
this.current_costs.clear();
this.current_observer_costs.clear();
class PlotScalarCost(CostRecorder):
def __init__(this, print_stats_to_terminal: bool = True, register_training_end_callback:bool =True):
super().__init__();
this.plotLines = [];
this.print_stats_to_terminal = print_stats_to_terminal;
this.register_training_end_callback = register_training_end_callback;
def reset(this):
super().reset();
this.plotLines = [];
def training_end(this, model: Layer):
if this.register_training_end_callback:
this.__training_end__(model);
def __training_end__(this, model: Layer):
if len(this.current_costs) > 0:
if this.print_stats_to_terminal:
print("Costs rec'ed", len(this.current_costs));
print("Minimum Cost", min(this.current_costs));
print("Last Cost ", this.current_costs[-1]);
plt.subplot(2, 1, 1);
plt.title("Current Cost")
plt.plot(this.costs);
plt.subplot(2, 1, 2);
plt.title("Observing Cost")
plt.plot(this.observer_costs);
plt.show();
else:
print("No Cost Data To Plot...")
class PlotTestDataToo(CostRecorder):
def __init__(this, x_test: ndarray, y_test: ndarray, loss: Loss, observer_loss: Loss = None):
super().__init__();
this.testRecorder = CostRecorder();
this.x_test = x_test;
this.y_test = y_test;
this.loss = loss;
this.observer_loss = observer_loss;
this.plotLines = [];
def training_start(this):
super().training_start();
this.testRecorder.training_start();
def consume_cost_features(this, cost_s: float, observer_cost_s, model: Layer):
super().consume_cost_features(cost_s, observer_cost_s, model);
cost_s = this.loss.cost(this.y_test, model.predict(this.x_test))[0];
if this.observer_loss is not None:
observer_cost_s = this.observer_loss.cost(this.y_test, model.predict(this.x_test))[0];
else:
observer_cost_s = cost_s;
this.testRecorder.consume_cost_features(cost_s, observer_cost_s, model);
def reset(this):
super().reset();
this.testRecorder.reset();
this.plotLines = [];
def training_end(this, model: Layer):
if len(this.current_costs) > 0:
print("Costs rec'ed", len(this.current_costs));
print("Minimum Cost", min(this.current_costs));
print("Last Cost ", this.current_costs[-1]);
# figsize = (8, 11);
plt.subplot(3, 1, 1);
plt.title("Current Cost")
plt.plot(this.costs, color='blue');
for line in this.plotLines:
plt.axvline(line, color='r');
plt.subplot(3, 1, 2);
plt.title("Current Cost")
plt.plot(this.costs, color='blue');
plt.plot(this.testRecorder.costs, color='orange');
for line in this.plotLines:
plt.axvline(line, color='r');
plt.subplot(3, 1, 3);
plt.title("Observing Cost")
plt.plot(this.observer_costs, color='blue');
plt.plot(this.testRecorder.observer_costs, color='orange');
for line in this.plotLines:
plt.axvline(line, color='r');
plt.show();
else:
print("No Cost Data To Plot...")
class AutomaticDecayingLearningRate(Callback):
def __init__(this, costRecorder: CostRecorder, fractur: float):
super().__init__();
this.costRecorder = costRecorder;
assert fractur < 1;
this.fractur = fractur;
def reset(this):
super().reset()
this.costRecorder.reset();
def consume_cost_features(this, cost_s: float, observer_cost_s, model: Layer):
if len(this.costRecorder.costs) < 2:
return;
last_cost = this.costRecorder.costs[-2];
if last_cost < cost_s and abs(model.lr) > 1e-30:
model.set_lr(model.lr * this.fractur);
if isinstance(this.costRecorder, PlotScalarCost) or isinstance(this.costRecorder, PlotTestDataToo):
this.costRecorder.plotLines.append(len(this.costRecorder.costs) - 1);
# plt.axvline(len(this.costRecorder.costs), color='r');
class PreventIncreaseInCost(Callback):
def __init__(this, costRecorder: CostRecorder):
super().__init__();
this.costRecorder = costRecorder;
    def is_training_breakable(this, cost_s: float, observer_cost_s, model: Layer) -> bool:
        if len(this.costRecorder.costs) == 0:
            return False;
        last_cost = this.costRecorder.costs[-1];
        if last_cost == cost_s and len(this.costRecorder.costs) > 2:
            last_cost = this.costRecorder.costs[-2];
        return cost_s > last_cost;
def one_hot_encode(inp: ndarray, size: int = -1) -> ndarray:
assert len(inp.shape) <= 2;
inp = inp.astype(int);
if size == -1: size = np.max(inp) + 1;
res = np.zeros((inp.shape[0], size));
for i in range(inp.shape[0]):
res[i, inp[i]] = 1;
return res;
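# e.g. one_hot_encode(np.array([2, 0]), size=3) -> [[0, 0, 1], [1, 0, 0]]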
def one_hot_encode_list(inp: list[int], size: int = -1) -> list[list[int]]:
if size == -1:
size = np.max(inp) + 1;
res = [[0] * size for _ in range(len(inp))];
for i, v in enumerate(inp):
res[i][v] = 1;
return res;
# NOT INTENDED TO BE AN EXACT de_one_hot, just a bodge
def decode(c: int, i: int) -> ndarray:
a = np.zeros((1, c));
a[0][i] = 1;
return a;
def file_name(name: str) -> str:
try:
return name[name.rindex('\\') + 1:name.rindex('.')];
except:
return "debug_session";
def backprop_random_label(model: Layer):
label = np.zeros((np.product(model.s_out),));
i = np.random.randint(label.shape[0]);
print('using', i);
label[i] = 1;
label = label.reshape((1,) + model.s_out);
sample = model.un_predict(label)[0];
# if sample.ndim != 2:
# print("Terra-transforming", sample.shape, end=" ");
# sample = sample.reshape((sample.shape[0], -1));
# print("to", sample.shape);
plt.imshow(sample);
plt.title(i);
plt.show();
class Trainer:
def __init__(this, callbacks: list[Callback] = None):
        if callbacks is None:
callbacks = [];
this.callbacks = callbacks;
    def pick_a_nice_random_model(this, create_model: Callable[[], Layer], x: ndarray, y: ndarray,
                                 loss: Loss,
                                 bayes_error: float = 1,
                                 max_iterations: float = 100_000) -> Layer:
iterations = 0;
model_with_least_cost_so_far = model = create_model();
least_cost_so_far = cost = loss.cost(y, model.forward(x))[0];
while True:
            if cost < least_cost_so_far:
                least_cost_so_far = cost;
                model_with_least_cost_so_far = model;
iterations += 1;
if iterations % 100000 == 0:
print(iterations, "models tested so far.");
if cost > bayes_error and iterations < max_iterations:
model = create_model();
cost = loss.cost(y, model.forward(x))[0];
continue;
else:
break;
print(iterations, "models tested so far.");
return model_with_least_cost_so_far;
def pick_a_nice_random_model_2(this, create_model: Callable[[], Layer], x: ndarray, y: ndarray,
loss: Loss, observer_loss: Loss = None,
bayes_error: float = 1,
max_search_iterations: int = 1_000,
max_train_iterations: int = 1_000) -> Layer:
models = [];
for i in range(100):
            model = this.pick_a_nice_random_model(create_model, x, y, loss, bayes_error, max_search_iterations);
cost = loss.cost(y, model.forward(x))[0];
models.append([cost, model]);
def comparator(i1, i2):
i1, i2 = i1[0], i2[0];
return i1 - i2;
# models.sort(key=comparator);
callbacks = this.callbacks;
this.callbacks = [];
for item in models:
this.overfit(item[1], x, y, loss, observer_loss, bayes_error, max_train_iterations);
cost = loss.cost(y, item[1].forward(x))[0];
if cost > item[0]:
item[0] = -1;
continue;
else:
item[0] = cost;
print("Final Cost", item[0]);
this.callbacks = callbacks;
models = [item for item in models if item[0] != -1];
if len(models) <= 0:
print("lol");
exit(0);
return min(models, key=lambda i: i[0])[1];
def overfit(this, model: Layer, x: ndarray, y: ndarray,
loss: Loss, observer_loss: Loss = None,
bayes_error: float = 1, max_iterations: int = 10_000,
batch_offset=0,
print_stats_to_terminal:bool=False, notify_with_beep: bool = True
):
if model.are_input_dims_compliant(x):
x = x.reshape((1,) + x.shape);
if model.are_output_dims_compliant(y):
y = y.reshape((1,) + y.shape);
assert model.are_input_dims_compliant(x[0]);
assert model.are_output_dims_compliant(y[0]);
        if observer_loss is None:
observer_loss = loss;
total_batches = x.shape[0];
# batch_index = batch_offset % total_batches;
batch_index = 0;
iterations = batch_offset;
max_iterations += batch_offset;
epoch = 0;
for callback in this.callbacks:
callback.training_start();
yd = model.forward(x[batch_index]);
(cost_s, cost_v, loss_v) = loss.cost(y[batch_index], yd);
(observer_cost_s, _, _) = observer_loss.cost(y[batch_index], yd);
if print_stats_to_terminal:
print("Initial Cost", cost_s)
for callback in this.callbacks:
callback.consume_cost_features(cost_s, observer_cost_s, model);
if callback.is_training_breakable(cost_s, observer_cost_s, model):
print("Training stopped due to callback!")
break;
while cost_s > bayes_error and iterations < max_iterations:
model.backward(loss.dcost(y[batch_index], yd)[2]);
iterations += 1;
batch_index = iterations % total_batches;
if batch_index == 0: epoch += 1;
yd = model.forward(x[batch_index]);
(cost_s, cost_v, loss_v) = loss.cost(y[batch_index], yd);
(observer_cost_s, _, _) = observer_loss.cost(y[batch_index], yd);
if np.max([np.max(np.abs(parameter)) for parameter in model.get_parameters()]) > 1e6:
print("Heading towards explosion 👀...");
break;
for callback in this.callbacks:
callback.consume_cost_features(cost_s, observer_cost_s, model);
if callback.is_training_breakable(cost_s, observer_cost_s, model):
print("Training stopped due to callback!")
break;
else:
continue;
break;
for callback in this.callbacks:
callback.training_end(model);
if notify_with_beep:
beepy.beep("coin");
def start_observatory(this, model: Layer, x: ndarray, y: ndarray,
loss: Loss, observer_loss: Loss, create_model,
bayes_error: float = 1, max_iterations: int = 10_000,
custom_commands: dict[str, Callable[[Layer], None]] = None,
file='parameters'
) -> Layer:
# Configurables
# lr - model intrinsic
iter_per_loop = max_iterations;
batch_size = 64;
file = '%s.npz' % file;
# Machine States
iterations = 0;
x_batches = get_windows_for_mini_batch(x, size_of_batches=batch_size);
y_batches = get_windows_for_mini_batch(y, size_of_batches=batch_size);
parameters_backup = model.get_parameters();
while True:
try:
command = input("Observatory: Choose an action: ");
if command == 'h':
print("Valid commands so far are h, l, i, b, p, pn, q, qn, r, n, c, t, x");
elif command == 'l':
print("Set learning rate FACTOR (current lr is", model.lr, "): ");
new_lr = model.lr * float(input(">>> "));
model.set_lr(new_lr);
print("New lr is", new_lr);
elif command == 'i':
print("Set iterations per loop (currently", iter_per_loop, "): ");
iter_per_loop = int(input(">>> "));
elif command == 'b':
print("Set mini batch size (currently", batch_size, "): ");
batch_size = int(input(">>> "));
if batch_size == -1:
batch_size = x.shape[0];
x_batches = get_windows_for_mini_batch(x, size_of_batches=batch_size);
y_batches = get_windows_for_mini_batch(y, size_of_batches=batch_size);
elif command == 'p':
print("Saving Parameters to", file, "file");
np.savez(file, *model.get_parameters());
elif command == 'pn':
name = "%s.npz" % input("Please enter a name: ");
print("Saving Parameters to %s file." % name);
np.savez(name, *model.get_parameters());
elif command == 'q':
print("Loading Parameters from", file, "file");
parameters = np.load(file);
model.set_parameters([parameters[key] for key in parameters]);
elif command == 'qn':
name = "%s.npz" % input("Please enter a name: ");
print("Loading Parameters from %s file." % name);
parameters = np.load(name);
model.set_parameters([parameters[key] for key in parameters]);
elif command == 'r':
print("Resetting to last learnt parameters.");
model.set_parameters(parameters_backup);
# [cb.reset() for cb in this.callbacks]
elif command == 'n':
model = create_model();
# [cb.reset() for cb in this.callbacks]
elif command == 'c':
[cb.reset() for cb in this.callbacks]
elif command == 't':
parameters_backup = model.get_parameters();
this.overfit(model, x_batches, y_batches, loss, observer_loss, bayes_error=bayes_error,
max_iterations=iter_per_loop, batch_offset=iterations);
iterations += iter_per_loop;
elif command == 'x':
break;
                elif custom_commands is not None and command in custom_commands:
custom_commands[command](model);
except Exception as e:
print("You probably made a typo 👀...");
                traceback.print_exc();
continue;
return model;
def normalize_data(v: ndarray) -> ndarray:
v -= np.mean(v, axis=0, keepdims=True);
s = np.std(v, axis=0, keepdims=True);
s[s <= 0.01] = 1;
v /= s;
return v;
if __name__ == '__main__':
x = np.array([
[0, 0, 0, 0],
[0, 0, 0, 1],
[0, 0, 1, 0],
[0, 0, 1, 1],
[0, 1, 0, 0],
[0, 1, 0, 1],
[0, 1, 1, 0],
[0, 1, 1, 1],
[1, 0, 0, 0],
[1, 0, 0, 1]
], dtype=float);
y = np.array([
[1, 1, 1, 0, 1, 1, 0],
[0, 0, 1, 0, 0, 1, 0],
[1, 0, 1, 1, 1, 1, 0],
[1, 0, 1, 1, 0, 1, 1],
[0, 1, 1, 1, 0, 1, 0],
[1, 1, 0, 1, 0, 1, 1],
[1, 1, 0, 1, 1, 1, 1],
[1, 1, 0, 0, 1, 0, 0],
[1, 1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 0, 1, 0]
], dtype=float);
# x = np.array([[1, 1], [1, 0], [0, 1], [0, 0]], dtype=float);
# y = np.array([[1, 1], [0, 1], [0, 1], [0, 0]]);
x = normalize_data(x);
# loss = MAELoss();
# loss = MSELoss();
# loss = BCELoss();
loss = MSLELoss();
observer_loss = SimpleLoss();
def create_model():
model = MultiLayeredModel \
([x.shape[1], 4, y.shape[1]],
0.00001,
lambda s_in, s_out, lr: (Perceptron(s_in, s_out, lr)));
return model;
costRecorder = PlotScalarCost();
# costRecorder = PlotTestDataToo(np.array([[1, 1], [0, 1]]), np.array([[1, 1], [0, 1]]), loss, observer_loss);
trainer = Trainer([
costRecorder
# , AutomaticDecayingLearningRate(costRecorder, 0.5)
# , PreventIncreaseInCost(callback)
]);
model = create_model();
    # model = trainer.pick_a_nice_random_model(create_model, x, y, loss, bayes_error=0.3);
# model = trainer.pick_a_nice_random_model_2(create_model, x, y, loss, observer_loss, bayes_error=0.3);
def my_custom_action(m: Layer):
print(m.lr);
yp = np.where(m.forward(x) < 0.5, 0, 1);
print(yp);
yp = np.where(yp != y, 0, 1);
print(yp);
print(np.sum(1 - yp));
# trainer.overfit(model, x, y, loss, bayes_error=0, max_iterations=5000);
model = trainer.start_observatory(model, x, y, loss, observer_loss, create_model, bayes_error=0,
custom_commands={
'cm': lambda m: ConfusionMatrix(y, m.forward(x)).print_matrix(),
'mca': my_custom_action
}
);
my_custom_action(model);