Last active
October 19, 2021 16:26
-
-
Save Muhammad4hmed/989bdabfb7d80a897fbfe13bd1322bb0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# dataset.py | |
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from PIL import ImageFile
from torch.utils.data import DataLoader, Dataset
# sometimes, you will have images without an ending bit;
# this takes care of those kinds of (corrupt) images
ImageFile.LOAD_TRUNCATED_IMAGES = True | |
class Model(nn.Module):
    """
    Wrapper around a timm backbone whose classifier emits a single value.

    :param model_name: architecture name passed to ``timm.create_model``
    :param pretrained: forwarded to ``timm.create_model``
    """

    def __init__(self, model_name='resnet18', pretrained=False):
        super().__init__()
        # `timm` is not imported anywhere at file level, so import it here
        # to keep the class usable on its own.
        import timm

        self.model = timm.create_model(model_name, pretrained=pretrained)
        head = getattr(self.model, "fc", None)
        if isinstance(head, nn.Linear):
            # ResNet-style backbones: swap the final linear layer for a
            # fresh one with a single output.
            self.model.fc = nn.Linear(head.in_features, 1)
        else:
            # Backbones whose classifier is not a linear layer named `fc`:
            # let timm rebuild the classifier with one output.
            self.model.reset_classifier(1)

    def forward(self, x):
        """Run the wrapped backbone on ``x`` and return its raw output."""
        return self.model(x)
class ClassificationDataset:
    """Indexable dataset that pairs image files with classification targets."""

    def __init__(self, image_paths, targets, resize=None, augmentations=None):
        """
        :param image_paths: list of path to images
        :param targets: numpy array
        :param resize: tuple, e.g. (256, 256), resizes image if not None
        :param augmentations: albumentation augmentations
        """
        self.image_paths = image_paths
        self.targets = targets
        self.resize = resize
        self.augmentations = augmentations

    def __len__(self):
        """Number of samples, i.e. number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, item):
        """
        Load sample ``item`` and return a dict with an ``"image"`` float
        tensor in CHW layout and a ``"targets"`` long tensor.
        """
        # open with PIL and force three channels (inputs may be grayscale)
        picture = Image.open(self.image_paths[item]).convert("RGB")
        label = self.targets[item]

        if self.resize is not None:
            # the tuple is passed to PIL in swapped order
            new_size = (self.resize[1], self.resize[0])
            picture = picture.resize(new_size, resample=Image.BILINEAR)

        pixels = np.array(picture)

        if self.augmentations is not None:
            pixels = self.augmentations(image=pixels)["image"]

        # HWC -> CHW, which is what pytorch expects
        chw = np.transpose(pixels, (2, 0, 1)).astype(np.float32)

        # targets are long here; for regression they would be torch.float
        return {
            "image": torch.tensor(chw, dtype=torch.float),
            "targets": torch.tensor(label, dtype=torch.long),
        }
class TestDataset:
    """Indexable dataset of unlabeled images for prediction time."""

    def __init__(self, image_paths, resize=None, augmentations=None):
        """
        :param image_paths: list of path to images
        :param resize: tuple, e.g. (256, 256), resizes image if not None
        :param augmentations: albumentation augmentations
        """
        self.image_paths = image_paths
        self.resize = resize
        self.augmentations = augmentations

    def __len__(self):
        """Number of samples, i.e. number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, item):
        """
        Load sample ``item`` and return it as a bare float tensor in CHW
        layout (no dict, no targets).
        """
        # open with PIL and force three channels (inputs may be grayscale)
        picture = Image.open(self.image_paths[item]).convert("RGB")

        if self.resize is not None:
            # the tuple is passed to PIL in swapped order
            new_size = (self.resize[1], self.resize[0])
            picture = picture.resize(new_size, resample=Image.BILINEAR)

        pixels = np.array(picture)

        if self.augmentations is not None:
            pixels = self.augmentations(image=pixels)["image"]

        # HWC -> CHW, which is what pytorch expects
        chw = np.transpose(pixels, (2, 0, 1)).astype(np.float32)
        return torch.tensor(chw, dtype=torch.float)
