Skip to content

Instantly share code, notes, and snippets.

@Muhammad4hmed
Last active October 19, 2021 16:26
Show Gist options
  • Save Muhammad4hmed/989bdabfb7d80a897fbfe13bd1322bb0 to your computer and use it in GitHub Desktop.
Save Muhammad4hmed/989bdabfb7d80a897fbfe13bd1322bb0 to your computer and use it in GitHub Desktop.
# dataset.py
import torch
import numpy as np
from PIL import Image
from PIL import ImageFile
from torch.utils.data import DataLoader, Dataset
# sometimes, you will have images without anending bit# this takes care of those kind of (corrupt) images
ImageFile.LOAD_TRUNCATED_IMAGES = True
class Model(nn.Module):
def __init__(self, model_name='resnet18', pretrained=False):
super().__init__()
self.model = timm.create_model(model_name, pretrained=pretrained)
n_features = self.model.fc.in_features
self.model.fc = nn.Linear(n_features, 1)
def forward(self, x):
x = self.model(x)
return x
class ClassificationDataset:
def __init__(
self,
image_paths,
targets,
resize=None,
augmentations=None
):
"""
:param image_paths: list of path to images
:param targets: numpy array
:param resize: tuple, e.g. (256, 256), resizes image if not None
:param augmentations: albumentation augmentations
"""
self.image_paths = image_paths
self.targets = targets
self.resize = resize
self.augmentations = augmentations
def __len__(self):
"""
Return the total number of samples in the dataset
"""
return len(self.image_paths)
def __getitem__(self,item):
"""
For a given "item" index, return everything we needto train a given model
"""
# use PIL to open the image
image = Image.open(self.image_paths[item])
# convert image to RGB, we have single channel images
image = image.convert("RGB")
# grab correct targets
targets = self.targets[item]
# resize if needed
if self.resize is not None:
image = image.resize(
(self.resize[1], self.resize[0]),
resample=Image.BILINEAR
)
# convert image to numpy array
image = np.array(image)
# if we have albumentation augmentations
# add them to the image
if self.augmentations is not None:
augmented = self.augmentations(image=image)
image = augmented["image"]
# pytorch expects CHW instead of HWC
image = np.transpose(image, (2, 0, 1)).astype(np.float32)
# return tensors of image and targets
# take a look at the types!
# for regression tasks,
# dtype of targets will change to torch.float
return {
"image": torch.tensor(image, dtype=torch.float),
"targets": torch.tensor(targets, dtype=torch.long),
}
class TestDataset:
def __init__(
self,
image_paths,
resize=None,
augmentations=None
):
"""
:param image_paths: list of path to images
:param targets: numpy array
:param resize: tuple, e.g. (256, 256), resizes image if not None
:param augmentations: albumentation augmentations
"""
self.image_paths = image_paths
self.resize = resize
self.augmentations = augmentations
def __len__(self):
"""
Return the total number of samples in the dataset
"""
return len(self.image_paths)
def __getitem__(self,item):
"""
For a given "item" index, return everything we needto train a given model
"""
# use PIL to open the image
image = Image.open(self.image_paths[item])
# convert image to RGB, we have single channel images
image = image.convert("RGB")
# resize if needed
if self.resize is not None:
image = image.resize(
(self.resize[1], self.resize[0]),
resample=Image.BILINEAR
)
# convert image to numpy array
image = np.array(image)
# if we have albumentation augmentations
# add them to the image
if self.augmentations is not None:
augmented = self.augmentations(image=image)
image = augmented["image"]
# pytorch expects CHW instead of HWC
image = np.transpose(image, (2, 0, 1)).astype(np.float32)
# return tensors of image and targets
# take a look at the types!
# for regression tasks,
# dtype of targets will change to torch.float
return torch.tensor(image, dtype=torch.float)
# engine.py
import torch
import torch.nn as nn
from tqdm import tqdm
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, CosineAnnealingLR, ReduceLROnPlateau
def get_scheduler(optimizer, scheduler):
if scheduler=='ReduceLROnPlateau':
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=4, verbose=True, eps=1e-6)
elif scheduler=='CosineAnnealingLR':
scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6, last_epoch=-1)
elif scheduler=='CosineAnnealingWarmRestarts':
scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=1, eta_min=1e-6, last_epoch=-1)
return scheduler
def train_model(data_loader, model, optimizer, device):
"""
This function does training for one epoch
:param data_loader: this is the pytorch dataloader
:param model: pytorch model
:param optimizer: optimizer, for e.g. adam, sgd, etc
:param device: cuda/cpu
"""
# put the model in train mode
model.train()
scheduler = get_scheduler(optimizer, 'CosineAnnealingWarmRestarts')
# go over every batch of data in data loader
for data in data_loader:
# remember, we have image and targets
# in our dataset class
inputs = data["image"]
targets = data["targets"]
# move inputs/targets to cuda/cpu device
inputs = inputs.to(device, dtype=torch.float)
targets = targets.to(device, dtype=torch.float)
# zero grad the optimizer
optimizer.zero_grad()
#do the forward step of model
outputs = model(inputs)
# calculate loss
loss = nn.CrossEntropyLoss()(outputs,targets.long())
# backward step the loss
loss.backward()
# step optimizer
optimizer.step()
# if you have a scheduler, you either need to
# step it here or you have to step it after
# the epoch. here, we are not using any learning
# rate scheduler
scheduler.step()
def evaluate_model(data_loader, model, device):
"""
This function does evaluation for one epoch
:param data_loader: this is the pytorch dataloader
:param model: pytorch model
:param device: cuda/cpu
"""
# put model in evaluation mode
model.eval()
# init lists to store targets and outputs
final_targets = []
final_outputs = []
# we use no_grad context
with torch.no_grad():
for data in data_loader:
inputs = data["image"]
targets = data["targets"]
inputs = inputs.to(device, dtype=torch.float)
targets = targets.to(device, dtype=torch.float)
# do the forward step to generate prediction
output = model(inputs)
# convert targets and outputs to lists
targets = targets.detach().cpu().numpy().tolist()
output = output.detach().cpu().numpy().tolist()
# extend the original list
final_targets.extend(targets)
final_outputs.extend(output)
# return final output and final targets
return final_outputs, final_targets
# model.py
import torch.nn as nn
import pretrainedmodels
def get_model(pretrained):
if pretrained:
model = pretrainedmodels.__dict__["alexnet"](
pretrained='imagenet'
)
else:
model = pretrainedmodels.__dict__["alexnet"](
pretrained=None
)
# print the model here to know whats going on.
model.last_linear = nn.Sequential(
nn.BatchNorm1d(4096),
nn.Dropout(p=0.25),
nn.Linear(in_features=4096, out_features=2048),
nn.ReLU(),
nn.BatchNorm1d(2048, eps=1e-05, momentum=0.1),
nn.Dropout(p=0.5),
nn.Linear(in_features=2048, out_features=1), # out features
)
return model
# utils.py
import os
def save_checkpoint(model, optimizer, path):
if not os.path.exists(os.path.dirname(path)):
print("Creating directories on path: `{}`".format(path))
os.makedirs(os.path.dirname(path))
torch.save({
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
}, path)
def load_checkpoint(model, path):
checkpoint = torch.load(path)
model.load_state_dict(checkpoint["model_state_dict"])
optimizer = torch.optim.Adam(model.parameters())
optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
return model, optimizer
def save_model(model, path):
if not os.path.exists(os.path.dirname(path)):
print("Creating directories on path: `{}`".format(path))
os.makedirs(os.path.dirname(path))
torch.save({
"model_state_dict": model.state_dict(),
}, path)
def load_model(moodel, path):
restore_dict = torch.load(path)
model.load_state_dict(restore_dict["model_state_dict"])
model.eval()
return model
# inference.py
def inference(model, states, test_loader, device):
model.to(device)
tk0 = tqdm.tqdm(enumerate(test_loader), total=len(test_loader))
probs = []
for i, data in tk0:
images = data["image"]
targets = data["targets"]
images = images.to(device)
avg_preds = []
for state in states:
model.load_state_dict(torch.load(state)["model_state_dict"])
model.eval()
with torch.no_grad():
y_preds = model(images)
avg_preds.append(y_preds.softmax(1).to('cpu').numpy())
avg_preds = np.mean(avg_preds, axis=0)
probs.append(avg_preds)
probs = np.concatenate(probs)
return probs
# metrics.py
from sklearn import metrics
def multi_class_roc_auc(true, pred_probs_arr, labels):
auc_all = []
for label_number in labels:
true_labels = true.loc[:,label_number].copy()
pred_probs = pred_probs_arr.loc[:, label_number].copy()
#AUROC and AP (sliding across multiple decision thresholds)
fpr, tpr, thresholds = metrics.roc_curve(y_true = true_labels,
y_score = pred_probs,
pos_label = 1)
auc = metrics.auc(fpr, tpr)
auc_all.append(auc)
print(f'AUC of each class: {auc_all}')
return np.mean(auc_all)
import glob
import pandas as pd
def imagesInFolder():
data_frame = pd.DataFrame(columns = ['image_path', 'label'])
i = 0
for each_class in glob.glob('path/*'):
for each_image in glob.glob(each_class + '/*'):
data_frame.at[i, 'label'] = each_class.split('/')[-1]
data_frame.at[i, 'image_path'] = each_image
i += 1
data_frame = data_frame.sample(frac=1).reset_index(drop = True)
return data_frame
# train.py
import os
import pandas as pd
import numpy as np
import albumentations
import torch
from sklearn import metrics
from sklearn.model_selection import train_test_split
import random
# import dataset
# import engine
# from model import get_model
if __name__ == "__main__":
def seed_torch(seed=42):
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
seed_torch(seed=CFG.seed)
# location of train.csv and train_png folder
# with all the png images
data_path = "/home/abhishek/workspace/siim_png/"
# cuda/cpu device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
# let's train for 10 epochs
epochs = 10
# load the dataframe
df = pd.read_csv(os.path.join(data_path, "train.csv"))
# fetch all image ids
images = df.ImageId.values.tolist()
# a list with image locations
images = [
os.path.join(data_path, "train_png", i + ".png") for i in images
]
# binary targets numpy array
targets = df.target.values
# fetch out model, we will try both pretrained
# and non-pretrained weights
model = get_model(pretrained=True)
# move model to device
model.to(device)
# mean and std values of RGB channels for imagenet dataset
# we use these pre-calculated values when we use weights
# from imagenet.
# when we do not use imagenet weights, we use the mean and
# standard deviation values of the original dataset
# please note that this is a separate calculation
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
# albumentations is an image augmentation library
# that allows you do to many different types of image
# augmentations. here, i am using only normalization
# notice always_apply=True. we always want to apply
# normalization
aug = albumentations.Compose(
[
albumentations.Normalize(
mean, std, max_pixel_value=255.0, always_apply=True
)
]
)
# instead of using kfold, i am using train_test_split
#with a fixed random state
train_images, valid_images, train_targets, valid_targets = train_test_split(
images, targets, stratify=targets, random_state=42
)
# fetch theClassificationDataset class
train_dataset = dataset.ClassificationDataset(
image_paths=train_images,
targets=train_targets,
resize=(227, 227),
augmentations=aug,
)
# torch dataloader creates batches of data
# from classification dataset class
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=16, shuffle=True, num_workers=4
)
# same for validation data
valid_dataset = dataset.ClassificationDataset(
image_paths=valid_images,
targets=valid_targets,
resize=(227, 227),
augmentations=aug,
)
valid_loader = torch.utils.data.DataLoader(
valid_dataset, batch_size=16, shuffle=False, num_workers=4
)
# simple Adam optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
# train and print auc score for all epochs
for epoch in range(epochs):
train_model(train_loader, model, optimizer, device=device)
predictions, valid_targets = evaluate_model(
valid_loader, model, device=device
)
roc_auc = metrics.roc_auc_score(valid_targets, predictions)
# roc_auc = multi_class_roc_auc(targets.copy(), preds.copy(), labels)
print(
f"Epoch={epoch}, Valid ROC AUC={roc_auc}"
)
save_checkpoint(model, optimizer, f'./model_{epoch}.pth')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment