@mbed92
Last active September 23, 2023 10:15
[PhD Reviews] This code extends experiments performed during a PhD study. Its primary focus is designing and training a fully connected neural network (FC) using the PyTorch framework. The task is regression of a stiffness parameter from data collected by IMUs attached to a 2-finger gripper.
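# Data layout assumed from the loading code below (not verified against the
# original pickle files): each file stores a dict with 'data' of shape
# [N, 200, 12] (time steps x IMU channels) and 'stiffness' of shape [N].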
import os
import pickle
import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import KFold
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchsummary import summary
torch.manual_seed(42)
# Size of ConvNet from the original paper:
conv_params = (3 * 1 * 128 + 128) + (3 * 128 * 256 + 256) + (3 * 256 * 512 + 512)
bn_params = 2 * (128 + 256 + 512)
dense_params = (512 * 256 + 256) + (256 * 128 + 128) + (128 * 64 + 64) + (64 * 1 + 1)
conv_net_num_params = conv_params + bn_params + dense_params
print("Number of learnable parameters in the ConvNet:", conv_net_num_params)
def create_fc_net(input_size: list, fc_layers: list, num_outputs: int = None):
    assert fc_layers is not None and len(fc_layers) >= 1
    fc_net = nn.Sequential()
    fc_net.add_module("flatten", nn.Flatten())

    # calculate the flattened input size
    flattened_input_size = int(np.prod(input_size))

    # stack the hidden blocks: Linear -> BatchNorm -> ReLU -> Dropout
    prev_fc_units = flattened_input_size
    for i, fc_units in enumerate(fc_layers):
        fc_net.add_module(f"dense_{i}", nn.Linear(prev_fc_units, fc_units))
        fc_net.add_module(f"batchnorm_{i}", nn.BatchNorm1d(fc_units))
        fc_net.add_module(f"activation_{i}", nn.ReLU())
        fc_net.add_module(f"dropout_{i}", nn.Dropout(0.1))
        prev_fc_units = fc_units

    # append the regression head when a valid output size is given
    if num_outputs is not None and num_outputs >= 1:
        fc_net.add_module("output", nn.Linear(fc_layers[-1], num_outputs))
    return fc_net
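# Example (with the settings below): create_fc_net([200, 12], [256, 128, 64], 1)
# builds Flatten -> Linear(2400, 256) -> ... -> Linear(128, 64) -> Linear(64, 1),
# i.e. the FC baseline that is compared against the ConvNet above.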
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
# Define input shape and network architecture
input_size = [200, 12]
fc_layers = [256, 128, 64]
num_outputs = 1
# training settings taken from the original paper
batch_size = 100
epochs = 100
lr = 1e-3
gamma = 0.9999
stiffness_range = 1100.0
stiffness_min = 300.0
weight_decay = 1e-5
tensorboard_path = "./logs"
num_splits = 5
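# NOTE: gamma is applied once per epoch (eta_scheduler.step() runs per epoch below),
# so after 100 epochs the learning rate has only decayed to about
# 0.9999**100 = 0.99 of its initial value, i.e. the schedule is nearly constant.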
# Load the dataset from a pickle file and wrap it in a torch Dataset object
class CustomDataset(Dataset):
    def __init__(self, data, augment):
        self.data = np.array(data['data'])
        self.labels = np.array(data['stiffness'])
        if augment:
            self.data[:, :, :6] += np.random.normal(0.0, 0.7, [*self.data.shape[:2], 6])
            self.data[:, :, 6:] += np.random.normal(0.0, 0.06, [*self.data.shape[:2], 6])

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

    def normalize(self, mean, std):
        self.data = (self.data - mean) / std

    def __add__(self, other):
        self.data = np.concatenate([self.data, other.data], 0)
        self.labels = np.concatenate([self.labels, other.labels], 0)
        return self
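# NOTE: __add__ concatenates in place and returns self, so `a + b` mutates `a`
# rather than building a new dataset; that is sufficient here because the summed
# dataset is the only one used afterwards.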
def load_dataset(path, augment):
    with open(path, "rb") as fp:
        raw_data = pickle.load(fp)
    dataset = CustomDataset(raw_data, augment=augment)
    del raw_data
    return dataset
# concatenate train and val dataset to create one dataset for cross-validation
train_dataset = load_dataset("train.pickle", augment=True)
test_dataset = load_dataset("val.pickle", augment=True)
total_dataset = train_dataset + test_dataset
total_dataloader = DataLoader(total_dataset, batch_size=batch_size, shuffle=True)
# normalize the dataset
mean = np.mean(total_dataset.data, 0, keepdims=True)
std = np.std(total_dataset.data, 0, keepdims=True)
total_dataset.normalize(mean, std)
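# NOTE: mean/std are computed over the pooled train+val data before the CV split,
# so normalization statistics are shared between each fold's train and test parts.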
# run cross-validation as in the original paper
running_test_mape, running_test_mae = [], []
kf = KFold(n_splits=num_splits, shuffle=True)
for train_index, test_index in kf.split(total_dataset):
    train_dataset = torch.utils.data.Subset(total_dataset, train_index)
    test_dataset = torch.utils.data.Subset(total_dataset, test_index)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

    # create the FC network
    fc_net = create_fc_net(input_size, fc_layers, num_outputs).cuda()
    print(f"Number of learnable parameters in the FcNetwork: {count_parameters(fc_net)}")
    summary(fc_net, input_size=tuple(input_size))

    # assert that the parameter counts of the two nets do not differ by more than 5%
    fcn_num_params = count_parameters(fc_net)
    assert abs(fcn_num_params - conv_net_num_params) / fcn_num_params < 0.05

    # set up the optimization procedure as in the original paper
    optimizer = torch.optim.AdamW(fc_net.parameters(), lr=lr, weight_decay=weight_decay)
    eta_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=gamma)
    criterion = nn.L1Loss()

    # metrics: Mean Absolute Percentage Error and Mean Absolute Error
    mape = lambda y_hat, y_true: torch.mean(torch.abs((y_true - y_hat) / y_true)) * 100
    mae = lambda y_hat, y_true: torch.mean(torch.abs(y_true - y_hat))
    best_mape, best_mae_corresponding, best_epoch = torch.inf, 0.0, 0

    # map the raw network output to stiffness values
    norm = lambda raw_output, y_range, y_min: y_range * torch.sigmoid(raw_output) + y_min
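    # NOTE: the sigmoid squashes the raw output into (0, 1), so every prediction
    # lies in (stiffness_min, stiffness_min + stiffness_range) = (300, 1400),
    # in N/m (units taken from the final report string below).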
    # training and testing loop
    os.makedirs(tensorboard_path, exist_ok=True)
    with SummaryWriter(log_dir=tensorboard_path) as writer:
        for epoch in range(epochs):
            print("\n\nTraining...")
            loss_list = []
            fc_net.train()
            for i, (x, y) in enumerate(train_dataloader):
                y_raw = fc_net(x.float().cuda())
                y_pred = norm(y_raw, stiffness_range, stiffness_min).squeeze(-1)
                loss = criterion(y_pred, y.cuda().float())
                loss_list.append(loss.item())
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                # log every 10th step
                if i % 10 == 0:
                    print(
                        f"Epoch: {epoch}, Step: {i}, Loss: {loss.item()}, "
                        f"Learning rate: {eta_scheduler.get_last_lr()}, "
                        f"MAPE: {mape(y_pred, y.cuda().float())}, MAE: {mae(y_pred, y.cuda().float())}")
            eta_scheduler.step()

            print("\n\nTesting...")
            mape_list, mae_list = [], []
            fc_net.eval()
            with torch.no_grad():
                for i, (x, y) in enumerate(test_dataloader):
                    y_raw = fc_net(x.float().cuda())
                    y_pred = norm(y_raw, stiffness_range, stiffness_min).squeeze(-1)
                    mape_list.append(mape(y_pred, y.cuda().float()))
                    mae_list.append(mae(y_pred, y.cuda().float()))

            # calculate epoch metrics
            epoch_loss = np.mean(loss_list)
            epoch_mape = torch.mean(torch.stack(mape_list)).cpu().numpy()
            epoch_mae = torch.mean(torch.stack(mae_list)).cpu().numpy()

            # log metrics to TensorBoard
            writer.add_scalar("training/loss", epoch_loss, epoch)
            writer.add_scalar("training/lr", eta_scheduler.get_last_lr()[0], epoch)
            writer.add_scalar("test/MAPE", epoch_mape, epoch)
            writer.add_scalar("test/MAE", epoch_mae, epoch)

            # save the best model
            if epoch_mape < best_mape:
                print(f"New best MAPE: {epoch_mape}, MAE: {epoch_mae}, Epoch: {epoch}.")
                best_mape = epoch_mape
                best_mae_corresponding = epoch_mae
                best_epoch = epoch
                torch.save(fc_net, "best_model.pth")
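                # NOTE: this pickles the whole nn.Sequential module; saving
                # fc_net.state_dict() instead would be the more portable
                # alternative, but the full-object save is kept as in the original.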
    running_test_mape.append(best_mape)
    running_test_mae.append(best_mae_corresponding)

print(f"Training finished. "
      f"Best MAPE: {np.mean(running_test_mape)} +/- {np.std(running_test_mape)} [%] "
      f"MAE: {np.mean(running_test_mae)} +/- {np.std(running_test_mae)} [N/m]")