Skip to content

Instantly share code, notes, and snippets.

@smothiki
Created January 27, 2024 21:45
Show Gist options
  • Save smothiki/2ee9ba977d8109b0b7d069c754cdd2cb to your computer and use it in GitHub Desktop.
Save smothiki/2ee9ba977d8109b0b7d069c754cdd2cb to your computer and use it in GitHub Desktop.
!pip3 install torch torchvision pandas numpy onnx onnxruntime
import argparse
import torch
import torch.nn.functional as F
from torch import nn, optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
import mlflow
import mlflow.pytorch
import onnx
import mlflow.onnx
class Net(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
def train(model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 10 == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
def main():
batch_size=64
seed=1
test_batch_size=100
epochs=1
lr=0.1
gamma=0.7
no_cuda=True # make it false for non gpu workloads
dry_run=False
log_interval=10
use_cuda = not no_cuda and torch.cuda.is_available()
torch.manual_seed(seed)
device = torch.device("cuda" if use_cuda else "cpu")
train_kwargs = {"batch_size": batch_size}
test_kwargs = {"batch_size": test_batch_size}
if use_cuda:
cuda_kwargs = {"num_workers": 1, "pin_memory": True, "shuffle": True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
)
dataset1 = datasets.MNIST("./data", train=True, download=True, transform=transform)
dataset2 = datasets.MNIST("./data", train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
model = Net().to(device)
scripted_model = torch.jit.script(model) # scripting the model
optimizer = optim.Adadelta(model.parameters(), lr=lr)
scheduler = StepLR(optimizer, step_size=1, gamma=gamma)
for epoch in range(1, epochs + 1):
train(scripted_model, device, train_loader, optimizer, epoch)
scheduler.step()
with mlflow.start_run():
f = io.BytesIO()
torch.onnx.export(model, data, f)
mlflow.pytorch.log_model(scripted_model, "model") # logging scripted model
model_path = mlflow.get_artifact_uri("model")
loaded_pytorch_model = mlflow.pytorch.load_model(model_path) # loading scripted model
model.eval()
with torch.no_grad():
test_datapoint, test_target = next(iter(test_loader))
prediction = loaded_pytorch_model(test_datapoint[0].unsqueeze(0).to(device))
actual = test_target[0].item()
predicted = torch.argmax(prediction).item()
print(f"\nPREDICTION RESULT: ACTUAL: {actual!s}, PREDICTED: {predicted!s}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment