Last active
January 26, 2017 22:31
-
-
Save kastnerkyle/af458f908d572546a2baeadc1e70c554 to your computer and use it in GitHub Desktop.
MNIST test in PyTorch, performance still TBD
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Kyle Kastner | |
# License: BSD 3-Clause | |
import os
import struct
import time

import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import TensorDataset, DataLoader
# Modified from https://github.com/sorki/python-mnist | |
# License: BSD 3-Clause | |
def load_mnist(which_set, data_path="data/mnist"):
    """Load one split of MNIST from uncompressed IDX files.

    Parameters
    ----------
    which_set : str
        One of "train", "valid" or "test". "train" and "valid" are both cut
        from the training files (the first 50000 examples and everything
        after them, respectively); "test" is read from the t10k files.
    data_path : str
        Directory holding the four IDX files. A trailing path separator is
        optional.

    Returns
    -------
    images : np.ndarray
        float32 array of shape (n_examples, rows * cols) holding the raw
        unsigned-byte pixel values (no rescaling is applied).
    labels : np.ndarray
        int64 array of shape (n_examples,).

    Raises
    ------
    ValueError
        If ``which_set`` is not a known split, if a file does not start with
        the expected IDX magic number, or if the image file holds fewer
        pixels than its header announces.
    """
    if which_set not in ("train", "valid", "test"):
        raise ValueError("Unknown argument setting for which_set: %s" % which_set)

    # "train" and "valid" share the training files; only "test" has its own.
    prefix = "t10k" if which_set == "test" else "train"
    path_img = os.path.join(data_path, prefix + "-images-idx3-ubyte")
    path_lbl = os.path.join(data_path, prefix + "-labels-idx1-ubyte")

    with open(path_lbl, 'rb') as f:
        # Label header: two big-endian uint32 values (magic, item count).
        magic, size = struct.unpack(">II", f.read(8))
        if magic != 2049:
            raise ValueError('Magic number mismatch, expected 2049,'
                             'got {}'.format(magic))
        labels = np.frombuffer(f.read(), dtype=np.uint8).astype(np.int64)

    with open(path_img, 'rb') as f:
        # Image header: four big-endian uint32 values (magic, count, rows, cols).
        magic, size, rows, cols = struct.unpack(">IIII", f.read(16))
        if magic != 2051:
            raise ValueError('Magic number mismatch, expected 2051,'
                             'got {}'.format(magic))
        # count= ignores any trailing bytes and raises ValueError on a
        # truncated file instead of silently producing ragged rows.
        pixels = np.frombuffer(f.read(), dtype=np.uint8,
                               count=size * rows * cols)

    # One flattened image per row.
    images = pixels.reshape(size, rows * cols).astype(np.float32)

    if which_set == "train":
        images = images[:50000]
        labels = labels[:50000]
    elif which_set == "valid":
        images = images[50000:]
        labels = labels[50000:]
    # "test" comes from its own files and is returned whole.
    return images, labels
def numpy_softmax(arr, axis=1):
    """Return the softmax of ``arr`` taken along ``axis``.

    Parameters
    ----------
    arr : array-like
        Unnormalized scores.
    axis : int
        Axis that is normalized to sum to one. The default of 1 matches the
        (n_examples, n_classes) layout; pass ``axis=-1`` for inputs of any
        other rank.

    Returns
    -------
    np.ndarray
        Array of the same shape as ``arr`` whose entries along ``axis`` are
        non-negative and sum to one.
    """
    arr = np.asarray(arr)
    # Subtracting the per-slice maximum leaves the result unchanged but keeps
    # np.exp from overflowing on large scores.
    shifted = arr - np.amax(arr, axis=axis, keepdims=True)
    e = np.exp(shifted)
    return e / np.sum(e, axis=axis, keepdims=True)
def weights_init(mod):
    """Re-draw the parameters of ``mod`` in place, keyed on its class name.

    Intended to be passed to ``Module.apply``. Dispatch is by substring of
    the class name, checked in this order:

    * contains "Conv": weights drawn from a normal with mean 0.0, std 0.2
    * contains "Linear": weights drawn from a normal with mean 0.0, std 0.1
    * contains "BatchNorm": weights drawn from a normal with mean 1.0,
      std 0.02, and the bias set to zero

    Modules matching none of these are left untouched.
    """
    kind = mod.__class__.__name__
    if 'Conv' in kind:
        mod.weight.data.normal_(0.0, 0.2)
        return
    if 'Linear' in kind:
        mod.weight.data.normal_(0.0, 0.1)
        return
    if 'BatchNorm' in kind:
        mod.weight.data.normal_(1.0, 0.02)
        mod.bias.data.fill_(0)
class Net(nn.Module):
    """Fully connected network with a single 500-unit ReLU hidden layer.

    ``forward`` returns the output of the second linear layer directly; no
    softmax or other normalization is applied to it.
    """

    def __init__(self, input_dim, target_dim):
        super(Net, self).__init__()
        hidden_dim = 500
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, target_dim)

    def forward(self, x):
        hidden = F.relu(self.fc1(x))
        return self.fc2(hidden)
# Experiment configuration.
minibatch_size = 20
n_epochs = 1000

# Run on the GPU when one is available instead of crashing on CPU-only hosts.
cuda = th.cuda.is_available()

# Each split must come from its own part of the data; loading "train" three
# times would make the validation error a second training error.
train_images, train_labels = load_mnist("train")
valid_images, valid_labels = load_mnist("valid")
test_images, test_labels = load_mnist("test")

n_input = 28 * 28
n_target = 10

train_dataset = TensorDataset(th.from_numpy(train_images),
                              th.from_numpy(train_labels))
# Pinned host memory only pays off when batches are copied to a GPU.
train_dataloader = DataLoader(train_dataset, batch_size=minibatch_size,
                              shuffle=True, num_workers=2, pin_memory=cuda)

valid_dataset = TensorDataset(th.from_numpy(valid_images),
                              th.from_numpy(valid_labels))
# Order does not affect the validation error, so there is no need to shuffle.
valid_dataloader = DataLoader(valid_dataset, batch_size=minibatch_size,
                              shuffle=False, num_workers=2, pin_memory=cuda)

net = Net(n_input, n_target)
net.apply(weights_init)
if cuda:
    net = net.cuda()

# Alternative optimizer: optim.SGD(net.parameters(), lr=0.01)
optimizer = optim.Adam(net.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()
def predict_function(inp):
    """Run the module-level ``net`` on ``inp`` and return its output unchanged."""
    scores = net(inp)
    return scores
def cost_function(inp, target):
    """Return the module-level ``criterion`` applied to ``net(inp)`` and ``target``."""
    return criterion(net(inp), target)
def fit_function(inp, target):
    """Take one optimizer step on a minibatch and return its loss.

    The loss is computed by ``cost_function`` before the parameter update is
    applied, and is returned as produced (not converted to a Python number).
    """
    # Clear gradients left over from the previous backward pass.
    optimizer.zero_grad()
    minibatch_loss = cost_function(inp, target)
    minibatch_loss.backward()
    # Apply the parameter update.
    optimizer.step()
    return minibatch_loss
# Train for n_epochs passes over the training set; after every pass, measure
# the classification error on the validation set and print a short report.
n_epochs = 1000
tot_start = time.time()
for e in range(n_epochs):
    # ---- training pass
    sum_avg_loss = 0.
    check_minibatches = 0
    start = time.time()
    for X_mb, y_mb in train_dataloader:
        # Flatten labels so both (N,) and (N, 1) label layouts work.
        y_mb = y_mb.view(-1)
        if cuda:
            X_mb = X_mb.cuda()
            y_mb = y_mb.cuda()
        loss = fit_function(Variable(X_mb), Variable(y_mb))
        # .sum() collapses a one-element or zero-dimensional loss to a scalar.
        sum_avg_loss += float(loss.data.cpu().numpy().sum())
        check_minibatches += 1
    end = time.time()

    # ---- validation pass
    total_wrong = 0
    total = 0
    for X_mb, y_mb in valid_dataloader:
        y_true = y_mb.view(-1).cpu().numpy()
        if cuda:
            X_mb = X_mb.cuda()
        # Predict on the validation batch itself, not on whatever training
        # minibatch happened to be processed last.
        lin_pred = predict_function(Variable(X_mb)).data.cpu().numpy()
        # Softmax is monotonic, so the argmax of the raw scores is already
        # the predicted class.
        pred_labels = lin_pred.argmax(axis=-1)
        total_wrong += int(np.sum(y_true != pred_labels))
        total += y_true.shape[0]

    # Report NaN rather than dividing by zero when a loader yielded nothing.
    if check_minibatches:
        train_loss = float(sum_avg_loss) / check_minibatches
    else:
        train_loss = float("nan")
    if total:
        valid_error = float(total_wrong) / total
    else:
        valid_error = float("nan")

    print("Epoch %i" % e)
    print("Training loop time: %s seconds" % (str(end - start)))
    print("Training loss: %s" % str(train_loss))
    print("Valid error: %s" % (str(valid_error)))
tot_end = time.time()
print("Overall time %s" % (tot_end - tot_start))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment