-
-
Save Antsthebul/f1d970cd19a965ac58d39572c9f370d8 to your computer and use it in GitHub Desktop.
Simple Distributed CNN
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import datetime | |
import os | |
import time | |
import pandas as pd | |
import albumentations as A | |
from pathlib import Path | |
from dotenv import load_dotenv | |
load_dotenv() | |
import torch | |
import torchmetrics | |
from albumentations.pytorch import ToTensorV2 | |
from config import * | |
from light_dataset import LightDataset | |
from trainer import Trainer | |
from light_model import TrafficCNN | |
from torch.utils.data import DataLoader | |
# DDP | |
import torch.distributed as dist | |
import torch.multiprocessing as mp | |
from torch.utils.data.distributed import DistributedSampler | |
from utils import X_test, X_train, y_test, y_train | |
# Run name: "CNN_" + current Unix timestamp; namespaces the runs/ output dir and files.
NAME = "CNN_"+ str(int(time.mktime(datetime.datetime.now().timetuple())))
# Keyword arguments forwarded to torch.optim.SGD in load_train_objs().
OPTIMIZER_KWARGS = {
    'lr':.0001,
    'momentum':0.8,
    'weight_decay':1e-4
}
# Every image is resized to this fixed spatial size before entering the CNN.
IMAGE_HEIGHT = 72
IMAGE_WIDTH = 72
# Albumentations pipelines: resize, then ToTensorV2 converts the HWC array to a CHW tensor.
train_tfms = A.Compose([A.Resize(IMAGE_HEIGHT, IMAGE_WIDTH),
                        # A.augmentations.crops.RandomResizedCrop(IMAGE_HEIGHT,IMAGE_WIDTH, scale=(0.35, 1.0)),
                        ToTensorV2() ])
test_tfms = A.Compose([A.Resize(IMAGE_HEIGHT, IMAGE_WIDTH) ,ToTensorV2() ])
def load_train_objs():
    """Build the train/test datasets, model, optimizer, and accuracy metric.

    Returns:
        tuple: (train_set, test_set, model, optimizer, metric)
    """
    training_dataset = LightDataset(X_train, y_train, transform=train_tfms)
    validation_dataset = LightDataset(X_test, y_test, transform=test_tfms)

    accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=NUM_CLASSES)

    net = TrafficCNN()
    net.metric = accuracy  # attach the metric so it travels with the model
    sgd = torch.optim.SGD(net.parameters(), **OPTIMIZER_KWARGS)

    return training_dataset, validation_dataset, net, sgd, accuracy
def prepare_dataloader(dataset, batch_size: int, num_workers):
    """Wrap *dataset* in a DataLoader suitable for DDP training.

    Shuffling is delegated to DistributedSampler (each rank gets a distinct
    shard), so DataLoader-level shuffle must stay False.

    Args:
        dataset: torch Dataset to load from.
        batch_size: per-process batch size.
        num_workers: number of loader worker processes. Bug fix: this
            argument was previously accepted but silently ignored.

    Returns:
        DataLoader sharded across the current process group.
    """
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,          # speeds up host -> GPU copies
        shuffle=False,            # DistributedSampler handles shuffling
        num_workers=num_workers,  # bug fix: forward the worker count
        sampler=DistributedSampler(dataset)
    )
def ddp_setup(rank, world_size):
    """Initialize the NCCL process group for this worker.

    Args:
        rank: Unique identifier of each process (doubles as the GPU index).
        world_size: Total number of processes.

    NOTE(review): MASTER_ADDR / MASTER_PORT are presumably supplied via the
    environment (load_dotenv() at import time) -- confirm the .env defines them.
    """
    # Bind this process to its own GPU *before* creating the process group,
    # so NCCL collectives and later .to(rank) calls target the right device
    # (previously missing; standard DDP setup step).
    torch.cuda.set_device(rank)
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    # Same seed on every rank -> identical initial model weights across ranks.
    torch.manual_seed(1)
def example(rank, world_size, file_name):
    """Per-process training entry point (the mp.spawn target).

    Args:
        rank: this process's index (passed in by mp.spawn); doubles as GPU id.
        world_size: total number of spawned processes.
        file_name: run name used for the runs/<name>/<name>.csv results file.
    """
    # create default process group
    ddp_setup(rank, world_size)
    dataset, test_set, model, optimizer, metric= load_train_objs()
    # NOTE(review): world_size is passed as the num_workers argument here --
    # confirm that is the intended per-rank DataLoader worker count.
    train_data = prepare_dataloader(dataset, BATCH_SIZE, world_size)
    validation_data = prepare_dataloader(test_set, BATCH_SIZE, world_size)
    # create local model
    # Per-rank metric objects, placed on this process's GPU.
    val_loss = torchmetrics.MeanMetric().to(rank)
    train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=NUM_CLASSES).to(rank)
    train_loss = torchmetrics.MeanMetric().to(rank)
    trainer = Trainer(model, metric, val_loss, train_acc, train_loss, train_data, validation_data, optimizer, rank, 10, file_name)
    # construct DDP model
    trainer.train(EPOCHS)
    # Tear down the process group once this rank finishes training.
    dist.destroy_process_group()
if __name__== "__main__":
    print(f"FileName: {NAME}")
    header = f"Training {EPOCHS} epochs. Batch Size {BATCH_SIZE}"
    print(header)
    # Create runs/<NAME>/. makedirs(exist_ok=True) replaces the old bare
    # try/except around os.mkdir("runs"), which also swallowed real errors
    # (e.g. permission denied) and still crashed if runs/<NAME> existed.
    os.makedirs(f"runs/{NAME}", exist_ok=True)
    # Human-readable run summary (extension fixed: was ".tXt").
    with open(f"runs/{NAME}/{NAME}.txt", 'w') as f:
        f.write(header + "\n")
        for k, v in OPTIMIZER_KWARGS.items():
            f.write(f"{k}:{v} ")
        f.write("\n")
    # CSV results file; Trainer appends one row per epoch from each rank.
    with open(f"runs/{NAME}/{NAME}.csv", 'w', newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Epoch", "TRAINING_ACC", "VALIDATION_ACC", "TRAINING_LOSS", "VALIDATION_LOSS"])
    world_size = torch.cuda.device_count()
    print("Device Count => ", world_size)
    start = time.perf_counter()
    # One process per GPU; each runs example(rank, world_size, NAME).
    mp.spawn(example,
             args=(world_size, NAME),
             nprocs=world_size,
             join=True)
    total = time.perf_counter() - start
    end_statement = f"Training Complete {total:.2f} sec"  # typo fix: was "Trainig"
    print(end_statement)
    with open(f"runs/{NAME}/{NAME}.txt", 'a') as f:
        f.write(end_statement + "\n")
    # Drop every other CSV row. NOTE(review): presumably this de-duplicates
    # the rows written by two GPU ranks -- df[::2] only works for
    # world_size == 2; confirm before running on other device counts.
    file_name = Path("runs", NAME, f"{NAME}.csv")
    df = pd.read_csv(file_name)
    os.remove(file_name)
    df[::2].to_csv(file_name, index=False)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
from torch.utils.data.dataset import Dataset | |
from config import * | |
class LightDataset(Dataset):
    """Dataset of traffic-light images loaded lazily from disk paths.

    Args:
        image_data: array-like of image file paths (flattened via ravel()).
        labels: array-like of integer class labels (flattened via ravel()).
        transform: optional albumentations transform applied to each image.
    """

    def __init__(self, image_data, labels, transform=None):
        self.image_paths = image_data.ravel()
        self.y_labels = labels.ravel()
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        label = self.y_labels[index]
        # Read from disk (BGR) and convert to RGB.
        rgb = cv2.cvtColor(cv2.imread(self.image_paths[index]), cv2.COLOR_BGR2RGB)
        if self.transform:
            # albumentations returns a dict; the transformed image lives
            # under the 'image' key
            rgb = self.transform(image=rgb)['image']
        return rgb, label
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import torch | |
import torch.nn as nn | |
from config import * | |
def conv_block(in_channel, out_channel, kernel=5):
    """Conv -> ReLU -> MaxPool -> BatchNorm building block.

    Args:
        in_channel: input channel count.
        out_channel: output channel count.
        kernel: square conv kernel size (default 5); padding="same" keeps
            the spatial size unchanged through the conv.

    Note: MaxPool2d here uses stride=1 with kernel 2, so each block only
    reduces each spatial dimension by 1 rather than halving it.

    NOTE(review): .to(DEVICE) pins the block to the config DEVICE at
    construction time; under DDP the model is later moved per-rank with
    .to(gpu_id), so this looks redundant -- confirm DEVICE interacts
    correctly with the multi-GPU spawn.
    """
    return nn.Sequential(
        nn.Conv2d(in_channels=in_channel, out_channels=out_channel, kernel_size=kernel, padding="same", bias=False),
        nn.ReLU() ,
        nn.MaxPool2d(kernel_size=2, stride=1),
        nn.BatchNorm2d(out_channel)
    ).to(DEVICE)
class TrafficCNN(nn.Module):
    """Five-block CNN classifier for traffic-light images.

    Expects a float image batch shaped (N, 3, H, W); returns raw logits
    of shape (N, NUM_CLASSES).
    """

    def __init__(self):
        super().__init__()
        # Each block: Conv(same padding) -> ReLU -> MaxPool(k=2, s=1) -> BN,
        # so spatial dims shrink by 1 per block (5 pixels total per side pair).
        self.conv_layer1 = conv_block(3, CONV_1_OUTPUT)
        self.conv_layer2 = conv_block(CONV_1_OUTPUT, CONV_2_OUTPUT, 3)
        self.conv_layer3 = conv_block(CONV_2_OUTPUT, CONV_2_OUTPUT*2, 3)
        self.conv_layer4 = conv_block(CONV_2_OUTPUT*2, CONV_2_OUTPUT*4)
        self.conv_layer5 = conv_block(CONV_2_OUTPUT*4, CONV_2_OUTPUT*6)
        # Classifier head.
        self.flatten = nn.Flatten()
        self.fc_layer = nn.Sequential(
            # NOTE(review): 6589824 is hard-coded to the flattened feature
            # size for the configured input resolution / channel widths --
            # confirm it equals CONV_2_OUTPUT*6 * (H-5) * (W-5).
            nn.Linear(6589824, 1024),
            nn.ReLU(),
            # Bug fix: was nn.BatchNorm2d(1024), which raises
            # "expected 4D input (got 2D input)" on the (N, 1024)
            # activations produced by Flatten + Linear. BatchNorm1d is the
            # correct layer for 2D feature batches.
            nn.BatchNorm1d(1024),
            nn.Linear(1024, NUM_CLASSES)
        )

    def forward(self, x):
        """Run the five conv blocks, flatten, and classify."""
        x = self.conv_layer1(x)
        x = self.conv_layer2(x)
        x = self.conv_layer3(x)
        x = self.conv_layer4(x)
        x = self.conv_layer5(x)
        return self.fc_layer(self.flatten(x))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import torchmetrics | |
import torch | |
import torch.nn as nn | |
from torchinfo import summary | |
from torch.nn.parallel import DistributedDataParallel as DDP | |
from torch.utils.data import DataLoader | |
from config import * | |
class Trainer:
    """Runs DDP training + validation for one process / GPU rank.

    The metric objects are supplied by the caller and are expected to
    already live on this rank's device. Per-epoch results are appended to
    runs/<file_name>/<file_name>.csv; rank 0 checkpoints every
    `save_every` epochs.
    """

    def __init__(
        self,
        model: torch.nn.Module,
        metric,                   # validation accuracy metric
        val_loss,                 # running mean of validation loss
        train_acc,                # training accuracy metric
        train_loss,               # running mean of training loss
        train_data: DataLoader,
        val_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
        file_name
    ) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.valid_data = val_data
        self.optimizer = optimizer
        self.save_every = save_every
        # Wrap in DDP so gradients are all-reduced across ranks.
        self.model = DDP(model, device_ids=[gpu_id])
        # metrics
        self.metric = metric
        self.val_loss = val_loss
        self.train_acc = train_acc
        self.train_loss = train_loss
        self.file_name = file_name
        # Bug fix: one GradScaler for the whole run. The old code created a
        # fresh scaler inside every _run_batch, which discarded the scaler's
        # adaptive loss-scale state and mixed gradients scaled by different
        # factors across the 2-step accumulation window.
        self.scaler = torch.cuda.amp.GradScaler()
        # Hoisted: no need to rebuild the loss module on every batch.
        self.loss_fn = nn.CrossEntropyLoss()

    def _run_batch(self, ix, source, targets):
        """One training step with AMP and 2-step gradient accumulation."""
        self.model.train()
        # forward pass with mixed precision
        with torch.cuda.amp.autocast():
            output = self.model(source)
            self.train_acc.update(output, targets)
            loss = self.loss_fn(output, targets)
        self.train_loss.update(loss)
        self.scaler.scale(loss).backward()
        # Step every 2 batches, plus on the final (possibly odd) batch.
        # Bug fix: the old code also called optimizer.zero_grad at the *top*
        # of every batch, which wiped the gradients accumulated by
        # odd-indexed batches before they could contribute to a step.
        if (ix + 1) % 2 == 0 or (ix + 1) == len(self.train_data):
            self.scaler.step(self.optimizer)
            self.scaler.update()
            self.optimizer.zero_grad(set_to_none=True)

    def _run_validation(self, ix, source, targets):
        """One validation step: update accuracy and loss metrics, no grads."""
        self.model.eval()
        with torch.no_grad():
            # forward pass with mixed precision
            with torch.cuda.amp.autocast():
                output = self.model(source)
                self.metric(output, targets)
                loss = self.loss_fn(output, targets)
            self.val_loss.update(loss)

    def _run_epoch(self, epoch):
        """Run one full training pass then one validation pass; log results."""
        torch.backends.cudnn.benchmark = True
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        # Re-seed the sampler so each epoch shuffles the shards differently.
        self.train_data.sampler.set_epoch(epoch)
        for ix, (source, targets) in enumerate(self.train_data):
            source = source.to(self.gpu_id, non_blocking=True, memory_format=torch.channels_last)
            targets = targets.to(self.gpu_id, non_blocking=True)
            self._run_batch(ix, source, targets)
        for ix, (source, targets) in enumerate(self.valid_data):
            source = source.to(self.gpu_id, non_blocking=True, memory_format=torch.channels_last)
            targets = targets.to(self.gpu_id, non_blocking=True)
            self._run_validation(ix, source, targets)
        del source
        del targets
        acc = self.metric.compute()  # validation accuracy (old comment mislabelled this "Train acc")
        val_loss = self.val_loss.compute().item()
        train_loss = self.train_loss.compute().item()
        train_acc = self.train_acc.compute().item() * 100
        self.train_loss.reset()
        print(f"Epoch {epoch}: Validation Acc => {acc*100:.2f}% Validation loss {val_loss}")
        with open(f"runs/{self.file_name}/{self.file_name}.csv", 'a', newline="") as f:
            writer = csv.writer(f)
            writer.writerow([epoch, train_acc, acc.item()*100, train_loss, val_loss])
        # Reset the remaining metrics for the next epoch.
        self.metric.reset()
        self.val_loss.reset()
        self.train_acc.reset()

    def _save_checkpoint(self, epoch):
        """Save the underlying (non-DDP) model's weights to BASE_PATH."""
        PATH = BASE_PATH + '/mtg_classifier.pth'
        # .module unwraps the DDP wrapper so the checkpoint loads cleanly
        # into a plain model later.
        torch.save(self.model.module.state_dict(), PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, epochs):
        """Train for *epochs* epochs; rank 0 checkpoints every save_every epochs."""
        for epoch in range(epochs):
            self._run_epoch(epoch)
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_checkpoint(epoch)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from pathlib import Path | |
import pandas as pd | |
from config import * | |
from sklearn.preprocessing import LabelEncoder | |
from sklearn.model_selection import train_test_split | |
# build Dataframe
# Walk DATA_PATH/<light-class>/<image> and collect [path, class-name] rows.
data_frame = []
for light in os.listdir(DATA_PATH):
    for name in os.listdir(Path(DATA_PATH, light)):
        data_frame.append([str(Path(DATA_PATH,light,name)), light])
df = pd.DataFrame(data_frame)

# Integer-encode the class-name column (column 1) in place.
le = LabelEncoder()
df[1] = le.fit_transform(df[1])

# We need to create a train and valid set from each category manually
# Create stratified Dataframes to pass to Dataset
# X: image paths (all columns but the last); y: encoded labels (last column).
X = df.iloc[:, :-1].values
y = df.iloc[:, -1].values
# Stratified split preserves per-class proportions; fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, random_state=42)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment