MNIST test in PyTorch, performance still TBD
# Author: Kyle Kastner
# License: BSD 3-Clause
import torch as th
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import time
import os
import struct
from array import array
import numpy as np
# Modified from https://github.com/sorki/python-mnist
# License: BSD 3-Clause
def load_mnist(which_set, data_path="data/mnist"):
    if data_path[-1] != os.sep:
        data_path = data_path + os.sep
    train_image_file = data_path + "train-images-idx3-ubyte"
    train_label_file = data_path + "train-labels-idx1-ubyte"
    test_image_file = data_path + "t10k-images-idx3-ubyte"
    test_label_file = data_path + "t10k-labels-idx1-ubyte"
    if which_set not in ["train", "valid", "test"]:
        raise ValueError("Unknown argument setting for which_set: %s" % which_set)
    if which_set in ["train", "valid"]:
        path_lbl = train_label_file
        path_img = train_image_file
    elif which_set == "test":
        path_lbl = test_label_file
        path_img = test_image_file
    with open(path_lbl, 'rb') as f:
        magic, size = struct.unpack(">II", f.read(8))
        if magic != 2049:
            raise ValueError('Magic number mismatch, expected 2049, '
                             'got {}'.format(magic))
        labels = array("B", f.read())
    with open(path_img, 'rb') as f:
        magic, size, rows, cols = struct.unpack(">IIII", f.read(16))
        if magic != 2051:
            raise ValueError('Magic number mismatch, expected 2051, '
                             'got {}'.format(magic))
        image_data = array("B", f.read())
    images = []
    for i in range(size):
        images.append([0] * rows * cols)
    for i in range(size):
        images[i][:] = image_data[i * rows * cols:(i + 1) * rows * cols]
    images = np.array(images, dtype=np.float32)
    labels = np.array(labels, dtype=np.int64)
    if which_set == "train":
        # first 50000 of the 60000 training examples
        images = images[:50000]
        labels = labels[:50000]
    elif which_set == "valid":
        # last 10000 training examples held out for validation
        images = images[50000:]
        labels = labels[50000:]
    elif which_set == "test":
        # test set from separate file, use it whole
        pass
    return images, labels
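
# A quick shape check (an added sketch, not part of the original gist; it
# assumes the standard MNIST idx files sit under data/mnist). The
# 60000-example training file is split 50000/10000 into train/valid, and
# each image is flattened to 28 * 28 = 784 float32 values:
# _imgs, _lbls = load_mnist("train")
# assert _imgs.shape == (50000, 784) and _lbls.shape == (50000,)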
def numpy_softmax(arr):
    # subtract per-row max for numerical stability before exponentiating
    maxes = np.amax(arr, axis=1)
    maxes = maxes.reshape(maxes.shape[0], 1)
    e = np.exp(arr - maxes)
    dist = e / np.sum(e, axis=1, keepdims=True)
    return dist
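
# Sanity check for numpy_softmax (an added aside): each row of the output
# should be a valid probability distribution summing to 1. This mirrors what
# a softmax layer would produce from the network's logits.
_probe = numpy_softmax(np.random.randn(4, 10).astype(np.float32))
assert np.allclose(_probe.sum(axis=1), 1.0)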
def weights_init(mod):
    classname = mod.__class__.__name__
    if classname.find('Conv') != -1:
        mod.weight.data.normal_(0.0, 0.2)
    elif classname.find('Linear') != -1:
        mod.weight.data.normal_(0.0, 0.1)
    elif classname.find('BatchNorm') != -1:
        mod.weight.data.normal_(1.0, 0.02)
        mod.bias.data.fill_(0)
class Net(nn.Module):
    def __init__(self, input_dim, target_dim):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_dim, 500)
        self.fc2 = nn.Linear(500, target_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
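
# Note: forward returns raw logits rather than probabilities, because
# nn.CrossEntropyLoss below applies log-softmax internally;
# numpy_softmax is only used at evaluation time.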
minibatch_size = 20
n_epochs = 1000
train_images, train_labels = load_mnist("train")
valid_images, valid_labels = load_mnist("valid")
test_images, test_labels = load_mnist("test")
n_input = 28 * 28
n_target = 10
train_dataset = TensorDataset(th.from_numpy(train_images),
                              th.from_numpy(train_labels))
train_dataloader = DataLoader(train_dataset, batch_size=minibatch_size,
                              shuffle=True, num_workers=2, pin_memory=True)
valid_dataset = TensorDataset(th.from_numpy(valid_images),
                              th.from_numpy(valid_labels))
valid_dataloader = DataLoader(valid_dataset, batch_size=minibatch_size,
                              shuffle=True, num_workers=2, pin_memory=True)
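
# num_workers=2 loads minibatches in background worker processes, and
# pin_memory=True puts batches in page-locked memory so the .cuda() copies
# below can be faster; both only affect speed, not results.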
net = Net(n_input, n_target)
net.apply(weights_init)
# placeholder Variables, re-bound to real minibatches inside the loops below;
# CrossEntropyLoss takes a 1D LongTensor of class indices, not one-hot targets
inp = Variable(th.FloatTensor(minibatch_size, n_input))
target = Variable(th.LongTensor(minibatch_size))
cuda = True
if cuda:
    net = net.cuda()
#optimizer = optim.SGD(net.parameters(), lr=0.01)
optimizer = optim.Adam(net.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()
def predict_function(inp):
    return net(inp)

def cost_function(inp, target):
    output = net(inp)
    loss = criterion(output, target)
    return loss

def fit_function(inp, target):
    optimizer.zero_grad()  # zero the gradient buffers
    loss = cost_function(inp, target)
    loss.backward()
    optimizer.step()  # does the parameter update
    return loss
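
# These three closures wrap the global net/optimizer/criterion in a
# compiled-function style API: fit_function runs one forward/backward pass
# and one optimizer step on a single minibatch and returns the minibatch loss.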
tot_start = time.time()
for e in range(n_epochs):
    sum_avg_loss = 0.
    check_minibatches = 0
    start = time.time()
    for data in train_dataloader:
        X_mb, y_mb = data
        # labels come out of the DataLoader as a 1D LongTensor, which is
        # exactly the shape nn.CrossEntropyLoss expects
        if cuda:
            X_mb = X_mb.cuda()
            y_mb = y_mb.cuda()
        inp = Variable(X_mb)
        target = Variable(y_mb)
        l = fit_function(inp, target)
        sum_avg_loss += l.cpu().data.numpy()
        check_minibatches += 1
    end = time.time()
    total_wrong = 0
    total = 0
    for data in valid_dataloader:
        X_mb, y_mb = data
        # keep a CPU copy of the 1D labels for scoring
        y_true = y_mb.cpu().numpy()
        if cuda:
            X_mb = X_mb.cuda()
        # predict on this validation batch, not on the stale training
        # `inp` left over from the loop above
        inp = Variable(X_mb)
        lin_pred = predict_function(inp)
        lin_pred = lin_pred.cpu().data.numpy()
        pred = numpy_softmax(lin_pred)
        wrong = np.sum(y_true.astype("int32") != pred.argmax(axis=-1))
        total_wrong += wrong
        total += y_true.shape[0]
    print("Epoch %i" % e)
    print("Training loop time: %s seconds" % str(end - start))
    print("Training loss: %s" % str(float(sum_avg_loss) / check_minibatches))
    print("Valid error: %s" % str(float(total_wrong) / total))
tot_end = time.time()
print("Overall time %s" % (tot_end - tot_start))