# PyTorch
# %%
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn.utils.prune as prune
from torch.utils.data import random_split
from pathlib import Path
from datetime import datetime
# %%
# early stopping: stop training once validation loss stops improving
class EarlyStopping:
    def __init__(self, patience=5, delta=0, verbose=True):
        self.patience = patience
        self.verbose = verbose
        self.best_score = None
        self.delta = delta
        self.counter = 0
        self.early_stop = False  # read this flag to check if the stopping criterion is met
        self.checkpoint = Checkpoint("model")  # Checkpoint instance (for loading & saving the best model)

    def __call__(self, model, val_loss, optimizer, epoch=None):
        score = -val_loss
        if self.best_score is None or score > self.best_score:
            # improvement: reset the counter and checkpoint the model
            self.counter = 0
            self.best_score = score
            self.checkpoint.save(model, optimizer, val_loss, epoch)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.counter = 0
# save & load model checkpoints (for resuming training later)
class Checkpoint:
    def __init__(self, base_folder="model"):
        now = datetime.now().strftime("%m-%d-%y_%H.%M.%S")
        self.save_folder = Path(base_folder) / now
        self.last_save = None

    # save a checkpoint of the model
    def save(self, model, optimizer, loss=None, epoch=None):
        fname = f"loss_{loss:.3f}.pt" if loss is not None else "checkpoint.pt"
        save_path = self.save_folder / fname
        save_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"*Saving Model Checkpoint: {save_path}")
        # unwrap the model if it is wrapped in DataParallel
        if isinstance(model, nn.DataParallel):
            state_dict = model.module.state_dict()
        else:
            state_dict = model.state_dict()
        # save model
        torch.save(
            {
                "epoch": epoch,
                "model_state_dict": state_dict,
                "optimizer_state_dict": optimizer.state_dict(),
                "loss": loss,
            },
            save_path,
        )
        self.last_save = save_path

    # load a checkpoint (defaults to the most recent save)
    def load(self, model, optimizer, fname=None):
        if fname is not None:
            checkpoint = torch.load(self.save_folder / fname)
        else:
            checkpoint = torch.load(self.last_save)
        # unwrap the model if it is wrapped in DataParallel
        if isinstance(model, nn.DataParallel):
            model.module.load_state_dict(checkpoint["model_state_dict"])
        else:
            model.load_state_dict(checkpoint["model_state_dict"])
        optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
        epoch = checkpoint["epoch"]
        loss = checkpoint["loss"]
        return {"model": model, "optimizer": optimizer, "epoch": epoch, "loss": loss}
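# Note (added): checkpoints saved on a GPU machine can be loaded on a
# CPU-only machine by passing map_location to torch.load, e.g.
# torch.load(path, map_location=torch.device("cpu")).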
# %%
gpu = torch.cuda.is_available()
if gpu:
    print(f"*Using GPU: {torch.cuda.get_device_name(0)}")
    device = torch.device("cuda:0")
else:
    print("*Using CPU")
    device = torch.device("cpu")
# %%
# normalize each RGB channel from [0, 1] to [-1, 1]
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
# %%
# train & validation dataset (80/20 split)
dataset = datasets.CIFAR10(
    root="./data", train=True, download=True, transform=transform
)
trainset_count = int(len(dataset) * 0.8)
valset_count = len(dataset) - trainset_count
trainset, valset = random_split(dataset, [trainset_count, valset_count])
train_loader = torch.utils.data.DataLoader(
    trainset, batch_size=128, shuffle=True, num_workers=0
)
val_loader = torch.utils.data.DataLoader(
    valset, batch_size=128, shuffle=False, num_workers=0
)
# test dataset
testset = datasets.CIFAR10(
    root="./data", train=False, download=True, transform=transform
)
test_loader = torch.utils.data.DataLoader(
    testset, batch_size=128, shuffle=False, num_workers=0
)
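# %%
# Quick shape check (added for illustration): each training batch is a
# (128, 3, 32, 32) image tensor plus a (128,) label tensor.
_imgs, _lbls = next(iter(train_loader))
print(_imgs.shape, _lbls.shape)  # torch.Size([128, 3, 32, 32]) torch.Size([128])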
# %%
# CIFAR-10's classes
classes = (
    "plane",
    "car",
    "bird",
    "cat",
    "deer",
    "dog",
    "frog",
    "horse",
    "ship",
    "truck",
)
# model = models.resnet50(pretrained=True)
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 8, 3)   # 32x32 -> 30x30, pooled to 15x15
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(8, 16, 3)  # 15x15 -> 13x13, pooled to 6x6
        self.fc1 = nn.Linear(16 * 6 * 6, 120)
        self.fc2 = nn.Linear(120, 64)
        self.fc3 = nn.Linear(64, len(classes))

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)  # flatten all dims except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
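# %%
# Sanity check (added; not part of the original gist): confirm the flattened
# feature size really is 16 * 6 * 6 for a 32x32 CIFAR-10 input, so fc1's
# input dimension matches what forward() produces.
_m = Model()
with torch.no_grad():
    _x = _m.pool(F.relu(_m.conv1(torch.randn(1, 3, 32, 32))))  # -> (1, 8, 15, 15)
    _x = _m.pool(F.relu(_m.conv2(_x)))  # -> (1, 16, 6, 6)
assert _x.shape == (1, 16, 6, 6)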
# %% validation
def validation(device, model, val_loader):
    # uses the global loss_func defined in the main block
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for data in val_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_func(outputs, labels)
            running_loss += loss.item()
    model.train()  # restore train mode for the caller
    return running_loss / len(val_loader)
# %% evaluate
def evaluate(device, model, test_loader):
    model.eval().to(device)
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(
        f"Accuracy of the network on the {total} test images: {100 * correct / total:.2f} %"
    )
    # per-class accuracy
    class_correct = [0.0] * len(classes)
    class_total = [0.0] * len(classes)
    with torch.no_grad():
        for data in test_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            c = (predicted == labels).squeeze()
            for i in range(len(labels)):
                label = labels[i]
                class_correct[label] += c[i].item()
                class_total[label] += 1
    for i in range(len(classes)):
        print(
            f"Accuracy of {classes[i]: >5} : {100 * class_correct[i] / class_total[i]:.0f} %"
        )
# %% training
def train(
    device, model, train_loader, epochs, loss_func, optimizer, multi_gpus=True, log=100
):
    # model setup
    model.train().to(device)
    if multi_gpus and torch.cuda.device_count() > 1:
        print(f"*Using {torch.cuda.device_count()} GPUs!")
        model = nn.DataParallel(model)
    # early stopping instance
    early_stopping = EarlyStopping(patience=5)
    # training start!
    for epoch in range(1, epochs + 1):
        running_loss = 0.0
        for step, data in enumerate(train_loader, start=1):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            outputs = model(inputs)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()
            # print statistics
            running_loss += loss.item()
            if step % log == 0 or step == len(train_loader):
                print(
                    f"[{epoch}/{epochs}, {step}/{len(train_loader)}] loss: {running_loss / step:.3f}"
                )
        # train & validation loss
        train_loss = running_loss / len(train_loader)
        val_loss = validation(device, model, val_loader)
        print(f"train loss: {train_loss:.3f}, val loss: {val_loss:.3f}")
        early_stopping(model, val_loss, optimizer, epoch)
        if early_stopping.early_stop:
            print("*Early Stopping.")
            break
    print("*Finished Training!")
    return early_stopping.checkpoint
# %% start from here!
if __name__ == "__main__":
    # init model
    model = Model()
    # settings
    epochs = 30
    loss_func = nn.CrossEntropyLoss()
    lr = 0.001
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # training result (use the returned Checkpoint instance to load the best model)
    checkpoint = train(device, model, train_loader, epochs, loss_func, optimizer)
    null_model = Model().to(device)
    null_optimizer = optim.Adam(null_model.parameters(), lr=lr)
    checkpoint_data = checkpoint.load(null_model, null_optimizer)
    # evaluate the model
    model = checkpoint_data["model"]
    evaluate(device, model, test_loader)
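    # The torch.nn.utils.prune import at the top is never used in the gist; a
    # minimal sketch of how it could be applied here (an assumption, not the
    # author's method): L1-unstructured pruning of 20% of conv1's weights,
    # then making the pruning permanent.
    prune.l1_unstructured(model.conv1, name="weight", amount=0.2)
    prune.remove(model.conv1, "weight")
    print(f"conv1 zero weights after pruning: {(model.conv1.weight == 0).sum().item()}")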
# %%
# ---- second script: training torchvision's ResNet-18 on CIFAR-10 ----
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
# %%
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# %%
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
# %%
trainset = datasets.CIFAR10(
    root="./data", train=True, download=True, transform=transform
)
train_loader = torch.utils.data.DataLoader(
    trainset, batch_size=32, shuffle=True, num_workers=0
)
testset = datasets.CIFAR10(
    root="./data", train=False, download=True, transform=transform
)
test_loader = torch.utils.data.DataLoader(
    testset, batch_size=32, shuffle=False, num_workers=0
)
# %%
# CIFAR-10's classes
classes = (
    "plane",
    "car",
    "bird",
    "cat",
    "deer",
    "dog",
    "frog",
    "horse",
    "ship",
    "truck",
)
# ResNet-18 with a 10-way output head (the default head has 1000 ImageNet classes)
model = models.resnet18(num_classes=len(classes))
# %% Validation
def validation(device, model, val_loader):
    model.eval().to(device)
    running_loss = 0.0
    with torch.no_grad():
        for data in val_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_func(outputs, labels)  # uses the global loss_func
            running_loss += loss.item()
    model.train()  # restore train mode for the caller
    return running_loss / len(val_loader)
# %% Train
def train(device, model, train_loader, epochs, loss_func, optimizer, log=100):
    model.train().to(device)
    for epoch in range(1, epochs + 1):
        running_loss = 0.0
        for step, data in enumerate(train_loader, start=1):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            outputs = model(inputs)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()
            # print statistics
            running_loss += loss.item()
            if step % log == 0 or step == len(train_loader):
                print(
                    f"[{epoch}/{epochs}, {step}/{len(train_loader)}] loss: {running_loss / step:.3f}"
                )
        train_loss = running_loss / len(train_loader)
        # note: this script reuses the test set as the validation set
        val_loss = validation(device, model, test_loader)
        print(f"train loss: {train_loss:.3f}, val loss: {val_loss:.3f}")
    print("*Finished Training!")
    return model
# %% Test
def evaluate(device, model, test_loader):
    model.eval().to(device)
    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(
        f"Accuracy of the network on the {total} test images: {100 * correct / total:.2f} %"
    )
    # per-class accuracy
    class_correct = [0.0] * len(classes)
    class_total = [0.0] * len(classes)
    with torch.no_grad():
        for data in test_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            c = (predicted == labels).squeeze()
            for i in range(len(labels)):
                label = labels[i]
                class_correct[label] += c[i].item()
                class_total[label] += 1
    for i in range(len(classes)):
        print(
            f"Accuracy of {classes[i]: >5} : {100 * class_correct[i] / class_total[i]:.0f} %"
        )
# %%
if __name__ == "__main__":
    loss_func = nn.CrossEntropyLoss()
    lr = 0.001
    EPOCHS = 5
    optimizer = optim.Adam(model.parameters(), lr=lr)
    model = train(device, model, train_loader, EPOCHS, loss_func, optimizer)
    evaluate(device, model, test_loader)
# %%
# ---- standalone early stopping & checkpoint utilities ----
import torch
import torch.nn as nn
import torch.optim as optim  # used by the demo below
from pathlib import Path
from datetime import datetime
# early stopping: stop training once validation loss stops improving
class EarlyStopping:
    def __init__(self, patience=5, delta=0, verbose=True):
        self.patience = patience
        self.verbose = verbose
        self.best_score = None
        self.delta = delta
        self.counter = 0
        self.early_stop = False  # read this flag to check if the stopping criterion is met
        self.checkpoint = Checkpoint("model")  # Checkpoint instance (for loading & saving the best model)

    def __call__(self, model, val_loss, optimizer, epoch=None):
        score = -val_loss
        if self.best_score is None or score > self.best_score:
            # improvement: reset the counter and checkpoint the model
            self.counter = 0
            self.best_score = score
            self.checkpoint.save(model, optimizer, val_loss, epoch)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.counter = 0
# save & load model checkpoints (for resuming training later)
class Checkpoint:
    def __init__(self, base_folder="model"):
        now = datetime.now().strftime("%m-%d-%y_%H.%M.%S")
        self.save_folder = Path(base_folder) / now
        self.last_save = None

    # save a checkpoint of the model
    def save(self, model, optimizer, loss=None, epoch=None):
        fname = f"loss_{loss:.3f}.pt" if loss is not None else "checkpoint.pt"
        save_path = self.save_folder / fname
        save_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"*Saving Model Checkpoint: {save_path}")
        # unwrap the model if it is wrapped in DataParallel
        if isinstance(model, nn.DataParallel):
            state_dict = model.module.state_dict()
        else:
            state_dict = model.state_dict()
        # save model
        torch.save(
            {
                "epoch": epoch,
                "model_state_dict": state_dict,
                "optimizer_state_dict": optimizer.state_dict(),
                "loss": loss,
            },
            save_path,
        )
        self.last_save = save_path

    # load a checkpoint (defaults to the most recent save)
    def load(self, model, optimizer, fname=None):
        if fname is not None:
            checkpoint = torch.load(self.save_folder / fname)
        else:
            checkpoint = torch.load(self.last_save)
        # unwrap the model if it is wrapped in DataParallel
        if isinstance(model, nn.DataParallel):
            model.module.load_state_dict(checkpoint["model_state_dict"])
        else:
            model.load_state_dict(checkpoint["model_state_dict"])
        optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
        epoch = checkpoint["epoch"]
        loss = checkpoint["loss"]
        return {"model": model, "optimizer": optimizer, "epoch": epoch, "loss": loss}
# start from here! (demonstrates class usage; assumes model, optimizer, lr,
# loss_func, epochs, device, train_loader, test_loader, validation, Model,
# and evaluate are defined as in the training script above)
if __name__ == "__main__":
    # training model
    early_stopping = EarlyStopping(patience=5, verbose=True)
    for epoch in range(1, epochs + 1):
        running_loss = 0.0
        for step, data in enumerate(train_loader, start=1):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            outputs = model(inputs)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()
            # print statistics
            running_loss += loss.item()
            if step % 100 == 0 or step == len(train_loader):
                print(
                    f"[{epoch}/{epochs}, {step}/{len(train_loader)}] loss: {running_loss / step:.3f}"
                )
        train_loss = running_loss / len(train_loader)
        val_loss = validation(device, model, test_loader)
        print(f"train loss: {train_loss:.3f}, val loss: {val_loss:.3f}")
        early_stopping(model, val_loss, optimizer, epoch)
        if early_stopping.early_stop:
            print("*Early Stopping.")
            break
    print("*Finished Training!")
    # reload the best checkpoint into a fresh model
    checkpoint = early_stopping.checkpoint
    null_model = Model().to(device)
    null_optimizer = optim.Adam(null_model.parameters(), lr=lr)
    checkpoint_data = checkpoint.load(null_model, null_optimizer)
    # evaluate the model
    model = checkpoint_data["model"]
    evaluate(device, model, test_loader)
# %%
# wrap the model for multi-GPU training
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    model = nn.DataParallel(model)
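# Note (added): nn.DataParallel splits each input batch across the visible
# GPUs along dim 0 and gathers the outputs on the default device; the wrapped
# parameters live under model.module, which is why the Checkpoint class above
# saves model.module.state_dict() for DataParallel models.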
# %%
# the classic CIFAR-10 tutorial network
import torch.nn as nn
import torch.nn.functional as F

class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)  # or x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# %%
# show a tensor-format image
import matplotlib.pyplot as plt
import numpy as np
import torchvision

def imshow(img):
    img = img / 2 + 0.5  # unnormalize (undoes transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)))
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

# get some random training images
dataiter = iter(train_loader)
images, labels = next(dataiter)  # dataiter.next() was removed in newer PyTorch
# show images
imshow(torchvision.utils.make_grid(images))
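# Print the matching class names for the grid above (added for illustration;
# assumes the `classes` tuple from the training script is in scope).
print(" ".join(classes[labels[j]] for j in range(len(labels))))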
# %%
# TensorBoard logging
import random
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter()
# add_scalar: log a (dummy) loss and accuracy per epoch
for epoch in range(100):
    loss, acc = random.random(), random.random()  # random.random() returns one float, so draw twice
    writer.add_scalar("Loss/train", loss, epoch)
    writer.add_scalar("Loss/test", loss, epoch)
    writer.add_scalar("Accuracy/train", acc, epoch)
    writer.add_scalar("Accuracy/test", acc, epoch)
writer.close()
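# By default SummaryWriter writes event files to ./runs/; view them with
# `tensorboard --logdir=runs` and open the printed URL in a browser.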