Simple Distributed CNN: multi-GPU training of a traffic-light image classifier with PyTorch DistributedDataParallel (DDP)
# ===== main training script =====
import csv
import datetime
import os
import time

import pandas as pd
import albumentations as A
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()  # pulls env vars (e.g. the NCCL rendezvous settings) from .env

import torch
import torchmetrics
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader

# DDP
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler

from config import *
from light_dataset import LightDataset
from light_model import TrafficCNN
from trainer import Trainer
from utils import X_test, X_train, y_test, y_train
NAME = "CNN_"+ str(int(time.mktime(datetime.datetime.now().timetuple())))
OPTIMIZER_KWARGS = {
'lr':.0001,
'momentum':0.8,
'weight_decay':1e-4
}
IMAGE_HEIGHT = 72
IMAGE_WIDTH = 72
train_tfms = A.Compose([A.Resize(IMAGE_HEIGHT, IMAGE_WIDTH),
# A.augmentations.crops.RandomResizedCrop(IMAGE_HEIGHT,IMAGE_WIDTH, scale=(0.35, 1.0)),
ToTensorV2() ])
test_tfms = A.Compose([A.Resize(IMAGE_HEIGHT, IMAGE_WIDTH) ,ToTensorV2() ])
def load_train_objs():
    """Return train/test datasets, model, optimizer, and accuracy metric."""
    train_set = LightDataset(X_train, y_train, transform=train_tfms)
    test_set = LightDataset(X_test, y_test, transform=test_tfms)
    metric = torchmetrics.Accuracy(task="multiclass", num_classes=NUM_CLASSES)
    model = TrafficCNN()
    model.metric = metric
    optimizer = torch.optim.SGD(model.parameters(), **OPTIMIZER_KWARGS)
    return train_set, test_set, model, optimizer, metric
def prepare_dataloader(dataset, batch_size: int, num_workers: int):
    # shuffle must stay False here: DistributedSampler handles shuffling
    # (per epoch, via sampler.set_epoch in the Trainer)
    return DataLoader(
        dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset),
    )
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.manual_seed(1)
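
# NCCL's default rendezvous reads MASTER_ADDR and MASTER_PORT from the
# environment; load_dotenv() above presumably supplies them from a .env
# file. A minimal sketch of such a file (values are assumptions, not
# taken from the gist):
#
#   MASTER_ADDR=localhost
#   MASTER_PORT=29500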
def example(rank, world_size, file_name):
    """Per-process entry point; rank is passed in by mp.spawn."""
    # create default process group
    ddp_setup(rank, world_size)
    dataset, test_set, model, optimizer, metric = load_train_objs()
    train_data = prepare_dataloader(dataset, BATCH_SIZE, world_size)
    validation_data = prepare_dataloader(test_set, BATCH_SIZE, world_size)
    # per-device metrics
    val_loss = torchmetrics.MeanMetric().to(rank)
    train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=NUM_CLASSES).to(rank)
    train_loss = torchmetrics.MeanMetric().to(rank)
    # Trainer wraps the model in DDP internally
    trainer = Trainer(model, metric, val_loss, train_acc, train_loss,
                      train_data, validation_data, optimizer, rank, 10, file_name)
    trainer.train(EPOCHS)
    dist.destroy_process_group()
if __name__== "__main__":
print(f"FileName: {NAME}")
header = f"Training {EPOCHS} epochs. Batch Size {BATCH_SIZE}"
print(header)
try:
os.mkdir("runs")
except:
pass
os.mkdir(f"runs/{NAME}")
with open(f"runs/{NAME}/{NAME}.tXt", 'w') as f:
f.write(header+"\n")
for k, v in OPTIMIZER_KWARGS.items():
line =f"{k}:{v} "
f.write(line)
f.write("\n")
with open(f"runs/{NAME}/{NAME}.csv", 'w', newline="") as f:
writer = csv.writer(f)
writer.writerow(["Epoch", "TRAINING_ACC", "VALIDATION_ACC", "TRAINING_LOSS", "VALIDATION_LOSS"])
world_size = torch.cuda.device_count()
print("Device Count => ", world_size )
start = time.perf_counter()
mp.spawn(example,
args=(world_size,NAME),
nprocs=world_size,
join=True)
total = time.perf_counter() - start
end_statement = f"Trainig Complete {total:.2f} sec"
print(end_statement)
with open(f"runs/{NAME}/{NAME}.tXt", 'a') as f:
f.write(end_statement+"\n")
file_name = Path("runs",NAME, f"{NAME}.csv")
df = pd.read_csv(file_name)
os.remove(file_name)
df[::2].to_csv(file_name, index=False)
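
# Every module here does `from config import *`, but config.py itself is
# not part of the gist. A minimal sketch of what it must define, with
# placeholder values (every value below is an assumption):
#
#   import torch
#
#   NUM_CLASSES = 4
#   BATCH_SIZE = 32
#   EPOCHS = 20
#   CONV_1_OUTPUT = 32
#   CONV_2_OUTPUT = 64
#   DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
#   BASE_PATH = "."
#   DATA_PATH = "data"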
# ===== light_dataset.py =====
import cv2
from torch.utils.data.dataset import Dataset

from config import *


class LightDataset(Dataset):
    def __init__(self, image_data, labels, transform=None):
        self.image_paths = image_data.ravel()
        self.y_labels = labels.ravel()
        self.transform = transform

    def __getitem__(self, index):
        img_path = self.image_paths[index]
        image = cv2.imread(img_path)                    # HWC, BGR, uint8
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # scale to [0, 1] float32; Conv2d requires floating-point input
        image = image.astype("float32") / 255.0
        y_label = self.y_labels[index]
        if self.transform:
            # albumentations returns a dict; the transformed image is under 'image'
            image = self.transform(image=image)['image']
        return image, y_label

    def __len__(self):
        return len(self.image_paths)
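
# A quick usage sketch (the paths and labels below are hypothetical; the
# real arrays come from utils.py):
if __name__ == "__main__":
    import numpy as np
    paths = np.array(["data/red/img_001.png", "data/green/img_002.png"])
    labels = np.array([0, 1])
    ds = LightDataset(paths, labels)
    print(len(ds))  # 2; ds[i] would load the image and return (image, label)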
# ===== light_model.py =====
import torch
import torch.nn as nn

from config import *


def conv_block(in_channel, out_channel, kernel=5):
    return nn.Sequential(
        nn.Conv2d(in_channels=in_channel, out_channels=out_channel,
                  kernel_size=kernel, padding="same", bias=False),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=1),  # shrinks H and W by 1
        nn.BatchNorm2d(out_channel),
    ).to(DEVICE)
class TrafficCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv_layer1 = conv_block(3, CONV_1_OUTPUT)
        self.conv_layer2 = conv_block(CONV_1_OUTPUT, CONV_2_OUTPUT, 3)
        self.conv_layer3 = conv_block(CONV_2_OUTPUT, CONV_2_OUTPUT * 2, 3)
        self.conv_layer4 = conv_block(CONV_2_OUTPUT * 2, CONV_2_OUTPUT * 4)
        self.conv_layer5 = conv_block(CONV_2_OUTPUT * 4, CONV_2_OUTPUT * 6)
        # Linear head; the in_features constant is tied to the input
        # resolution and conv widths (see flattened_size below)
        self.flatten = nn.Flatten()
        self.fc_layer = nn.Sequential(
            nn.Linear(6589824, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),  # 1d: input here is (N, 1024), not 4-D
            nn.Linear(1024, NUM_CLASSES),
        )

    def forward(self, x):
        x = self.conv_layer1(x)
        x = self.conv_layer2(x)
        x = self.conv_layer3(x)
        x = self.conv_layer4(x)
        x = self.conv_layer5(x)
        x = self.flatten(x)
        return self.fc_layer(x)
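
# Sanity check for the hard-coded fc input size above: padding="same" keeps
# H and W through each Conv2d, and MaxPool2d(kernel_size=2, stride=1) then
# shrinks both by 1, so five conv_blocks turn an H x W input into
# (H - 5) x (W - 5) with CONV_2_OUTPUT * 6 channels.
def flattened_size(height, width):
    """Features entering fc_layer for a given input resolution."""
    return (CONV_2_OUTPUT * 6) * (height - 5) * (width - 5)

# e.g. flattened_size(136, 136) == 6589824 when CONV_2_OUTPUT == 64, which
# is what nn.Linear(6589824, 1024) assumes; a 72 x 72 input (the size the
# main script resizes to) would instead need flattened_size(72, 72) features.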
# ===== trainer.py =====
import csv

import torch
import torch.nn as nn
import torchmetrics
from torchinfo import summary
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader

from config import *
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        metric,
        val_loss,
        train_acc,
        train_loss,
        train_data: DataLoader,
        val_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
        file_name,
    ) -> None:
        self.gpu_id = gpu_id
        self.train_data = train_data
        self.valid_data = val_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(model.to(gpu_id), device_ids=[gpu_id])
        # one GradScaler and loss for the whole run (the scaler's scale
        # state must persist across batches for mixed precision to work)
        self.scaler = torch.cuda.amp.GradScaler()
        self.loss_fn = nn.CrossEntropyLoss()
        # metrics
        self.metric = metric.to(gpu_id)  # validation accuracy
        self.val_loss = val_loss
        self.train_acc = train_acc
        self.train_loss = train_loss
        self.file_name = file_name
    def _run_batch(self, ix, source, targets):
        self.model.train()
        # forward pass with mixed precision
        with torch.cuda.amp.autocast():
            output = self.model(source)
            self.train_acc.update(output, targets)
            loss = self.loss_fn(output, targets)
            self.train_loss.update(loss)
        self.scaler.scale(loss).backward()
        # accumulate gradients over 2 batches; step on the final batch too,
        # and only zero gradients after a step so accumulation actually works
        if (ix + 1) % 2 == 0 or (ix + 1) == len(self.train_data):
            self.scaler.step(self.optimizer)
            self.scaler.update()
            self.optimizer.zero_grad(set_to_none=True)
    def _run_validation(self, ix, source, targets):
        self.model.eval()
        with torch.no_grad():
            # forward pass with mixed precision
            with torch.cuda.amp.autocast():
                output = self.model(source)
                self.metric(output, targets)
                loss = self.loss_fn(output, targets)
                self.val_loss.update(loss)
    def _run_epoch(self, epoch):
        torch.backends.cudnn.benchmark = True
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batch size: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)  # reshuffle shards each epoch
        for ix, (source, targets) in enumerate(self.train_data):
            source = source.to(self.gpu_id, non_blocking=True, memory_format=torch.channels_last)
            targets = targets.to(self.gpu_id, non_blocking=True)
            self._run_batch(ix, source, targets)
        for ix, (source, targets) in enumerate(self.valid_data):
            source = source.to(self.gpu_id, non_blocking=True, memory_format=torch.channels_last)
            targets = targets.to(self.gpu_id, non_blocking=True)
            self._run_validation(ix, source, targets)
        del source
        del targets
        acc = self.metric.compute()  # validation accuracy
        val_loss = self.val_loss.compute().item()
        train_loss = self.train_loss.compute().item()
        train_acc = self.train_acc.compute().item() * 100
        self.train_loss.reset()
        print(f"Epoch {epoch}: Validation Acc => {acc*100:.2f}% Validation loss {val_loss}")
        with open(f"runs/{self.file_name}/{self.file_name}.csv", 'a', newline="") as f:
            writer = csv.writer(f)
            writer.writerow([epoch, train_acc, acc.item() * 100, train_loss, val_loss])
        self.metric.reset()
        self.val_loss.reset()
        self.train_acc.reset()
    def _save_checkpoint(self, epoch):
        # save the underlying model's weights, not the DDP wrapper's
        PATH = BASE_PATH + '/mtg_classifier.pth'
        torch.save(self.model.module.state_dict(), PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, epochs):
        for epoch in range(epochs):
            self._run_epoch(epoch)
            # only rank 0 writes checkpoints
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_checkpoint(epoch)
            # if epoch == epochs - 1:
            #     print(summary(self.model))
            #     with open(f"runs/{self.file_name}_model.txt", "w") as f:
            #         f.write(summary(self.model))
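
# Loading the checkpoint written by _save_checkpoint (a sketch; it works
# because we saved the unwrapped model.module's state_dict):
#
#   from light_model import TrafficCNN
#   model = TrafficCNN()
#   state = torch.load(BASE_PATH + '/mtg_classifier.pth', map_location="cpu")
#   model.load_state_dict(state)
#   model.eval()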
# ===== utils.py =====
import os
from pathlib import Path

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

from config import *

# build a DataFrame of [image_path, class_name] rows
data_frame = []
for light in os.listdir(DATA_PATH):
    for name in os.listdir(Path(DATA_PATH, light)):
        data_frame.append([str(Path(DATA_PATH, light, name)), light])
df = pd.DataFrame(data_frame)

# encode class names as integer labels
le = LabelEncoder()
df[1] = le.fit_transform(df[1])

# stratified split so every class is represented in both train and valid sets
X = df.iloc[:, :-1].values  # image paths
y = df.iloc[:, -1].values   # integer labels
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, random_state=42)
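
# The directory walk above assumes DATA_PATH holds one sub-directory per
# class (names below are illustrative):
#
#   DATA_PATH/
#       red/     img_0001.png ...
#       yellow/  img_0002.png ...
#       green/   img_0003.png ...
#
# train_test_split defaults to a 75/25 split; stratify=y preserves the
# class balance in both halves.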