Implementation of some common DL algorithms in Python.
from __future__ import annotations
import traceback
from numbers import Number
from typing import Union, Callable, Tuple
import beepy
import matplotlib.pyplot as plt
import numpy as np
from numpy import ndarray
DimsPrim = int;
Dims1D = Tuple[int];
Dims2D = Tuple[int, int];
Dims3D = Tuple[int, int, int];
Dims4D = Tuple[int, int, int, int];
Dims5D = Tuple[int, int, int, int, int];
DimsND = Union[Dims1D, Dims2D, Dims3D, Dims4D, Dims5D];
Dims = Union[DimsPrim, DimsND, 'Layer'];
def get_windows_for_pooling(A: ndarray, window_shape: Dims2D) -> ndarray: | |
assert len(A.shape) == 4, A.shape; | |
m, h, w, c = A.shape; | |
fh, fw = window_shape; | |
assert h % fh == 0 and w % fw == 0; | |
oh, ow = h // fh, w // fw; | |
assert oh > 0 and ow > 0; | |
mI, mJ, mK, mL = A.strides; | |
strides = (mI, fh * mJ, fw * mK, mJ, mK, mL); | |
windows = np.lib.stride_tricks.as_strided(A, (m, oh, ow, fh, fw, c), strides); | |
return windows; | |
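# A minimal usage sketch: pooling windows over a (2, 4, 4, 3) batch with a
# 2x2 window; reducing over the two window axes gives the pooled feature map.
def _demo_get_windows_for_pooling():
    A = np.arange(2 * 4 * 4 * 3, dtype=float).reshape((2, 4, 4, 3));
    windows = get_windows_for_pooling(A, (2, 2));
    assert windows.shape == (2, 2, 2, 2, 2, 3);
    pooled = np.max(windows, axis=(3, 4));  # the reduction max pooling performs
    assert pooled.shape == (2, 2, 2, 3);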
def get_windows_for_mini_batch(A: ndarray, number_of_batches=-1, size_of_batches=-1) -> ndarray:
    assert number_of_batches != -1 or size_of_batches != -1, "Please provide a value for at least one of them!";
    m = A.shape[0];
    shape = A.shape[1:];
    strides = A.strides;
    if number_of_batches == -1:
        # assert m % size_of_batches == 0;
        number_of_batches = m // size_of_batches;
    elif size_of_batches == -1:
        # assert m % number_of_batches == 0;
        size_of_batches = m // number_of_batches;
    # The new batch axis strides over whole batches of samples; A.strides[0]
    # already accounts for the item size and the per-sample shape.
    return np.lib.stride_tricks.as_strided(A, (number_of_batches, size_of_batches) + shape,
                                           (size_of_batches * A.strides[0],) + strides);
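# A minimal usage sketch: splitting 6 samples into 2 mini batches of 3; each
# batch is a strided view into A, so no data is copied.
def _demo_get_windows_for_mini_batch():
    A = np.arange(12, dtype=float).reshape((6, 2));
    batches = get_windows_for_mini_batch(A, size_of_batches=3);
    assert batches.shape == (2, 3, 2);
    assert np.array_equal(batches[1], A[3:6]);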
class Layer: | |
@staticmethod | |
def simplify_dims(value: Dims) -> DimsND: | |
if isinstance(value, Number): | |
value = (value,) | |
elif isinstance(value, Layer): | |
value = value.s_out; | |
return value; | |
@staticmethod | |
def set_parameters_helper(parameters, *args): | |
return args; | |
def __init__(this, s_in: Dims, s_out: Dims, lr: float): | |
this.s_in = Layer.simplify_dims(s_in); | |
this.s_out = Layer.simplify_dims(s_out); | |
this.lr = lr; | |
def predict(this, a_in: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
def forward(this, a_in: ndarray) -> ndarray: | |
return this.predict(a_in); | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
def backward(this, da_out: ndarray) -> ndarray: | |
return this.un_predict(da_out); | |
def get_parameters(this) -> list[ndarray]: | |
return []; | |
def set_parameters(this, parameters: list[ndarray]): | |
pass; | |
def set_lr(this, lr): | |
this.lr = lr; | |
def are_input_dims_compliant(this, x: ndarray) -> bool: | |
return x.shape[1:] == this.s_in; | |
def are_output_dims_compliant(this, y: ndarray) -> bool: | |
return y.shape[1:] == this.s_out; | |
# Just a dummy layer, so that I won't have to change layer[-1] with n every time.
class InputLayer(Layer): | |
def __init__(this, s_in: Dims): | |
super().__init__(s_in, s_in, 0) | |
def predict(this, a_in: ndarray) -> ndarray: | |
return a_in; | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
return da_out; | |
class AdditiveParametersLayer(Layer): | |
def __init__(this, s_in: Dims, lr: float): | |
super().__init__(s_in, s_in, lr); | |
this.b = np.random.random((1,) + this.s_out) - 0.5; | |
def predict(this, a_in: ndarray) -> ndarray: | |
assert a_in.shape[1:] == this.s_in; | |
return a_in + this.b; | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
assert da_out.shape[1:] == this.s_out; | |
return da_out; | |
def backward(this, da_out: ndarray) -> ndarray: | |
assert da_out.shape[1:] == this.s_out; | |
db = np.mean(da_out, axis=0, keepdims=True); | |
this.b -= this.lr * db; | |
return this.un_predict(da_out); | |
def get_parameters(this) -> list[ndarray]: | |
return [this.b]; | |
def set_parameters(this, parameters: list[ndarray]): | |
assert this.b.shape == parameters[0].shape; | |
this.b = parameters[0]; | |
class MultiplicativeParametersLayer(Layer): | |
LinearDims = Union[DimsPrim, Dims1D]; | |
def __init__(this, s_in: LinearDims, s_out: LinearDims, lr: float): | |
super().__init__(s_in, s_out, lr); | |
s_in = this.s_in[0]; | |
s_out = this.s_out[0]; | |
this.w = np.random.standard_normal((s_in, s_out)) * np.sqrt(2 / s_in); | |
def predict(this, a_in: ndarray) -> ndarray: | |
assert a_in.shape[1:] == this.s_in, a_in.shape; | |
return a_in @ this.w; | |
def forward(this, a_in: ndarray) -> ndarray: | |
this.a_in = a_in; | |
return this.predict(a_in) | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
assert da_out.shape[1:] == this.s_out; | |
da_in = da_out @ this.w.T; | |
return da_in; | |
def backward(this, da_out: ndarray) -> ndarray: | |
assert da_out.shape[1:] == this.s_out; | |
dw = (this.a_in.T @ da_out) / this.a_in.shape[0]; | |
this.w = this.w - this.lr * dw; | |
return this.un_predict(da_out); | |
def get_parameters(this) -> list[ndarray]: | |
return [this.w]; | |
def set_parameters(this, parameters: list[ndarray]): | |
assert this.w.shape == parameters[0].shape; | |
this.w = parameters[0]; | |
class SuperMultiplicativeParametersLayer(Layer): | |
def __init__(this, s_in: Dims, lr: float, weights_height: int): | |
super().__init__(s_in, weights_height, lr) | |
contract = "".join([chr(x + 97) for x in range(len(this.s_in))]); | |
assert 'y' not in contract; | |
# y stands for m or #samples, z stands for c or classes. | |
this.forward_operation = "y%s,%sz->yz" % (contract, contract); | |
this.un_predict_operation = "yz,z%s->y%s" % (contract[::-1], contract); | |
this.backward_operation = "%sy,yz->%sz" % (contract[::-1], contract); | |
        this.w = np.random.standard_normal(this.s_in + this.s_out) * np.sqrt(2 / np.prod(this.s_in));
def predict(this, a_in: ndarray) -> ndarray: | |
assert a_in.shape[1:] == this.s_in; | |
return np.einsum(this.forward_operation, a_in, this.w); | |
def forward(this, a_in: ndarray) -> ndarray: | |
this.a_in = a_in; | |
return this.predict(a_in) | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
assert da_out.shape[1:] == this.s_out; | |
da_in = np.einsum(this.un_predict_operation, da_out, this.w.T); | |
return da_in; | |
def backward(this, da_out: ndarray) -> ndarray: | |
assert da_out.shape[1:] == this.s_out; | |
dw = np.einsum(this.backward_operation, this.a_in.T, da_out) / this.a_in.shape[0]; | |
this.w = this.w - this.lr * dw; | |
return this.un_predict(da_out); | |
def get_parameters(this) -> list[ndarray]: | |
return [this.w]; | |
def set_parameters(this, parameters: list[ndarray]): | |
assert this.w.shape == parameters[0].shape; | |
this.w = parameters[0]; | |
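# A minimal usage sketch: a (4, 5) input contracted against a (4, 5, 3) weight
# tensor collapses each sample to 3 outputs via the generated einsum string.
def _demo_super_multiplicative():
    layer = SuperMultiplicativeParametersLayer((4, 5), lr=0.01, weights_height=3);
    a_in = np.random.random((2, 4, 5));
    assert layer.forward_operation == "yab,abz->yz";
    assert layer.predict(a_in).shape == (2, 3);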
class ConvolutiveParametersLayer(Layer): | |
# If forward is a(l) = w(l) (*) a(l-1) | |
# then backward goes like | |
# del(l-1) = Σ<k=1,n> del(l,k) (*)f Rot<180>(w(l)) * g'(z(l-1)) | |
# df(l,k) = a(l-1) (*) d(l,k) | |
# df(l,k) = a(l-1) (*) d(l,k) | |
@staticmethod | |
def get_windows_for_a_fully_featured_convolution( | |
a: ndarray, shape: Dims2D, strides: Dims2D = (1, 1)) -> ndarray: | |
""" | |
:param a: A 4D array of shape [m, h, w, c] where m is #samples, h is height of image, w is width of image and c is #channels in image. | |
:param shape: A 2D array of shape [fH, fW] where fH is filter height and fW is filter width. | |
:param strides: A 2D array of shape [sH, sW] where sH is the strides along the height and sW is the strides along the width. | |
:return: A 6D array of shape [m, fR, fC, fH, fW, c] where fR is number of windows row wise and fC is number of windows column wise. | |
""" | |
assert len(a.shape) == 4; | |
m, h, w, c = a.shape; | |
fH, fW = shape; | |
sH, sW = strides; | |
fR, fC = h // sH - fH + 1, w // sW - fW + 1; | |
        assert fR > 0 and fC > 0, "Can't make windows of size [%d, %d] for an image of size [%d, %d]." % (fH, fW, h, w);
shape = (m, fR, fC, fH, fW, c); | |
mI, mJ, mK, mL = a.strides; | |
strides = (mI, mJ * sH, mK * sW, mJ, mK, mL); | |
return np.lib.stride_tricks.as_strided(a, shape, strides); | |
@staticmethod | |
def get_output(i: Dims2D, f: Dims2D, p: Dims2D = (0, 0), s: Dims2D = (1, 1)) -> Dims2D: | |
h, w = i; | |
fh, fw = f; | |
ph, pw = p; | |
sh, sw = s; | |
return (h + 2 * ph) // sh - fh + 1, (w + 2 * pw) // sw - fw + 1; | |
@staticmethod | |
def get_pads(i: Dims2D, f: Dims2D, s: Dims2D = (1, 1), o: Dims2D = None) -> Dims2D: | |
if o is None: o = i; | |
h, w = i; | |
fh, fw = f; | |
sh, sw = s; | |
oh, ow = o; | |
return ((oh + fh - 1) * sh - h) // 2, ((ow + fw - 1) * sw - w) // 2; | |
def __init__(this, s_in: Dims, lr: float, | |
filter_shape: Dims2D, number_of_filters: int = 1, | |
padding: Dims2D = (0, 0), striding: Dims2D = (1, 1)): | |
s_in = Layer.simplify_dims(s_in); | |
assert len(s_in) == 3; | |
h, w, c = s_in; | |
fh, fw = filter_shape; | |
fn = number_of_filters; | |
ph, pw = padding; | |
sh, sw = striding; | |
oh, ow = ConvolutiveParametersLayer.get_output((h, w), filter_shape, padding, striding); | |
s_out = (oh, ow, fn); | |
super().__init__(s_in, s_out, lr); | |
this.filter_shape = (fh, fw, c, fn); | |
        this.filter_weight = np.prod(s_out);
this.h, this.w, this.c, this.fh, this.fw, this.fn, this.ph, this.pw, this.sh, this.sw, this.oh, this.ow = \ | |
h, w, c, fh, fw, fn, ph, pw, sh, sw, oh, ow; | |
this.dph, this.dpw = ConvolutiveParametersLayer.get_pads((oh, ow), filter_shape, striding); | |
        this.do_we_pad = ph != 0 or pw != 0;
        this.f = np.random.standard_normal(this.filter_shape) * np.sqrt(2 / h / w / c);
def convolve(this, a: ndarray, b: ndarray): | |
assert a.ndim == b.ndim == 4; | |
m = a.shape[0]; | |
h, w, fh, fw, ph, pw, sh, sw = this.h, this.w, this.fh, this.fw, this.ph, this.pw, this.sh, this.sw; | |
a = np.pad(a, ((0, 0), (ph, ph), (pw, pw), (0, 0))); | |
windows = ConvolutiveParametersLayer.get_windows_for_a_fully_featured_convolution(a, (fh, fw), (sh, sw)); | |
if b.shape[0] == m: # b is da_out | |
return np.einsum('mhwfgc,mhwn->mfgcn', windows, b, optimize=True); | |
# return np.sum(windows[:, :, :, :, :, :, None] * b[:, :, :, None, None, None, :], axis=(1, 2)); | |
else: | |
return np.einsum('mhwfgc,fgcn->mhwn', windows, b, optimize=True); | |
# return np.sum(windows[:, :, :, :, :, :, None] * b[None, None, None, :, :, :, :], axis=(3, 4, 5)); | |
    # Not really; it's not exactly what you would call an inverse, mathematically.
def convolve_inverse(this, o: ndarray, b: ndarray): | |
assert o.ndim == b.ndim == 4; | |
m = o.shape[0]; | |
h, w, c, fh, fw, ph, pw, sh, sw, oh, ow = this.h, this.w, this.c, this.fh, this.fw, this.ph, this.pw, this.sh, this.sw, this.oh, this.ow; | |
a = np.ones((m, ph + h + ph, pw + w + pw, c)); | |
windows = ConvolutiveParametersLayer.get_windows_for_a_fully_featured_convolution(a, (oh, ow), (sh, sw)); | |
windows += np.einsum('mhwfgc,mfgn,hwcn->mhwfgc', windows, o, b, optimize=True) - 1; | |
# windows += (windows[:, :, :, :, :, :, None] * | |
# o[:, None, None, :, :, None, :] * | |
# b[None, :, :, None, None, :, :]) - 1; | |
return a; | |
def predict(this, a_in: ndarray) -> ndarray: | |
assert a_in.shape[1:] == this.s_in, a_in.shape; | |
return this.convolve(a_in, this.f); | |
def forward(this, a_in: ndarray) -> ndarray: | |
this.a_in = a_in; | |
return this.convolve(a_in, this.f); | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
assert da_out.shape[1:] == this.s_out; | |
return this.convolve_inverse(da_out, this.f); | |
def backward(this, da_out: ndarray) -> ndarray: | |
df = np.mean(this.convolve(this.a_in, da_out), axis=0) / this.filter_weight; | |
this.f -= this.lr * df; | |
return this.un_predict(da_out); | |
def get_parameters(this) -> list[ndarray]: | |
return [this.f]; | |
def set_parameters(this, parameters: list[ndarray]): | |
assert this.f.shape == parameters[0].shape; | |
this.f = parameters[0]; | |
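# A minimal usage sketch: 3x3 filters over an 8x8x3 input with no padding and
# unit strides give a 6x6 map per filter, matching get_output's arithmetic.
def _demo_convolutive_shapes():
    layer = ConvolutiveParametersLayer((8, 8, 3), 0.01, filter_shape=(3, 3), number_of_filters=4);
    assert layer.s_out == (6, 6, 4);
    a_in = np.random.random((2, 8, 8, 3));
    assert layer.predict(a_in).shape == (2, 6, 6, 4);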
class DeprecatedMaxPoolingLayer(Layer): | |
def __init__(this, s_in: Dims, filter_size: Dims2D, lr: float = 0): | |
s_in = Layer.simplify_dims(s_in); | |
assert np.sum(np.mod(s_in, (filter_size + (1,)))) == 0; | |
super().__init__(s_in, tuple(np.divide(s_in, (filter_size + (1,))).astype(int)), lr) | |
this.filter_size = filter_size; | |
def predict(this, a_in: ndarray) -> ndarray: | |
assert a_in.shape[1:] == this.s_in; | |
windows = get_windows_for_pooling(a_in, this.filter_size); | |
return np.max(windows, axis=(3, 4)); | |
def forward(this, a_in: ndarray) -> ndarray: | |
assert a_in.shape[1:] == this.s_in; | |
windows = get_windows_for_pooling(a_in, this.filter_size); | |
a_out = np.max(windows, axis=(3, 4), keepdims=True); | |
this.indices = windows == a_out; | |
# this.indices = np.where(windows == a_out, 1, 0); | |
return a_out.squeeze((3, 4)); | |
    def un_predict(this, da_out: ndarray) -> ndarray:
        # Hard-coded for 2x2 windows: the incoming gradient is broadcast to all four cells.
        da_in = np.zeros((da_out.shape[0],) + this.s_in);
        da_in[:, 0:this.s_in[0]:this.filter_size[0], 0:this.s_in[1]:this.filter_size[1]] = da_out;
        da_in[:, 1:this.s_in[0]:this.filter_size[0], 0:this.s_in[1]:this.filter_size[1]] = da_out;
        da_in[:, 0:this.s_in[0]:this.filter_size[0], 1:this.s_in[1]:this.filter_size[1]] = da_out;
        da_in[:, 1:this.s_in[0]:this.filter_size[0], 1:this.s_in[1]:this.filter_size[1]] = da_out;
        return da_in;
def backward(this, da_out: ndarray) -> ndarray: | |
da_in = np.zeros((da_out.shape[0],) + this.s_in); | |
m, oh, ow, fh, fw, c = np.where(this.indices == 1); | |
# m, oh, ow, fh, fw, c = this.indices; | |
rb, cb = oh * this.filter_size[0] + fh, ow * this.filter_size[1] + fw; | |
da_in[m, rb, cb, c] = da_out[m, oh, ow, c].flatten(); | |
return da_in; | |
# ... Please make separate layers for padding and striding ...? | |
class ValueUpdaters: | |
def __init__(this, alpha: float, beta: float): | |
this.alpha = alpha; | |
this.beta = beta; | |
def update(this, v_old: ndarray, dv_incoming: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
class ValueWithBiasFixing(ValueUpdaters): | |
def __init__(this, beta: float): | |
super().__init__(0, beta) | |
this.time = 1; | |
def update(this, v_old, dv_incoming): | |
v_new = v_old / (1 - np.power(this.beta, this.time)); | |
this.time += 1; | |
return v_new; | |
class ValueWithMomentum(ValueUpdaters):
    def __init__(this, alpha: float, beta: float = 0.9):
        super().__init__(alpha, beta);
        this.vdv = 0;
    def calculateVDV(this, v_old, dv_incoming):
        # Store the raw exponentially weighted average so the state
        # accumulates across calls; subclasses may return a corrected copy.
        this.vdv = this.beta * this.vdv + (1 - this.beta) * dv_incoming;
        return this.vdv;
    def update(this, v_old, dv_incoming):
        v_new = v_old - this.alpha * this.calculateVDV(v_old, dv_incoming);
        return v_new;
class BiasFixedValueWithMomentum(ValueWithMomentum):
    def __init__(this, alpha: float, beta: float = 0.9):
        super().__init__(alpha, beta);
        this.fixer = ValueWithBiasFixing(beta);
    def calculateVDV(this, v_old, dv_incoming):
        return this.fixer.update(super().calculateVDV(v_old, dv_incoming), None);
class ValueWithRMSProp(ValueUpdaters):
    def __init__(this, alpha: float, beta: float = 0.999, epsilon=1e-8):
        super().__init__(alpha, beta);
        this.epsilon = epsilon;
        this.sdv = 0;
    def calculateSDV(this, v_old, dv_incoming):
        # Same pattern as ValueWithMomentum: keep the raw running average here.
        this.sdv = this.beta * this.sdv + (1 - this.beta) * (dv_incoming * dv_incoming);
        return this.sdv;
    def update(this, v_old, dv_incoming):
        v_new = v_old - this.alpha * dv_incoming / np.sqrt(this.calculateSDV(v_old, dv_incoming) + this.epsilon);
        return v_new;
class BiasFixedValueWithRMSProp(ValueWithRMSProp):
    def __init__(this, alpha: float, beta: float = 0.999, epsilon=1e-8):
        super().__init__(alpha, beta, epsilon);
        this.fixer = ValueWithBiasFixing(beta);
    def calculateSDV(this, v_old, dv_incoming):
        return this.fixer.update(super().calculateSDV(v_old, dv_incoming), None);
class ValueWithAdam(ValueUpdaters):
    def __init__(this, alpha: float, beta1: float = 0.9, beta2: float = 0.999, epsilon: float = 1e-8):
        super().__init__(alpha, beta1);
        this.epsilon = epsilon;
        this.momentum = BiasFixedValueWithMomentum(alpha, beta1);
        this.rmsprop = BiasFixedValueWithRMSProp(alpha, beta2, epsilon);
    def update(this, v_old: ndarray, dv_incoming: ndarray) -> ndarray:
        vdv = this.momentum.calculateVDV(v_old, dv_incoming);
        sdv = this.rmsprop.calculateSDV(v_old, dv_incoming);
        # Step against the gradient direction (descent).
        return v_old - this.alpha * vdv / np.sqrt(np.abs(sdv) + this.epsilon);
    def set_alpha(this, new_alpha: float):
        this.alpha = new_alpha;
        this.momentum.alpha = new_alpha;
        this.rmsprop.alpha = new_alpha;
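# A minimal usage sketch: one Adam step on a toy parameter vector; with a
# fresh updater the bias-corrected step is roughly alpha * sign(gradient).
def _demo_value_with_adam():
    updater = ValueWithAdam(alpha=0.1);
    v = np.array([1.0, -1.0]);
    dv = np.array([0.5, -0.5]);
    v = updater.update(v, dv);
    assert v[0] < 1.0 and v[1] > -1.0;  # moved against the gradient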
class LayerModulator(Layer): | |
def __init__(this, layer: Layer): | |
super().__init__(layer.s_in, layer.s_out, layer.lr); | |
this.layer = layer; | |
def predict(this, a_in: ndarray) -> ndarray: | |
return this.process(this.layer.predict(a_in)); | |
def forward(this, a_in: ndarray) -> ndarray: | |
return this.process(this.layer.forward(a_in)); | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
return this.deprocess(this.layer.un_predict(da_out)); | |
def backward(this, da_out: ndarray) -> ndarray: | |
return this.deprocess(this.layer.backward(da_out)); | |
def get_parameters(this) -> list[ndarray]: | |
return this.layer.get_parameters(); | |
def set_parameters(this, parameters: list[ndarray]): | |
this.layer.set_parameters(parameters); | |
def set_lr(this, lr): | |
this.layer.set_lr(lr); | |
def process(this, result: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
def deprocess(this, result: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
class NormalizeLayer(LayerModulator): | |
def process(this, result: ndarray) -> ndarray: | |
this.norm = np.linalg.norm(result); | |
return result / this.norm; | |
def deprocess(this, result: ndarray) -> ndarray: | |
return result * this.norm; | |
class ClipLayer(LayerModulator): | |
def process(this, result: ndarray) -> ndarray: | |
return np.clip(result, -1.57, 1.57); | |
def deprocess(this, result: ndarray) -> ndarray: | |
return np.clip(result, -1.57, 1.57); | |
class LayerChain(Layer): | |
def __init__(this, layers: list[Layer], lr: float): | |
super().__init__(layers[0].s_in, layers[-1].s_out, lr); | |
this.layers = layers; | |
def get_parameters(this) -> list[ndarray]: | |
parameters = []; | |
for layer in this.layers: | |
parameters += layer.get_parameters(); | |
return parameters; | |
def set_parameters(this, parameters_list: list[ndarray]): | |
for layer in reversed(this.layers): | |
parameters = []; | |
for i in range(len(layer.get_parameters())): | |
parameters.insert(0, parameters_list.pop()); | |
layer.set_parameters(parameters); | |
assert len(parameters_list) == 0; | |
def set_lr(this, lr): | |
super().set_lr(lr) | |
for layer in this.layers: | |
layer.set_lr(lr); | |
def predict(this, a_in: ndarray) -> ndarray: | |
for layer in this.layers: | |
a_in = layer.predict(a_in); | |
return a_in; | |
def forward(this, a_in: ndarray) -> ndarray: | |
for layer in this.layers: | |
a_in = layer.forward(a_in); | |
return a_in; | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
for layer in reversed(this.layers): | |
da_out = layer.un_predict(da_out); | |
return da_out; | |
def backward(this, da_out: ndarray) -> ndarray: | |
for layer in reversed(this.layers): | |
da_out = layer.backward(da_out); | |
return da_out; | |
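# A minimal usage sketch: a linear layer, a bias and a sigmoid chained
# together; backward with a zero gradient leaves the parameters untouched.
def _demo_layer_chain():
    chain = LayerChain([MultiplicativeParametersLayer(3, 2, 0.01),
                        AdditiveParametersLayer(2, 0.01),
                        SigmoidLayer(2)], 0.01);
    a_in = np.random.random((5, 3));
    a_out = chain.forward(a_in);
    assert a_out.shape == (5, 2);
    chain.backward(np.zeros_like(a_out));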
class MultiLayeredModel(LayerChain): | |
def __init__(this, sequence: Union[list[int]], lr: float, layer_supplier: Callable[[int, int, float], Layer]): | |
layers = []; | |
for i in range(1, len(sequence)): | |
layers.append(layer_supplier(sequence[i - 1], sequence[i], lr)); | |
            layers.append(LeakyReluLayer(sequence[i], sequence[i], 0.01));
layers.pop() # remove last relu | |
layers.append(SigmoidLayer(sequence[-1], sequence[-1], lr)); | |
super().__init__(layers, lr); | |
class Perceptron(LayerChain): | |
def __init__(this, s_in: Dims, s_out: int, lr: float): | |
this.weights = MultiplicativeParametersLayer(s_in, s_out, lr); | |
this.bias = AdditiveParametersLayer(s_out, lr); | |
super().__init__([this.weights, this.bias], lr); | |
this.w = this.weights.w; | |
this.b = this.bias.b; | |
def forward(this, a_in: ndarray) -> ndarray: | |
a_out = super().forward(a_in); | |
this.a_in = this.weights.a_in; | |
return a_out; | |
class ADAMLayer(Perceptron):
    def __init__(this, s_in: int, s_out: int, lr: float):
        super().__init__(s_in, s_out, lr);
        this.dw_fixer = ValueWithAdam(lr);
        this.db_fixer = ValueWithAdam(lr);
    def backward(this, da_out: ndarray) -> ndarray:
        assert da_out.shape[1:] == this.s_out;
        m = this.a_in.shape[0];  # average over the batch dimension
        dw = (this.a_in.T @ da_out) / m;
        this.weights.w = this.w = this.dw_fixer.update(this.w, dw);
        da_in = da_out @ this.w.T;
        db = np.mean(da_out, axis=0, keepdims=True);
        this.bias.b = this.b = this.db_fixer.update(this.b, db);
        return da_in;
    def set_lr(this, lr):
        super().set_lr(lr);
        this.dw_fixer.set_alpha(lr);
        this.db_fixer.set_alpha(lr);
class MonoicBiModularComparatorModel(LayerChain): | |
def __init__(this, input: Layer, output_0: Layer, output_1: Layer, lr: float): | |
assert output_0.s_in == output_1.s_in == input.s_out and output_0.s_out == output_1.s_out; | |
super().__init__([input, output_0, output_1], lr); | |
this.input = input; | |
this.output_0 = output_0; | |
this.output_1 = output_1; | |
this.stop = 0.5; | |
        this.desc = np.random.random((this.s_in[0], 1));
this.connection = MultiplicativeParametersLayer(this.input.s_out, 1, lr); | |
this.connection.w = np.ones((this.connection.s_in, this.connection.s_out)) / this.connection.s_in; | |
this.activation = SigmoidLayer(1, 1, lr); | |
def predict(this, a_in: ndarray) -> ndarray: | |
a_in = this.input.forward(a_in); | |
desc = this.connection.forward(a_in); | |
desc = this.activation.forward(desc).reshape((-1,)); | |
stop = np.mean(desc); | |
a_out = np.zeros((a_in.shape[0], this.output_0.s_out)); | |
a_out[desc < stop] = this.output_0.forward(a_in[desc < stop]); | |
a_out[desc >= stop] = this.output_1.forward(a_in[desc >= stop]); | |
return a_out; | |
def forward(this, a_in: ndarray) -> ndarray: | |
a_in = this.input.forward(a_in); | |
this.desc = this.connection.forward(a_in); | |
this.desc = this.activation.forward(this.desc).reshape((-1,)); | |
this.stop = np.mean(this.desc); | |
a_out = np.zeros((a_in.shape[0], this.output_0.s_out)); | |
a_out[this.desc < this.stop] = this.output_0.forward(a_in[this.desc < this.stop]); | |
a_out[this.desc >= this.stop] = this.output_1.forward(a_in[this.desc >= this.stop]); | |
return a_out; | |
# TODO: Implement un_predict | |
def backward(this, da_out: ndarray) -> ndarray: | |
da_in = np.zeros((da_out.shape[0], this.output_0.s_in)); | |
da_in[this.desc < this.stop] = this.output_0.backward(da_out[this.desc < this.stop]); | |
da_in[this.desc >= this.stop] = this.output_1.backward(da_out[this.desc >= this.stop]); | |
return this.input.backward(da_in); | |
def create_monitoring_callback(this): | |
class MBMCMMonitoringCallback(Callback): | |
def training_start(this_child): | |
this_child.p = [0, 0, 0, 0]; | |
def consume_cost_features(this_child, cost_s: float, observer_cost_s: float, model: Layer): | |
avg = np.mean(this.desc); | |
                jdx = min(int(avg * len(this_child.p)), len(this_child.p) - 1);
this_child.p[jdx] += 1; | |
def training_end(this_child, model: Layer): | |
print("Parameters studied", this_child.p); | |
return MBMCMMonitoringCallback(); | |
class LayerGrabber: | |
def __init__(this): | |
this.layer = None; | |
def grab(this, layer: Layer) -> Layer: | |
this.layer = layer; | |
return layer; | |
# Merge (in this case, sum) the received a_in and its processed
# form using some previous layer.
class BackDependenceLayer(Layer):
    def __init__(this, prev: Layer):
        if isinstance(prev, Perceptron):
            # Share the original Perceptron's parameters through a shallow copy.
            this.prev = Perceptron(prev.s_in, prev.s_out, prev.lr);
            this.prev.weights = prev.weights;
            this.prev.bias = prev.bias;
            this.prev.w = prev.w;
            this.prev.b = prev.b;
        else:
            this.prev = prev;
        super().__init__(this.prev.s_in, this.prev.s_in, 0)
def predict(this, a_in: ndarray) -> ndarray: | |
return a_in + this.prev.predict(a_in); | |
def forward(this, a_in: ndarray) -> ndarray: | |
return a_in + this.prev.forward(a_in); | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
return da_out + this.prev.un_predict(da_out); | |
def backward(this, da_out: ndarray) -> ndarray: | |
return da_out + this.prev.backward(da_out); | |
class ResNetHelper: | |
def __init__(this): | |
this.layer = None; | |
def grab(this, layer: Layer) -> Layer: | |
this.layer = layer; | |
return layer; | |
class ResidualNetworkLayer(Layer): | |
def __init__(this, layer: Layer): | |
super().__init__(layer.s_out, layer.s_out, 0) | |
this.original_predict = layer.predict; | |
this.original_forward = layer.forward; | |
def hijacked_predict(a_in: ndarray) -> ndarray: | |
this.a_out = this.original_predict(a_in); | |
return this.a_out; | |
def hijacked_forward(a_in: ndarray) -> ndarray: | |
this.a_out = this.original_forward(a_in); | |
return this.a_out; | |
layer.predict = hijacked_predict; | |
layer.forward = hijacked_forward; | |
this.hijacked_layer = layer; | |
def predict(this, a_in: ndarray) -> ndarray: | |
# a_out = a_in + h_a_in | |
return a_in + this.a_out; | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
this.hijacked_layer.un_predict(da_out); | |
return da_out; | |
def backward(this, da_out: ndarray) -> ndarray: | |
# da_in = da_out * 1 | |
# dh_a_in = da_out * 1 | |
this.hijacked_layer.backward(da_out); | |
return da_out; | |
class Flatten(Layer): | |
def __init__(this, s_in: Dims, s_out: int = -1, lr: float = 0): | |
s_in = Layer.simplify_dims(s_in); | |
if s_out == -1: | |
s_out = np.prod(s_in); | |
super().__init__(s_in, s_out, lr) | |
def predict(this, a_in: ndarray) -> ndarray: | |
return a_in.reshape((a_in.shape[0], -1)); | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
return da_out.reshape(da_out.shape[0:1] + this.s_in); | |
class ActivationLayer(Layer): | |
def __init__(this, s_in: Dims, s_out: Dims = None, lr: float = 0): | |
if s_out is None: s_out = s_in; | |
super().__init__(s_in, s_out, lr) | |
def predict(this, a_in: ndarray) -> ndarray: | |
return this.activate(a_in); | |
def forward(this, a_in: ndarray) -> ndarray: | |
this.a_in = a_in; | |
return this.activate(a_in); | |
def activate(this, a_in: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
        # TODO: We don't have any a_in here to multiply -_-
return this.deactivate(da_out); | |
def backward(this, da_out: ndarray) -> ndarray: | |
return this.deactivate(this.a_in) * da_out; | |
def deactivate(this, a_in: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
class SigmoidLayer(ActivationLayer):
    def activate(this, a_in: ndarray) -> ndarray:
        return 1 / (1 + np.exp(-a_in));
    def deactivate(this, a_in: ndarray) -> ndarray:
        # sigmoid'(x) = e^-x / (1 + e^-x)^2; backward() feeds the stored a_in here.
        v = np.exp(-a_in);
        opv = 1 + v;
        return v / (opv * opv);
class TanHLayer(ActivationLayer):
    def activate(this, a_in: ndarray) -> ndarray:
        return np.tanh(a_in);
    def deactivate(this, a_in: ndarray) -> ndarray:
        return 1 - np.power(np.tanh(a_in), 2);
class ReluLayer(ActivationLayer):
    def activate(this, a_in: ndarray) -> ndarray:
        return a_in * (a_in > 0);
    def deactivate(this, a_in: ndarray) -> ndarray:
        return a_in > 0;
class DivideLayer(ActivationLayer):
    def __init__(this, s_in: Dims = -1, s_out: Dims = -1, lr: float = 0, factor=0.5):
        super().__init__(s_in, s_out, lr);
        this.factor = factor;
    def activate(this, a_in: ndarray) -> ndarray:
        a_in[a_in < this.factor] = 0;
        a_in[a_in >= this.factor] = 1;
        return a_in;
    def deactivate(this, a_in: ndarray) -> ndarray:
        return np.zeros(a_in.shape);
class LeakyReluLayer(ActivationLayer):
    # this.lr doubles as the leak slope here, not a learning rate.
    def activate(this, a_in: ndarray) -> ndarray:
        return a_in * (a_in > 0) + this.lr * a_in * (a_in < 0);
    def deactivate(this, a_in: ndarray) -> ndarray:
        return (a_in > 0) + this.lr * (a_in < 0);
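# A minimal usage sketch: LeakyReluLayer keeps positive inputs and scales
# negative ones by this.lr; deactivate returns the matching slopes.
def _demo_leaky_relu():
    relu = LeakyReluLayer(4, lr=0.01);
    a_in = np.array([[-2.0, -0.5, 0.5, 2.0]]);
    assert np.allclose(relu.activate(a_in), [[-0.02, -0.005, 0.5, 2.0]]);
    assert np.allclose(relu.deactivate(a_in), [[0.01, 0.01, 1.0, 1.0]]);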
class RecurrentLayer(Layer): | |
# Expects an input of shape (m, n, o) where one sample of (n, o) represents an input of length n, one_hot_encoded | |
# with max size o to a matrix (n, o). | |
# For example, if the data is letters in words, o is 26. For a word like, Happy, n is 5. | |
# In my implementation, input_height refers to o | |
ActivationSupplier = Callable[[int], 'ActivationLayer']; | |
def __init__(this, s_in: Dims, lr: float, output_height: int, hidden_height: int, | |
hidden_activation: ActivationSupplier = None, output_activation: ActivationSupplier = None, | |
clip_value: tuple[float, float] = (0, 0)): | |
s_in = Layer.simplify_dims(s_in); | |
assert len(s_in) == 2, s_in; | |
# s_in = (-1, s_in[1]); | |
this.input_height = s_in[1]; | |
this.hidden_height = hidden_height; | |
this.output_height = output_height; | |
factor = np.sqrt(hidden_height); | |
this.way = np.random.standard_normal((hidden_height, output_height)) / factor; | |
this.by = np.zeros((1, output_height)); | |
this.wxa = np.random.standard_normal((this.input_height, hidden_height)) / factor; | |
this.waa = np.random.standard_normal((hidden_height, hidden_height)) / factor; | |
this.ba = np.zeros((1, hidden_height)); | |
super().__init__(s_in, (s_in[0], output_height), lr); | |
this.hidden_activation_supplier = hidden_activation if hidden_activation is not None else SigmoidLayer; | |
this.output_activation_supplier = output_activation if output_activation is not None else SigmoidLayer; | |
        this.hidden_activations = [this.hidden_activation_supplier(hidden_height)];
        this.output_activations = [this.output_activation_supplier(output_height)];
this.list_of_hidden_activations = []; | |
this.clip_value = clip_value; | |
def predict(this, a_in: ndarray) -> ndarray: | |
raise NotImplementedError() | |
def forward(this, a_in: ndarray) -> ndarray: | |
assert a_in.ndim == 3, a_in.shape; | |
m, n, o = a_in.shape; | |
this.a_in = a_in; | |
hidden = np.zeros((m, this.hidden_height)); | |
this.hidden_collection = np.zeros((n + 1, m, this.hidden_height)); | |
# this.hidden_collection[0] = hidden; | |
this.output = np.zeros((m, n, this.output_height)); | |
        while len(this.hidden_activations) < n:
            this.hidden_activations.append(this.hidden_activation_supplier(this.hidden_height));
            this.output_activations.append(this.output_activation_supplier(this.output_height));
for i in range(n): | |
g1 = a_in[:, i] @ this.wxa + hidden @ this.waa + this.ba; | |
hidden = this.hidden_activations[i].forward(g1); | |
this.hidden_collection[i + 1] = hidden; | |
g2 = hidden @ this.way + this.by; | |
output = this.output_activations[i].forward(g2); | |
this.output[:, i] = output; | |
return this.output; | |
def un_predict(this, da_out: ndarray) -> ndarray: | |
raise NotImplementedError() | |
def backward(this, da_out: ndarray) -> ndarray: | |
m, n, p = this.a_in.shape; | |
dhidden_next = np.zeros_like(this.hidden_collection[0]); | |
dwxa = np.zeros_like(this.wxa); | |
dwaa = np.zeros_like(this.waa); | |
dba = np.zeros_like(this.ba); | |
dway = np.zeros_like(this.way); | |
dby = np.zeros_like(this.by); | |
for i in reversed(range(n)): | |
dg2 = this.output_activations[i].backward(da_out[:, i]); | |
hidden = this.hidden_collection[i + 1]; | |
hidden_prev = this.hidden_collection[i]; | |
dway += hidden.T @ dg2 / m; | |
dby += np.mean(dg2, axis=0, keepdims=True); | |
dhidden = dg2 @ this.way.T + dhidden_next; | |
dg1 = this.hidden_activations[i].backward(dhidden); | |
dba += np.mean(dg1, axis=0, keepdims=True); | |
dwxa += this.a_in[:, i].T @ dg1 / m; | |
dwaa += hidden_prev.T @ dg1 / m; | |
        if this.clip_value != (0, 0):
            for gradient in [dwxa, dwaa, dway, dba, dby]:
                np.clip(gradient, -this.clip_value[0], this.clip_value[1], out=gradient);
        this.wxa = this.wxa - this.lr * dwxa;
        this.waa = this.waa - this.lr * dwaa;
        this.ba = this.ba - this.lr * dba;
        this.way = this.way - this.lr * dway;
        this.by = this.by - this.lr * dby;
        # NOTE: no da_in is returned; this layer is assumed to sit first in the chain.
def get_parameters(this) -> list[ndarray]: | |
return [this.wxa, this.waa, this.ba, this.way, this.by]; | |
def set_parameters(this, parameters: list[ndarray]): | |
this.wxa, this.waa, this.ba, this.way, this.by = \ | |
Layer.set_parameters_helper(parameters, this.wxa, this.waa, this.ba, this.way, this.by); | |
def are_input_dims_compliant(this, x: ndarray) -> bool: | |
return x.shape[2:] == this.s_in[1:]; | |
def are_output_dims_compliant(this, y: ndarray) -> bool: | |
return y.shape[2:] == this.s_out[1:]; | |
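# A minimal usage sketch: a batch of 3 sequences of length 5 over a 26-symbol
# one-hot alphabet; the RNN emits one output row per time step.
def _demo_recurrent_layer():
    rnn = RecurrentLayer((5, 26), lr=0.01, output_height=26, hidden_height=8);
    a_in = np.zeros((3, 5, 26));
    assert rnn.forward(a_in).shape == (3, 5, 26);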
class Loss: | |
def __init__(this): | |
pass; | |
# L = loss(y, y`) | |
def loss(this, y: ndarray, yp: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
# dL/dy = dloss() | |
def dloss(this, y: ndarray, yp: ndarray) -> ndarray: | |
raise NotImplementedError(); | |
def cost(this, y: ndarray, yp: ndarray) -> Tuple[float, ndarray, ndarray]: | |
l = this.loss(y, yp); | |
c = np.mean(l, axis=0, keepdims=True); | |
return (np.mean(c, keepdims=True)[0][0], c, l); | |
def dcost(this, y: ndarray, yp: ndarray) -> Tuple[float, ndarray, ndarray]: | |
l = this.dloss(y, yp); | |
c = np.mean(l, axis=0, keepdims=True); | |
return (np.mean(c, keepdims=True)[0][0], c, l); | |
# Mean Absolute Error | |
class MAELoss(Loss): | |
def loss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return np.abs(y - yp); | |
def dloss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return - np.sign(y - yp); | |
# Mean Squared Error | |
class MSELoss(Loss): | |
def loss(this, y: ndarray, yp: ndarray) -> ndarray: | |
d = y - yp; | |
return d * d; | |
def dloss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return - 2 * (y - yp); | |
# Binary Cross Entropy | |
class BCELoss(Loss): | |
def loss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return - (y * np.log(yp) + (1 - y) * np.log(1 - yp)); | |
def dloss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return - (y / yp - (1 - y) / (1 - yp)); | |
# Mean Squared Logarithmic Error | |
class MSLELoss(Loss): | |
def loss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return np.power(np.log(y + 1) - np.log(yp + 1), 2); | |
def dloss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return -2 * (np.log(1 + y) - np.log(1 + yp)) / (1 + yp); | |
# 👀 | |
class SimpleLoss(Loss): | |
def loss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return (y != (yp > 0.5)).astype(float); | |
def dloss(this, y: ndarray, yp: ndarray) -> ndarray: | |
return np.ones(y.shape); | |
# Entropy | |
class EntropyLoss(Loss): | |
def __init__(this): | |
super().__init__() | |
this.helper = BCELoss(); | |
def loss(this, y: ndarray, yp: ndarray) -> ndarray: | |
indices = yp[yp <= 0 or yp >= 1]; | |
yp[indices] = 0; | |
yp[~indices] = this.helper.loss(y, yp); | |
return yp; | |
def dloss(this, y: ndarray, yp: ndarray) -> ndarray: | |
indices = yp[yp <= 0 or yp >= 1]; | |
yp[indices] = 0; | |
yp[~indices] = this.helper.dloss(y, yp); | |
return yp; | |
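# A minimal usage sketch: Loss.cost returns (scalar cost, per-feature cost,
# per-sample loss); for MSE the scalar is just the mean squared difference.
def _demo_losses():
    y = np.array([[1.0, 0.0], [0.0, 1.0]]);
    yp = np.array([[0.9, 0.1], [0.2, 0.8]]);
    cost_s, cost_v, loss_v = MSELoss().cost(y, yp);
    assert loss_v.shape == (2, 2) and cost_v.shape == (1, 2);
    assert np.isclose(cost_s, np.mean((y - yp) ** 2));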
# @jit(nopython=True) | |
class ConfusionMatrix:
    def __init__(this, y: ndarray, yp: ndarray):
        assert y.shape == yp.shape and len(y.shape) == 2;
        this.m, this.c = y.shape[0], y.shape[1];
        y, yp = y.argmax(axis=1), yp.argmax(axis=1);
        this.cm = np.zeros((this.c, this.c), int);
        for i in range(this.m):
            this.cm[y[i], yp[i]] += 1;  # rows are actual classes, columns are predictions
        this.actl_sums = np.sum(this.cm, axis=1);  # row totals: samples per actual class
        this.pred_sums = np.sum(this.cm, axis=0);  # column totals: samples per predicted class
        this.total = this.m;
        this.true_values = np.diagonal(this.cm);
        this.trues = np.sum(this.true_values);
        # this.fn = np.sum(np.triu(this.cm, 1));
        # this.fp = np.sum(np.tril(this.cm, -1));
        print("Accuracy =", this.trues / this.total * 100);
    def print_matrix(this):
        row_width = 2 + len("%d" % this.total);
        print("Predicted ", end="");
        print(*["%%-%dd" % row_width % x for x in range(this.c)]);
        print("Actual");
        for i in range(this.c):
            print(" %2d " % i, *["%%-%dd" % row_width % x for x in this.cm[i]], " %5d" % this.actl_sums[i]);
        print("    ", *["%%-%dd" % row_width % x for x in this.pred_sums], " %5d" % this.total);
class Callback: | |
def __init__(this): | |
pass; | |
def training_start(this): | |
pass; | |
def consume_cost_features(this, cost_s: float, observer_cost_s: float, model: Layer): | |
pass; | |
def training_end(this, model: Layer): | |
pass; | |
def is_training_breakable(this, cost_s: float, observer_cost_s: float, model: Layer) -> bool: | |
return False; | |
def reset(this): | |
pass; | |
class CostRecorder(Callback): | |
def __init__(this): | |
super().__init__(); | |
this.costs = []; | |
this.observer_costs = []; | |
this.current_costs = []; | |
this.current_observer_costs = []; | |
def training_start(this): | |
this.current_costs = []; | |
this.current_observer_costs = []; | |
def consume_cost_features(this, cost_s: float, observer_cost_s, model: Layer): | |
this.costs.append(cost_s); | |
this.observer_costs.append(observer_cost_s); | |
this.current_costs.append(cost_s); | |
this.current_observer_costs.append(observer_cost_s); | |
def reset(this): | |
this.costs.clear(); | |
this.observer_costs.clear(); | |
this.current_costs.clear(); | |
this.current_observer_costs.clear(); | |
class PlotScalarCost(CostRecorder): | |
def __init__(this, print_stats_to_terminal: bool = True, register_training_end_callback:bool =True): | |
super().__init__(); | |
this.plotLines = []; | |
this.print_stats_to_terminal = print_stats_to_terminal; | |
this.register_training_end_callback = register_training_end_callback; | |
def reset(this): | |
super().reset(); | |
this.plotLines = []; | |
def training_end(this, model: Layer): | |
if this.register_training_end_callback: | |
this.__training_end__(model); | |
def __training_end__(this, model: Layer): | |
if len(this.current_costs) > 0: | |
if this.print_stats_to_terminal: | |
print("Costs rec'ed", len(this.current_costs)); | |
print("Minimum Cost", min(this.current_costs)); | |
print("Last Cost ", this.current_costs[-1]); | |
plt.subplot(2, 1, 1); | |
plt.title("Current Cost") | |
plt.plot(this.costs); | |
plt.subplot(2, 1, 2); | |
plt.title("Observing Cost") | |
plt.plot(this.observer_costs); | |
plt.show(); | |
else: | |
print("No Cost Data To Plot...") | |
class PlotTestDataToo(CostRecorder): | |
def __init__(this, x_test: ndarray, y_test: ndarray, loss: Loss, observer_loss: Loss = None): | |
super().__init__(); | |
this.testRecorder = CostRecorder(); | |
this.x_test = x_test; | |
this.y_test = y_test; | |
this.loss = loss; | |
this.observer_loss = observer_loss; | |
this.plotLines = []; | |
def training_start(this): | |
super().training_start(); | |
this.testRecorder.training_start(); | |
def consume_cost_features(this, cost_s: float, observer_cost_s, model: Layer): | |
super().consume_cost_features(cost_s, observer_cost_s, model); | |
cost_s = this.loss.cost(this.y_test, model.predict(this.x_test))[0]; | |
if this.observer_loss is not None: | |
observer_cost_s = this.observer_loss.cost(this.y_test, model.predict(this.x_test))[0]; | |
else: | |
observer_cost_s = cost_s; | |
this.testRecorder.consume_cost_features(cost_s, observer_cost_s, model); | |
def reset(this): | |
super().reset(); | |
this.testRecorder.reset(); | |
this.plotLines = []; | |
def training_end(this, model: Layer): | |
if len(this.current_costs) > 0: | |
print("Costs rec'ed", len(this.current_costs)); | |
print("Minimum Cost", min(this.current_costs)); | |
print("Last Cost ", this.current_costs[-1]); | |
# figsize = (8, 11); | |
plt.subplot(3, 1, 1); | |
plt.title("Current Cost") | |
plt.plot(this.costs, color='blue'); | |
for line in this.plotLines: | |
plt.axvline(line, color='r'); | |
plt.subplot(3, 1, 2); | |
plt.title("Current Cost") | |
plt.plot(this.costs, color='blue'); | |
plt.plot(this.testRecorder.costs, color='orange'); | |
for line in this.plotLines: | |
plt.axvline(line, color='r'); | |
plt.subplot(3, 1, 3); | |
plt.title("Observing Cost") | |
plt.plot(this.observer_costs, color='blue'); | |
plt.plot(this.testRecorder.observer_costs, color='orange'); | |
for line in this.plotLines: | |
plt.axvline(line, color='r'); | |
plt.show(); | |
else: | |
print("No Cost Data To Plot...") | |
class AutomaticDecayingLearningRate(Callback): | |
def __init__(this, costRecorder: CostRecorder, fractur: float): | |
super().__init__(); | |
this.costRecorder = costRecorder; | |
assert fractur < 1; | |
this.fractur = fractur; | |
def reset(this): | |
super().reset() | |
this.costRecorder.reset(); | |
def consume_cost_features(this, cost_s: float, observer_cost_s, model: Layer): | |
if len(this.costRecorder.costs) < 2: | |
return; | |
last_cost = this.costRecorder.costs[-2]; | |
if last_cost < cost_s and abs(model.lr) > 1e-30: | |
model.set_lr(model.lr * this.fractur); | |
if isinstance(this.costRecorder, PlotScalarCost) or isinstance(this.costRecorder, PlotTestDataToo): | |
this.costRecorder.plotLines.append(len(this.costRecorder.costs) - 1); | |
# plt.axvline(len(this.costRecorder.costs), color='r'); | |
class PreventIncreaseInCost(Callback):
    def __init__(this, costRecorder: CostRecorder):
        super().__init__();
        this.costRecorder = costRecorder;
    def is_training_breakable(this, cost_s: float, observer_cost_s, model: Layer) -> bool:
        if len(this.costRecorder.costs) == 0:
            return False;
        last_cost = this.costRecorder.costs[-1];
        if last_cost == cost_s and len(this.costRecorder.costs) > 2:
            last_cost = this.costRecorder.costs[-2];
        return cost_s > last_cost;
def one_hot_encode(inp: ndarray, size: int = -1) -> ndarray: | |
assert len(inp.shape) <= 2; | |
inp = inp.astype(int); | |
if size == -1: size = np.max(inp) + 1; | |
res = np.zeros((inp.shape[0], size)); | |
for i in range(inp.shape[0]): | |
res[i, inp[i]] = 1; | |
return res; | |
def one_hot_encode_list(inp: list[int], size: int = -1) -> list[list[int]]: | |
if size == -1: | |
size = np.max(inp) + 1; | |
res = [[0] * size for _ in range(len(inp))]; | |
for i, v in enumerate(inp): | |
res[i][v] = 1; | |
return res; | |
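# A minimal usage sketch: both encoders map class indices to one-hot rows;
# the ndarray version returns floats, the list version plain ints.
def _demo_one_hot():
    assert np.array_equal(one_hot_encode(np.array([1, 0]), 3),
                          np.array([[0.0, 1.0, 0.0], [1.0, 0.0, 0.0]]));
    assert one_hot_encode_list([2, 0]) == [[0, 0, 1], [1, 0, 0]];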
# NOT INTENDED TO BE AN EXACT de_one_hot, just a bodge
def decode(c: int, i: int) -> ndarray: | |
a = np.zeros((1, c)); | |
a[0][i] = 1; | |
return a; | |
def file_name(name: str) -> str:
    try:
        return name[name.rindex('\\') + 1:name.rindex('.')];
    except ValueError:
        return "debug_session";
def backprop_random_label(model: Layer): | |
    label = np.zeros((np.prod(model.s_out),));
i = np.random.randint(label.shape[0]); | |
print('using', i); | |
label[i] = 1; | |
label = label.reshape((1,) + model.s_out); | |
sample = model.un_predict(label)[0]; | |
# if sample.ndim != 2: | |
# print("Terra-transforming", sample.shape, end=" "); | |
# sample = sample.reshape((sample.shape[0], -1)); | |
# print("to", sample.shape); | |
plt.imshow(sample); | |
plt.title(i); | |
plt.show(); | |
class Trainer: | |
def __init__(this, callbacks: list[Callback] = None): | |
        if callbacks is None:
callbacks = []; | |
this.callbacks = callbacks; | |
    def pick_a_nice_random_model(this, create_model: Callable[[], Layer], x: ndarray, y: ndarray,
                                 loss: Loss,
                                 bayes_error: float = 1,
                                 max_iterations: float = 100_000) -> Layer:
        iterations = 0;
        model_with_least_cost_so_far = model = create_model();
        least_cost_so_far = cost = loss.cost(y, model.forward(x))[0];
        while True:
            if cost < least_cost_so_far:
                least_cost_so_far = cost;  # remember the best cost, not just the best model
                model_with_least_cost_so_far = model;
            iterations += 1;
            if iterations % 100000 == 0:
                print(iterations, "models tested so far.");
            if cost > bayes_error and iterations < max_iterations:
                model = create_model();
                cost = loss.cost(y, model.forward(x))[0];
                continue;
            else:
                break;
        print(iterations, "models tested so far.");
        return model_with_least_cost_so_far;
def pick_a_nice_random_model_2(this, create_model: Callable[[], Layer], x: ndarray, y: ndarray, | |
loss: Loss, observer_loss: Loss = None, | |
bayes_error: float = 1, | |
max_search_iterations: int = 1_000, | |
max_train_iterations: int = 1_000) -> Layer: | |
models = []; | |
for i in range(100): | |
            model = this.pick_a_nice_random_model(create_model, x, y, loss, bayes_error, max_search_iterations);
cost = loss.cost(y, model.forward(x))[0]; | |
models.append([cost, model]); | |
        # models.sort(key=lambda item: item[0]);
callbacks = this.callbacks; | |
this.callbacks = []; | |
for item in models: | |
this.overfit(item[1], x, y, loss, observer_loss, bayes_error, max_train_iterations); | |
cost = loss.cost(y, item[1].forward(x))[0]; | |
if cost > item[0]: | |
item[0] = -1; | |
continue; | |
else: | |
item[0] = cost; | |
print("Final Cost", item[0]); | |
this.callbacks = callbacks; | |
models = [item for item in models if item[0] != -1]; | |
if len(models) <= 0: | |
print("lol"); | |
exit(0); | |
return min(models, key=lambda i: i[0])[1]; | |
def overfit(this, model: Layer, x: ndarray, y: ndarray, | |
loss: Loss, observer_loss: Loss = None, | |
bayes_error: float = 1, max_iterations: int = 10_000, | |
batch_offset=0, | |
print_stats_to_terminal:bool=False, notify_with_beep: bool = True | |
): | |
if model.are_input_dims_compliant(x): | |
x = x.reshape((1,) + x.shape); | |
if model.are_output_dims_compliant(y): | |
y = y.reshape((1,) + y.shape); | |
assert model.are_input_dims_compliant(x[0]); | |
assert model.are_output_dims_compliant(y[0]); | |
        if observer_loss is None:
observer_loss = loss; | |
total_batches = x.shape[0]; | |
# batch_index = batch_offset % total_batches; | |
batch_index = 0; | |
iterations = batch_offset; | |
max_iterations += batch_offset; | |
epoch = 0; | |
for callback in this.callbacks: | |
callback.training_start(); | |
yd = model.forward(x[batch_index]); | |
(cost_s, cost_v, loss_v) = loss.cost(y[batch_index], yd); | |
(observer_cost_s, _, _) = observer_loss.cost(y[batch_index], yd); | |
if print_stats_to_terminal: | |
print("Initial Cost", cost_s) | |
for callback in this.callbacks: | |
callback.consume_cost_features(cost_s, observer_cost_s, model); | |
if callback.is_training_breakable(cost_s, observer_cost_s, model): | |
print("Training stopped due to callback!") | |
break; | |
while cost_s > bayes_error and iterations < max_iterations: | |
model.backward(loss.dcost(y[batch_index], yd)[2]); | |
iterations += 1; | |
batch_index = iterations % total_batches; | |
if batch_index == 0: epoch += 1; | |
yd = model.forward(x[batch_index]); | |
(cost_s, cost_v, loss_v) = loss.cost(y[batch_index], yd); | |
(observer_cost_s, _, _) = observer_loss.cost(y[batch_index], yd); | |
if np.max([np.max(np.abs(parameter)) for parameter in model.get_parameters()]) > 1e6: | |
print("Heading towards explosion 👀..."); | |
break; | |
for callback in this.callbacks: | |
callback.consume_cost_features(cost_s, observer_cost_s, model); | |
if callback.is_training_breakable(cost_s, observer_cost_s, model): | |
print("Training stopped due to callback!") | |
break; | |
else: | |
continue; | |
break; | |
for callback in this.callbacks: | |
callback.training_end(model); | |
if notify_with_beep: | |
beepy.beep("coin"); | |
def start_observatory(this, model: Layer, x: ndarray, y: ndarray, | |
loss: Loss, observer_loss: Loss, create_model, | |
bayes_error: float = 1, max_iterations: int = 10_000, | |
custom_commands: dict[str, Callable[[Layer], None]] = None, | |
file='parameters' | |
) -> Layer: | |
        # Configurables
        # lr - model intrinsic
        if custom_commands is None:
            custom_commands = {};
        iter_per_loop = max_iterations;
        batch_size = 64;
        file = '%s.npz' % file;
# Machine States | |
iterations = 0; | |
x_batches = get_windows_for_mini_batch(x, size_of_batches=batch_size); | |
y_batches = get_windows_for_mini_batch(y, size_of_batches=batch_size); | |
parameters_backup = model.get_parameters(); | |
while True: | |
try: | |
command = input("Observatory: Choose an action: "); | |
if command == 'h': | |
print("Valid commands so far are h, l, i, b, p, pn, q, qn, r, n, c, t, x"); | |
elif command == 'l': | |
print("Set learning rate FACTOR (current lr is", model.lr, "): "); | |
new_lr = model.lr * float(input(">>> ")); | |
model.set_lr(new_lr); | |
print("New lr is", new_lr); | |
elif command == 'i': | |
print("Set iterations per loop (currently", iter_per_loop, "): "); | |
iter_per_loop = int(input(">>> ")); | |
elif command == 'b': | |
print("Set mini batch size (currently", batch_size, "): "); | |
batch_size = int(input(">>> ")); | |
if batch_size == -1: | |
batch_size = x.shape[0]; | |
x_batches = get_windows_for_mini_batch(x, size_of_batches=batch_size); | |
y_batches = get_windows_for_mini_batch(y, size_of_batches=batch_size); | |
elif command == 'p': | |
print("Saving Parameters to", file, "file"); | |
np.savez(file, *model.get_parameters()); | |
elif command == 'pn': | |
name = "%s.npz" % input("Please enter a name: "); | |
print("Saving Parameters to %s file." % name); | |
np.savez(name, *model.get_parameters()); | |
elif command == 'q': | |
print("Loading Parameters from", file, "file"); | |
parameters = np.load(file); | |
model.set_parameters([parameters[key] for key in parameters]); | |
elif command == 'qn': | |
name = "%s.npz" % input("Please enter a name: "); | |
print("Loading Parameters from %s file." % name); | |
parameters = np.load(name); | |
model.set_parameters([parameters[key] for key in parameters]); | |
elif command == 'r': | |
print("Resetting to last learnt parameters."); | |
model.set_parameters(parameters_backup); | |
# [cb.reset() for cb in this.callbacks] | |
elif command == 'n': | |
model = create_model(); | |
# [cb.reset() for cb in this.callbacks] | |
elif command == 'c': | |
[cb.reset() for cb in this.callbacks] | |
elif command == 't': | |
parameters_backup = model.get_parameters(); | |
this.overfit(model, x_batches, y_batches, loss, observer_loss, bayes_error=bayes_error, | |
max_iterations=iter_per_loop, batch_offset=iterations); | |
iterations += iter_per_loop; | |
elif command == 'x': | |
break; | |
elif command in custom_commands: | |
custom_commands[command](model); | |
            except Exception:
                print("You probably made a typo 👀...");
                traceback.print_exc();
                continue;
return model; | |
def normalize_data(v: ndarray) -> ndarray: | |
v -= np.mean(v, axis=0, keepdims=True); | |
s = np.std(v, axis=0, keepdims=True); | |
s[s <= 0.01] = 1; | |
v /= s; | |
return v; | |
if __name__ == '__main__': | |
x = np.array([ | |
[0, 0, 0, 0], | |
[0, 0, 0, 1], | |
[0, 0, 1, 0], | |
[0, 0, 1, 1], | |
[0, 1, 0, 0], | |
[0, 1, 0, 1], | |
[0, 1, 1, 0], | |
[0, 1, 1, 1], | |
[1, 0, 0, 0], | |
[1, 0, 0, 1] | |
], dtype=float); | |
y = np.array([ | |
[1, 1, 1, 0, 1, 1, 0], | |
[0, 0, 1, 0, 0, 1, 0], | |
[1, 0, 1, 1, 1, 1, 0], | |
[1, 0, 1, 1, 0, 1, 1], | |
[0, 1, 1, 1, 0, 1, 0], | |
[1, 1, 0, 1, 0, 1, 1], | |
[1, 1, 0, 1, 1, 1, 1], | |
[1, 1, 0, 0, 1, 0, 0], | |
[1, 1, 1, 1, 1, 1, 1], | |
[1, 1, 1, 1, 0, 1, 0] | |
], dtype=float); | |
# x = np.array([[1, 1], [1, 0], [0, 1], [0, 0]], dtype=float); | |
# y = np.array([[1, 1], [0, 1], [0, 1], [0, 0]]); | |
x = normalize_data(x); | |
# loss = MAELoss(); | |
# loss = MSELoss(); | |
# loss = BCELoss(); | |
loss = MSLELoss(); | |
observer_loss = SimpleLoss(); | |
    def create_model():
        model = MultiLayeredModel(
            [x.shape[1], 4, y.shape[1]],
            0.00001,
            lambda s_in, s_out, lr: Perceptron(s_in, s_out, lr));
        return model;
costRecorder = PlotScalarCost(); | |
# costRecorder = PlotTestDataToo(np.array([[1, 1], [0, 1]]), np.array([[1, 1], [0, 1]]), loss, observer_loss); | |
trainer = Trainer([ | |
costRecorder | |
# , AutomaticDecayingLearningRate(costRecorder, 0.5) | |
# , PreventIncreaseInCost(callback) | |
]); | |
model = create_model(); | |
    # model = trainer.pick_a_nice_random_model(create_model, x, y, loss, bayes_error=0.3);
# model = trainer.pick_a_nice_random_model_2(create_model, x, y, loss, observer_loss, bayes_error=0.3); | |
def my_custom_action(m: Layer): | |
print(m.lr); | |
yp = np.where(m.forward(x) < 0.5, 0, 1); | |
print(yp); | |
yp = np.where(yp != y, 0, 1); | |
print(yp); | |
print(np.sum(1 - yp)); | |
# trainer.overfit(model, x, y, loss, bayes_error=0, max_iterations=5000); | |
model = trainer.start_observatory(model, x, y, loss, observer_loss, create_model, bayes_error=0, | |
custom_commands={ | |
'cm': lambda m: ConfusionMatrix(y, m.forward(x)).print_matrix(), | |
'mca': my_custom_action | |
} | |
); | |
my_custom_action(model); |