N-Beats allowing the choice of activation function. Choosing SELU also applies AlphaDropout and the matching weight initialization.
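For example (a sketch using the constructor defined below), NBEATSBlock(units=64, thetas_dim=8, activation_fn='selu') builds a block whose hidden layers use nn.SELU() together with nn.AlphaDropout and SNN-style weight initialization.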
""" | |
Implementation of ``nn.Modules`` for N-Beats model. | |
*This pull request adds `activation_fn` as parameter. | |
""" | |
from typing import Tuple | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
def linear(input_size, output_size, bias=True, dropout: float = None, use_selu: bool = False):
    lin = nn.Linear(input_size, output_size, bias=bias)
    if dropout is not None:
        # "Alpha Dropout is a type of Dropout that maintains the self-normalizing property. Alpha Dropout goes
        # hand-in-hand with the SELU activation function, which ensures that the outputs have zero mean and
        # unit standard deviation."
        # Source: https://pytorch.org/docs/stable/generated/torch.nn.AlphaDropout.html#alphadropout
        if use_selu:
            return nn.Sequential(nn.AlphaDropout(dropout), lin)
        else:
            return nn.Sequential(nn.Dropout(dropout), lin)
    else:
        return lin
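
# Sketch of what ``linear`` returns (values are illustrative, not from the original file):
#   linear(32, 32, dropout=0.1, use_selu=True)   -> roughly Sequential(AlphaDropout(p=0.1), Linear(32, 32))
#   linear(32, 32, dropout=0.1, use_selu=False)  -> roughly Sequential(Dropout(p=0.1), Linear(32, 32))
#   linear(32, 32)                               -> Linear(32, 32), no dropout layer at all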
def linspace(backcast_length: int, forecast_length: int, centered: bool = False) -> Tuple[np.ndarray, np.ndarray]:
    if centered:
        norm = max(backcast_length, forecast_length)
        start = -backcast_length
        stop = forecast_length - 1
    else:
        norm = backcast_length + forecast_length
        start = 0
        stop = backcast_length + forecast_length - 1
    lin_space = np.linspace(start / norm, stop / norm, backcast_length + forecast_length, dtype=np.float32)
    b_ls = lin_space[:backcast_length]
    f_ls = lin_space[backcast_length:]
    return b_ls, f_ls
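
# Worked example (illustrative): linspace(3, 2, centered=False) normalizes over the whole window,
#   b_ls = [0.0, 0.2, 0.4], f_ls = [0.6, 0.8]
# while linspace(3, 2, centered=True) places t = 0 at the start of the forecast,
#   b_ls ≈ [-1.0, -0.667, -0.333], f_ls ≈ [0.0, 0.333]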
class NBEATSBlock(nn.Module):
    def __init__(
        self,
        units,
        thetas_dim,
        num_block_layers=4,
        backcast_length=10,
        forecast_length=5,
        share_thetas=False,
        dropout=0.1,
        ############################
        activation_fn: str = 'relu'  ## Added this
        ############################
    ):
        super().__init__()
        ###############################################################################################################
        # Parse the activation function name into a module
        if activation_fn in ('selu', 'SELU'):
            activation_fn = nn.SELU()
            use_selu = True
        elif activation_fn in ('leaky_relu', 'lrelu', 'LRELU'):
            activation_fn = nn.LeakyReLU()
            use_selu = False
        else:
            activation_fn = nn.ReLU()
            use_selu = False
        # We add this tiny attribute to our SELU-ready,
        # properly-initialized Block
        self.nbeats_ready_for_selu = use_selu
        self.selu_version = 0.4
        # Alternative implementation: choose the dropout layer once, through a parameter.
        # Based on: https://github.com/bioinf-jku/SNNs/blob/master/Pytorch/SelfNormalizingNetworks_CNN_CIFAR10.ipynb
        # dropout_layer = nn.AlphaDropout(p=dropout) if use_selu else nn.Dropout(p=dropout)
        #############################################################################################################
        self.units = units
        self.thetas_dim = thetas_dim
        self.backcast_length = backcast_length
        self.forecast_length = forecast_length
        self.share_thetas = share_thetas

        fc_stack = [
            nn.Linear(backcast_length, units),
            activation_fn,
        ]
        for _ in range(num_block_layers - 1):
            fc_stack.extend([linear(units, units, dropout=dropout, use_selu=use_selu), activation_fn])
        self.fc = nn.Sequential(*fc_stack)

        if share_thetas:
            self.theta_f_fc = self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False)
        else:
            self.theta_b_fc = nn.Linear(units, thetas_dim, bias=False)
            self.theta_f_fc = nn.Linear(units, thetas_dim, bias=False)

        ##########################################################################
        if use_selu:
            # Self-normalizing networks must be properly initialized
            self.init_weights()
    def init_weights(self):
        """
        Weight initialization to achieve Self-Normalizing Networks (SNNs),
        i.e. the main feature obtained by using SELU.

        'When using kaiming_normal or kaiming_normal_ for initialisation,
        nonlinearity='linear' should be used instead of nonlinearity='selu'
        in order to get Self-Normalizing Neural Networks.'

        Sources:
        - https://pytorch.org/docs/master/generated/torch.nn.SELU.html#selu
        - https://pytorch.org/docs/master/nn.init.html#torch-nn-init
        - https://stackoverflow.com/a/49433937
        """
        def init_for_selu(m):
            if type(m) == torch.nn.Linear:
                torch.nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='linear')
                # Biases to zero (the theta layers have bias=False, hence the guard)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

        self.apply(init_for_selu)
    ##########################################################################
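
    # Note on the math: with nonlinearity='linear' (gain = 1), kaiming_normal_ in fan_in mode
    # draws weights from N(0, 1 / fan_in). This matches the initialization the SNN paper
    # (Klambauer et al., 2017) requires for SELU to keep zero mean and unit variance across layers.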
    def forward(self, x):
        return self.fc(x)
class NBEATSSeasonalBlock(NBEATSBlock):
    def __init__(
        self,
        units,
        thetas_dim=None,
        num_block_layers=4,
        backcast_length=10,
        forecast_length=5,
        nb_harmonics=None,
        min_period=1,
        dropout=0.1,
        ##############################
        activation_fn: str = 'relu'
        ###############################
    ):
        if nb_harmonics:
            thetas_dim = nb_harmonics
        else:
            thetas_dim = forecast_length
        self.min_period = min_period

        super().__init__(
            units=units,
            thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            backcast_length=backcast_length,
            forecast_length=forecast_length,
            share_thetas=True,
            dropout=dropout,
            activation_fn=activation_fn,
        )

        backcast_linspace, forecast_linspace = linspace(backcast_length, forecast_length, centered=False)

        p1, p2 = (thetas_dim // 2, thetas_dim // 2) if thetas_dim % 2 == 0 else (thetas_dim // 2, thetas_dim // 2 + 1)
        s1_b = torch.tensor(
            [np.cos(2 * np.pi * i * backcast_linspace) for i in self.get_frequencies(p1)], dtype=torch.float32
        )  # H/2-1
        s2_b = torch.tensor(
            [np.sin(2 * np.pi * i * backcast_linspace) for i in self.get_frequencies(p2)], dtype=torch.float32
        )
        self.register_buffer("S_backcast", torch.cat([s1_b, s2_b]))

        s1_f = torch.tensor(
            [np.cos(2 * np.pi * i * forecast_linspace) for i in self.get_frequencies(p1)], dtype=torch.float32
        )  # H/2-1
        s2_f = torch.tensor(
            [np.sin(2 * np.pi * i * forecast_linspace) for i in self.get_frequencies(p2)], dtype=torch.float32
        )
        self.register_buffer("S_forecast", torch.cat([s1_f, s2_f]))
    def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]:
        x = super().forward(x)
        amplitudes_backward = self.theta_b_fc(x)
        backcast = amplitudes_backward.mm(self.S_backcast)
        amplitudes_forward = self.theta_f_fc(x)
        forecast = amplitudes_forward.mm(self.S_forecast)
        return backcast, forecast

    def get_frequencies(self, n):
        return np.linspace(0, (self.backcast_length + self.forecast_length) / self.min_period, n)
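
# Shape note (illustrative): S_backcast is (thetas_dim, backcast_length) and S_forecast is
# (thetas_dim, forecast_length), so for input x of shape (batch, backcast_length) the block
# returns backcast (batch, backcast_length) and forecast (batch, forecast_length); each theta
# is the amplitude of one sine/cosine harmonic.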
class NBEATSTrendBlock(NBEATSBlock):
    def __init__(
        self,
        units,
        thetas_dim,
        num_block_layers=4,
        backcast_length=10,
        forecast_length=5,
        dropout=0.1,
        ##############################
        activation_fn: str = 'relu'
        ###############################
    ):
        super().__init__(
            units=units,
            thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            backcast_length=backcast_length,
            forecast_length=forecast_length,
            share_thetas=True,
            dropout=dropout,
            activation_fn=activation_fn,
        )

        backcast_linspace, forecast_linspace = linspace(backcast_length, forecast_length, centered=True)
        norm = np.sqrt(forecast_length / thetas_dim)  # ensure range of predictions is comparable to input
        coefficients = torch.tensor([backcast_linspace ** i for i in range(thetas_dim)], dtype=torch.float32)
        self.register_buffer("T_backcast", coefficients * norm)
        coefficients = torch.tensor([forecast_linspace ** i for i in range(thetas_dim)], dtype=torch.float32)
        self.register_buffer("T_forecast", coefficients * norm)
    def forward(self, x) -> Tuple[torch.Tensor, torch.Tensor]:
        x = super().forward(x)
        backcast = self.theta_b_fc(x).mm(self.T_backcast)
        forecast = self.theta_f_fc(x).mm(self.T_forecast)
        return backcast, forecast
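
# Basis note (illustrative): row i of T_backcast / T_forecast is t ** i on the centered time
# grid, so the thetas act as polynomial coefficients and the block fits a degree-(thetas_dim - 1)
# polynomial trend jointly over the backcast and forecast windows.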
class NBEATSGenericBlock(NBEATSBlock):
    def __init__(
        self,
        units,
        thetas_dim,
        num_block_layers=4,
        backcast_length=10,
        forecast_length=5,
        dropout=0.1,
        ##############################
        activation_fn: str = 'relu'
        ###############################
    ):
        super().__init__(
            units=units,
            thetas_dim=thetas_dim,
            num_block_layers=num_block_layers,
            backcast_length=backcast_length,
            forecast_length=forecast_length,
            dropout=dropout,
            activation_fn=activation_fn,
        )

        self.backcast_fc = nn.Linear(thetas_dim, backcast_length)
        self.forecast_fc = nn.Linear(thetas_dim, forecast_length)
    def forward(self, x):
        x = super().forward(x)
        theta_b = F.relu(self.theta_b_fc(x))
        theta_f = F.relu(self.theta_f_fc(x))
        return self.backcast_fc(theta_b), self.forecast_fc(theta_f)
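
# Minimal usage sketch (hypothetical values, not part of the original file): build a generic
# block with SELU so that AlphaDropout and the SNN weight init kick in, then run a dummy batch.
if __name__ == "__main__":
    block = NBEATSGenericBlock(
        units=32,
        thetas_dim=8,
        backcast_length=10,
        forecast_length=5,
        activation_fn='selu',
    )
    x = torch.randn(4, 10)  # (batch, backcast_length)
    backcast, forecast = block(x)
    print(backcast.shape, forecast.shape)  # torch.Size([4, 10]) torch.Size([4, 5])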