# engine.py | |
import torch | |
import torch.nn as nn | |
from tqdm import tqdm | |
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, CosineAnnealingLR, ReduceLROnPlateau | |
def get_scheduler(optimizer, scheduler):
    """
    Build a learning-rate scheduler for ``optimizer`` from its name.

    :param optimizer: optimizer the scheduler will drive
    :param scheduler: one of 'ReduceLROnPlateau', 'CosineAnnealingLR',
        'CosineAnnealingWarmRestarts'; a non-string value is assumed to be
        an already-built scheduler and is returned unchanged
    :return: the scheduler instance
    :raises ValueError: if ``scheduler`` is a string that is not recognised
    """
    if not isinstance(scheduler, str):
        # already a scheduler object: hand it back untouched
        return scheduler

    if scheduler == 'ReduceLROnPlateau':
        # NOTE(review): `verbose` is reported as deprecated in recent
        # PyTorch releases -- confirm against the installed version.
        return ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=4, verbose=True, eps=1e-6)
    if scheduler == 'CosineAnnealingLR':
        return CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6, last_epoch=-1)
    if scheduler == 'CosineAnnealingWarmRestarts':
        return CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=1, eta_min=1e-6, last_epoch=-1)

    # Previously an unknown name was returned as-is and only failed later,
    # at `.step()`, with an unhelpful AttributeError on `str`.
    raise ValueError("Unknown scheduler name: `{}`".format(scheduler))
def train_model(data_loader, model, optimizer, device, scheduler=None):
    """
    This function does training for one epoch

    :param data_loader: this is the pytorch dataloader; each batch is a dict
        with "image" and "targets" entries
    :param model: pytorch model
    :param optimizer: optimizer, for e.g. adam, sgd, etc
    :param device: cuda/cpu
    :param scheduler: optional learning-rate scheduler, stepped once per
        batch with an argument-less ``step()``. When omitted, a fresh
        'CosineAnnealingWarmRestarts' scheduler is created on every call
        (the historical behaviour), which restarts the schedule each epoch;
        pass a scheduler created once outside the epoch loop to avoid that.
    """
    # put the model in train mode
    model.train()
    if scheduler is None:
        scheduler = get_scheduler(optimizer, 'CosineAnnealingWarmRestarts')

    # build the loss modules once instead of once per batch
    multi_class_loss = nn.CrossEntropyLoss()
    single_logit_loss = nn.BCEWithLogitsLoss()

    # go over every batch of data in data loader
    for data in data_loader:
        # move inputs/targets to cuda/cpu device
        inputs = data["image"].to(device, dtype=torch.float)
        targets = data["targets"].to(device, dtype=torch.float)

        optimizer.zero_grad()
        outputs = model(inputs)

        if outputs.dim() == 2 and outputs.size(1) == 1:
            # A single output column is a binary logit. Cross entropy over
            # one class is identically zero for target 0 and rejects
            # target 1, so use the binary loss here.
            loss = single_logit_loss(outputs, targets.view(-1, 1))
        else:
            loss = multi_class_loss(outputs, targets.long())

        loss.backward()
        optimizer.step()
        # the scheduler is advanced after every optimizer step
        scheduler.step()
def evaluate_model(data_loader, model, device):
    """
    Run the model over every batch of ``data_loader`` without gradients and
    collect raw outputs next to their targets.

    :param data_loader: this is the pytorch dataloader
    :param model: pytorch model
    :param device: cuda/cpu
    :return: tuple ``(final_outputs, final_targets)`` of python lists
    """
    model.eval()

    collected_outputs, collected_targets = [], []

    with torch.no_grad():
        for batch in data_loader:
            batch_inputs = batch["image"].to(device, dtype=torch.float)
            batch_targets = batch["targets"].to(device, dtype=torch.float)

            # forward pass only
            batch_outputs = model(batch_inputs)

            # flatten each batch into plain python lists and accumulate
            collected_targets += batch_targets.detach().cpu().tolist()
            collected_outputs += batch_outputs.detach().cpu().tolist()

    return collected_outputs, collected_targets
# model.py | |
import torch.nn as nn | |
import pretrainedmodels | |
def get_model(pretrained):
    """
    Create an alexnet from ``pretrainedmodels`` with a custom head.

    :param pretrained: truthy to request 'imagenet' weights, falsy for none
    :return: the model with ``last_linear`` replaced by a small MLP that
        ends in a single output feature
    """
    weights = 'imagenet' if pretrained else None
    model = pretrainedmodels.__dict__["alexnet"](pretrained=weights)

    # print the model here to know whats going on.
    head_layers = [
        nn.BatchNorm1d(4096),
        nn.Dropout(p=0.25),
        nn.Linear(in_features=4096, out_features=2048),
        nn.ReLU(),
        nn.BatchNorm1d(2048, eps=1e-05, momentum=0.1),
        nn.Dropout(p=0.5),
        nn.Linear(in_features=2048, out_features=1),  # out features
    ]
    model.last_linear = nn.Sequential(*head_layers)
    return model
# utils.py | |
import os | |
def save_checkpoint(model, optimizer, path):
    """
    Save the model and optimizer state dicts to ``path``.

    :param model: module whose ``state_dict()`` is stored under
        "model_state_dict"
    :param optimizer: optimizer whose ``state_dict()`` is stored under
        "optimizer_state_dict"
    :param path: destination file; missing parent directories are created
    """
    directory = os.path.dirname(path)
    # A bare filename has an empty dirname; os.makedirs('') would raise, so
    # only create directories when there is one to create.
    if directory and not os.path.exists(directory):
        print("Creating directories on path: `{}`".format(path))
        # exist_ok guards against another process creating it in between
        os.makedirs(directory, exist_ok=True)
    torch.save({
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
    }, path)
def load_checkpoint(model, path, map_location=None):
    """
    Restore a model and an Adam optimizer from a checkpoint file.

    :param model: module that receives the stored "model_state_dict"
    :param path: checkpoint file holding "model_state_dict" and
        "optimizer_state_dict" entries
    :param map_location: forwarded to ``torch.load``; pass e.g. "cpu" to
        open a checkpoint saved on a GPU on a machine without one
    :return: tuple ``(model, optimizer)``; the optimizer is always a newly
        created ``torch.optim.Adam`` loaded with the stored state
    """
    checkpoint = torch.load(path, map_location=map_location)
    model.load_state_dict(checkpoint["model_state_dict"])
    # built with default hyper-parameters, then overwritten by the stored
    # optimizer state
    optimizer = torch.optim.Adam(model.parameters())
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    return model, optimizer
def save_model(model, path):
    """
    Save only the model state dict to ``path``.

    :param model: module whose ``state_dict()`` is stored under
        "model_state_dict"
    :param path: destination file; missing parent directories are created
    """
    directory = os.path.dirname(path)
    # A bare filename has an empty dirname; os.makedirs('') would raise, so
    # only create directories when there is one to create.
    if directory and not os.path.exists(directory):
        print("Creating directories on path: `{}`".format(path))
        # exist_ok guards against another process creating it in between
        os.makedirs(directory, exist_ok=True)
    torch.save({
        "model_state_dict": model.state_dict(),
    }, path)
def load_model(model, path, map_location=None):
    """
    Load weights saved under "model_state_dict" into ``model`` and switch
    it to evaluation mode.

    The first parameter was previously misspelled ``moodel`` while the body
    used ``model``, so the function either raised NameError or silently
    operated on a global of that name.

    :param model: module that receives the stored weights
    :param path: file holding a dict with a "model_state_dict" entry
    :param map_location: forwarded to ``torch.load``; pass e.g. "cpu" to
        open a file saved on a GPU on a machine without one
    :return: the same ``model``, in eval mode
    """
    restore_dict = torch.load(path, map_location=map_location)
    model.load_state_dict(restore_dict["model_state_dict"])
    model.eval()
    return model
# inference.py | |
def inference(model, states, test_loader, device):
    """
    Average softmax predictions of several checkpoints over a test loader.

    :param model: pytorch model whose weights are swapped per checkpoint
    :param states: iterable of checkpoint paths, each holding a dict with a
        "model_state_dict" entry
    :param test_loader: loader yielding either bare image tensors or dicts
        with an "image" entry
    :param device: cuda/cpu
    :return: numpy array of the per-batch averaged probabilities,
        concatenated along the first axis
    :raises ValueError: if ``states`` is empty
    """
    states = list(states)
    if not states:
        raise ValueError("`states` must contain at least one checkpoint path")

    model.to(device)

    # Read every checkpoint from disk once, instead of once per batch.
    state_dicts = [
        torch.load(state, map_location="cpu")["model_state_dict"]
        for state in states
    ]

    probs = []
    # `tqdm` is imported as the callable itself (`from tqdm import tqdm`),
    # so it is called directly rather than as `tqdm.tqdm`.
    for data in tqdm(test_loader, total=len(test_loader)):
        # Unlabeled batches have no "targets" entry, and some datasets yield
        # the image tensor directly, so only the image is read here.
        images = data["image"] if isinstance(data, dict) else data
        images = images.to(device)

        batch_preds = []
        for state_dict in state_dicts:
            model.load_state_dict(state_dict)
            model.eval()
            with torch.no_grad():
                y_preds = model(images)
            # NOTE(review): softmax over dim 1 is constantly 1.0 when the
            # model has a single output column -- confirm the model used
            # here emits one logit per class.
            batch_preds.append(y_preds.softmax(1).to('cpu').numpy())

        probs.append(np.mean(batch_preds, axis=0))

    return np.concatenate(probs)
