[PhD Reviews] This code extends experiments performed during a PhD study. Its primary focus is designing and training a fully connected neural network (FC) in the PyTorch framework. The task is regression of the stiffness parameter from data collected by IMUs attached to a 2-finger gripper.
import os
import pickle

import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import KFold
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
from torchsummary import summary

torch.manual_seed(42)
# Size of the ConvNet from the original paper:
conv_params = (3 * 1 * 128 + 128) + (3 * 128 * 256 + 256) + (3 * 256 * 512 + 512)
bn_params = 2 * (128 + 256 + 512)
dense_params = (512 * 256 + 256) + (256 * 128 + 128) + (128 * 64 + 64) + (64 * 1 + 1)
conv_net_num_params = conv_params + bn_params + dense_params
print("Number of learnable parameters in the ConvNet:", conv_net_num_params)
def create_fc_net(input_size: list, fc_layers: list, num_outputs: int = None):
    assert fc_layers is not None and len(fc_layers) >= 1

    fc_net = nn.Sequential()
    fc_net.add_module("flatten", nn.Flatten())

    # calculate the flattened input size
    flattened_input_size = int(np.prod(input_size))

    prev_fc_units = flattened_input_size
    for i, fc_units in enumerate(fc_layers):
        fc_net.add_module(f"dense_{i}", nn.Linear(prev_fc_units, fc_units))
        fc_net.add_module(f"batchnorm_{i}", nn.BatchNorm1d(fc_units))
        fc_net.add_module(f"activation_{i}", nn.ReLU())
        fc_net.add_module(f"dropout_{i}", nn.Dropout(0.1))
        prev_fc_units = fc_units

    # add a linear output head only when a valid number of outputs is requested
    if num_outputs is not None and num_outputs >= 1:
        fc_net.add_module("output", nn.Linear(fc_layers[-1], num_outputs))
    return fc_net
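# Example (the shapes below mirror the defaults used later in this script):
# create_fc_net([200, 12], [256, 128, 64], 1) builds
#   Flatten -> Linear(2400, 256) -> BN -> ReLU -> Dropout(0.1)
#           -> Linear(256, 128) -> ... -> Linear(128, 64) -> ... -> Linear(64, 1)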
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
# Define input shape and network architecture
input_size = [200, 12]
fc_layers = [256, 128, 64]
num_outputs = 1

# Training settings taken from the original paper
batch_size = 100
epochs = 100
lr = 1e-3
gamma = 0.9999
stiffness_range = 1100.0  # predictions span (stiffness_min, stiffness_min + stiffness_range) N/m
stiffness_min = 300.0
weight_decay = 1e-5
tensorboard_path = "./logs"
num_splits = 5
# Wrap the raw data in a torch Dataset
class CustomDataset(Dataset):
    def __init__(self, data, augment):
        self.data = np.array(data['data'])
        self.labels = np.array(data['stiffness'])
        if augment:
            # add Gaussian noise once at load time; the first 6 channels and
            # the last 6 channels get different noise magnitudes
            self.data[:, :, :6] += np.random.normal(0.0, 0.7, [*self.data.shape[:2], 6])
            self.data[:, :, 6:] += np.random.normal(0.0, 0.06, [*self.data.shape[:2], 6])

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

    def normalize(self, mean, std):
        self.data = (self.data - mean) / std

    def __add__(self, other):
        # in-place concatenation: mutates and returns self rather than a new dataset
        self.data = np.concatenate([self.data, other.data], 0)
        self.labels = np.concatenate([self.labels, other.labels], 0)
        return self
def load_dataset(path, augment):
    with open(path, "rb") as fp:
        raw_data = pickle.load(fp)
    dataset = CustomDataset(raw_data, augment=augment)
    del raw_data
    return dataset
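# Assumed pickle layout (inferred from CustomDataset and input_size above, not
# stated explicitly in the source): a dict with
#   'data':      array-like of shape (N, 200, 12) - IMU time series
#   'stiffness': array-like of shape (N,)         - target stiffness in N/m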
# concatenate the train and val datasets into one dataset for cross-validation
train_dataset = load_dataset("train.pickle", augment=True)
test_dataset = load_dataset("val.pickle", augment=True)
total_dataset = train_dataset + test_dataset
total_dataloader = DataLoader(total_dataset, batch_size=batch_size, shuffle=True)

# normalize the dataset (per-timestep, per-channel statistics)
mean = np.mean(total_dataset.data, 0, keepdims=True)
std = np.std(total_dataset.data, 0, keepdims=True)
total_dataset.normalize(mean, std)
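# Note: the statistics are computed once over the merged dataset, before the
# cross-validation split below, so every fold shares the same normalization.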
# start cross-validation as in the original paper
running_test_mape, running_test_mae = [], []
kf = KFold(n_splits=num_splits, shuffle=True)
for train_index, test_index in kf.split(total_dataset):
    train_dataset = torch.utils.data.Subset(total_dataset, train_index)
    test_dataset = torch.utils.data.Subset(total_dataset, test_index)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

    # create the FC network
    fc_net = create_fc_net(input_size, fc_layers, num_outputs).cuda()
    print(f"Number of learnable parameters in the FcNetwork: {count_parameters(fc_net)}")
    summary(fc_net, input_size=tuple(input_size))

    # assert that the parameter counts of the two nets differ by less than 5%
    fcn_num_params = count_parameters(fc_net)
    assert abs(fcn_num_params - conv_net_num_params) / fcn_num_params < 0.05
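    # With the defaults above this holds comfortably: the FC net has 656,769
    # learnable parameters vs 667,137 in the ConvNet, a difference of about 1.6%.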
    # set up the optimization procedure as in the original paper
    optimizer = torch.optim.AdamW(fc_net.parameters(), lr=lr, weight_decay=weight_decay)
    eta_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=gamma)
    criterion = nn.L1Loss()

    # metrics: Mean Absolute Percentage Error and Mean Absolute Error
    mape = lambda y_hat, y_true: torch.mean(torch.abs((y_true - y_hat) / y_true)) * 100
    mae = lambda y_hat, y_true: torch.mean(torch.abs(y_true - y_hat))
    best_mape, best_mae_corresponding, best_epoch = torch.inf, 0.0, 0

    # squash the raw prediction into the stiffness range (300, 1400) N/m
    norm = lambda raw_output, y_range, y_min: y_range * torch.sigmoid(raw_output) + y_min
    # start the training and testing loop
    os.makedirs(tensorboard_path, exist_ok=True)
    with SummaryWriter(log_dir=tensorboard_path) as writer:
        for epoch in range(epochs):
            print("\n\nTraining...")
            loss_list = []
            fc_net.train()
            for i, (x, y) in enumerate(train_dataloader):
                y_raw = fc_net(x.float().cuda())
                y_pred = norm(y_raw, stiffness_range, stiffness_min).squeeze(-1)
                loss = criterion(y_pred, y.cuda().float())
                loss_list.append(loss.item())

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # log every 10th step
                if i % 10 == 0:
                    print(
                        f"Epoch: {epoch}, Step: {i}, Loss: {loss.item()}, "
                        f"Learning rate: {eta_scheduler.get_last_lr()}, "
                        f"MAPE: {mape(y_pred, y.cuda().float())}, MAE: {mae(y_pred, y.cuda().float())}")
            eta_scheduler.step()
print("\n\nTesting...") | |
mape_list, mae_list = [], [] | |
fc_net.eval() | |
with torch.no_grad(): | |
for i, (x, y) in enumerate(test_dataloader): | |
y_raw = fc_net(x.float().cuda()) | |
y_pred = norm(y_raw, stiffness_range, stiffness_min).squeeze() | |
mape_list.append(mape(y_pred, y.cuda().float())) | |
mae_list.append(mae(y_pred, y.cuda().float())) | |
            # calculate epoch metrics
            epoch_loss = np.mean(loss_list)
            epoch_mape = torch.mean(torch.stack(mape_list)).cpu().numpy()
            epoch_mae = torch.mean(torch.stack(mae_list)).cpu().numpy()

            # log metrics to tensorboard
            writer.add_scalar("training/loss", epoch_loss, epoch)
            writer.add_scalar("training/lr", eta_scheduler.get_last_lr()[0], epoch)
            writer.add_scalar("test/MAPE", epoch_mape, epoch)
            writer.add_scalar("test/MAE", epoch_mae, epoch)

            # save the best model
            if epoch_mape < best_mape:
                print(f"New best MAPE: {epoch_mape}, MAE: {epoch_mae}, Epoch: {epoch}.")
                best_mape = epoch_mape
                best_mae_corresponding = epoch_mae
                best_epoch = epoch
                torch.save(fc_net, "best_model.pth")

    running_test_mape.append(best_mape)
    running_test_mae.append(best_mae_corresponding)
print(f"Training finished. " | |
f"Best MAPE: {np.mean(running_test_mape)} +/- {np.std(running_test_mape)} [%] " | |
f"MAE: {np.mean(running_test_mae)} +/- {np.std(running_test_mae)} [N/m]") |