# metrics.py | |
from sklearn import metrics | |
def multi_class_roc_auc(true, pred_probs_arr, labels):
    """
    Compute the ROC AUC of every label column and return their mean.

    :param true: object indexable with ``.loc[:, label]`` holding the true
        labels per column
    :param pred_probs_arr: object indexable with ``.loc[:, label]`` holding
        the predicted scores per column
    :param labels: iterable of column labels to evaluate
    :return: mean of the per-label AUC values (also printed individually)
    """

    def _auc_for(label_number):
        # AUROC (sliding across multiple decision thresholds)
        y_true = true.loc[:, label_number].copy()
        y_score = pred_probs_arr.loc[:, label_number].copy()
        fpr, tpr, _ = metrics.roc_curve(y_true=y_true,
                                        y_score=y_score,
                                        pos_label=1)
        return metrics.auc(fpr, tpr)

    per_class_auc = [_auc_for(label_number) for label_number in labels]
    print(f'AUC of each class: {per_class_auc}')
    return np.mean(per_class_auc)
import glob | |
import pandas as pd | |
def imagesInFolder(root='path'):
    """
    Build a shuffled table of images laid out as ``root/<label>/<image>``.

    :param root: folder whose direct children are the per-class folders;
        defaults to 'path', the location that used to be hard-coded
    :return: DataFrame with columns 'image_path' and 'label', one row per
        file found, in random order with a fresh 0..n-1 index
    """
    rows = []
    for each_class in glob.glob(os.path.join(root, '*')):
        # basename instead of split('/') so the label is also correct when
        # glob returns paths with a different separator
        label = os.path.basename(each_class)
        for each_image in glob.glob(os.path.join(each_class, '*')):
            rows.append({'image_path': each_image, 'label': label})

    # build the frame in one go rather than growing it cell by cell
    data_frame = pd.DataFrame(rows, columns=['image_path', 'label'])
    return data_frame.sample(frac=1).reset_index(drop=True)
# train.py | |
import os | |
import pandas as pd | |
import numpy as np | |
import albumentations | |
import torch | |
from sklearn import metrics | |
from sklearn.model_selection import train_test_split | |
import random | |
# import dataset | |
# import engine | |
# from model import get_model | |
def seed_torch(seed=42):
    """Seed python, numpy and torch (CPU and CUDA) random generators."""
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True


def main(data_path="/home/abhishek/workspace/siim_png/", epochs=10):
    """
    Train the alexnet model and print the validation ROC AUC per epoch.

    :param data_path: location of train.csv and the train_png folder
        with all the png images
    :param epochs: number of training epochs
    """
    # `CFG` is not defined anywhere in this file; use the same fixed seed
    # as the train/validation split below.
    seed_torch(seed=42)

    # cuda/cpu device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # load the dataframe
    df = pd.read_csv(os.path.join(data_path, "train.csv"))

    # a list with image locations, built from all image ids
    images = [
        os.path.join(data_path, "train_png", i + ".png")
        for i in df.ImageId.values.tolist()
    ]

    # binary targets numpy array
    targets = df.target.values

    # fetch out model, we will try both pretrained
    # and non-pretrained weights
    model = get_model(pretrained=True)
    model.to(device)

    # mean and std values of RGB channels for imagenet dataset
    # we use these pre-calculated values when we use weights
    # from imagenet.
    # when we do not use imagenet weights, we use the mean and
    # standard deviation values of the original dataset
    # please note that this is a separate calculation
    mean = (0.485, 0.456, 0.406)
    std = (0.229, 0.224, 0.225)

    # only normalization is applied; always_apply=True because
    # we always want to apply normalization
    aug = albumentations.Compose(
        [
            albumentations.Normalize(
                mean, std, max_pixel_value=255.0, always_apply=True
            )
        ]
    )

    # instead of using kfold, i am using train_test_split
    # with a fixed random state
    train_images, valid_images, train_targets, valid_targets = train_test_split(
        images, targets, stratify=targets, random_state=42
    )

    # The dataset class lives in this same file (the `import dataset` line
    # above is commented out), so it is referenced directly.
    train_dataset = ClassificationDataset(
        image_paths=train_images,
        targets=train_targets,
        resize=(227, 227),
        augmentations=aug,
    )
    # torch dataloader creates batches of data
    # from classification dataset class
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=16, shuffle=True, num_workers=4
    )

    # same for validation data
    valid_dataset = ClassificationDataset(
        image_paths=valid_images,
        targets=valid_targets,
        resize=(227, 227),
        augmentations=aug,
    )
    valid_loader = torch.utils.data.DataLoader(
        valid_dataset, batch_size=16, shuffle=False, num_workers=4
    )

    # simple Adam optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)

    # train and print auc score for all epochs
    for epoch in range(epochs):
        train_model(train_loader, model, optimizer, device=device)
        # separate name so the split's `valid_targets` is not overwritten
        predictions, epoch_targets = evaluate_model(
            valid_loader, model, device=device
        )
        roc_auc = metrics.roc_auc_score(epoch_targets, predictions)
        print(
            f"Epoch={epoch}, Valid ROC AUC={roc_auc}"
        )
        save_checkpoint(model, optimizer, f'./model_{epoch}.pth')


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment