# coding:utf-8 | |
from __future__ import absolute_import | |
from __future__ import unicode_literals | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import torch.optim as optim | |
from torch.utils.data.dataset import Dataset | |
from torch.utils.data import DataLoader | |
from torchvision import transforms | |
import cPickle | |
import numpy as np | |
import os | |
# Hyper-parameters for stochastic gradient descent.
lr = 0.01  # learning rate
momentum = 0.9  # momentum coefficient
def unpickle(file):
    """
    Load and return the single pickled object stored in ``file``.

    Works on both Python 2 (``cPickle``) and Python 3 (``pickle``). On
    Python 3 the stream is decoded with ``encoding='latin1'`` so that pickles
    written by Python 2 (including ones holding numpy arrays) load with
    ``str`` keys.

    NOTE(security): unpickling can execute arbitrary code; only use this on
    files from a trusted source.

    :param file: path of the pickle file to read
    :return: the unpickled object
    """
    import sys
    try:
        import cPickle as pickle_impl  # Python 2
    except ImportError:
        import pickle as pickle_impl  # Python 3

    # ``encoding`` only exists on Python 3's loader.
    load_kwargs = {} if sys.version_info[0] < 3 else {'encoding': 'latin1'}

    # The context manager closes the file even when loading fails.
    with open(file, 'rb') as fo:
        return pickle_impl.load(fo, **load_kwargs)
def conv_data2image(data):
    """
    Turn one flat 3072-value row into a channel-last image.

    The row is first viewed as (3, 32, 32) and the leading axis is then
    moved to the end, giving an array of shape (32, 32, 3).

    :param data: numpy array holding 3 * 32 * 32 values
    :return: numpy array of shape (32, 32, 3)
    """
    channel_first = data.reshape((3, 32, 32))
    return channel_first.transpose(1, 2, 0)
def load_cirar10(folder):
    """
    Read the CIFAR-10 python batch files stored under ``folder``.

    ``data_batch_1`` .. ``data_batch_5`` form the training split and
    ``test_batch`` the test split; the ``data`` and ``labels`` entries of
    each unpickled batch are used.

    :param folder: directory containing the batch files
    :return: train_X, train_y, test_X, test_y -- the X values are lists of
        images produced by ``conv_data2image``, the y values are numpy arrays
    """
    batch_rows = []
    batch_labels = []
    for batch_no in range(1, 6):
        batch_path = os.path.join(folder, "%s%d" % ("data_batch_", batch_no))
        batch = unpickle(batch_path)
        batch_rows.append(batch['data'])
        batch_labels.append(batch['labels'])

    # Stack all training batches at once: rows vertically, labels flat.
    train_rows = np.vstack(batch_rows)
    train_y = np.hstack(batch_labels)

    held_out = unpickle(os.path.join(folder, 'test_batch'))
    test_y = np.array(held_out['labels'])

    train_X = list(map(conv_data2image, train_rows))
    test_X = list(map(conv_data2image, held_out['data']))
    return train_X, train_y, test_X, test_y
class DataSet(Dataset):
    """
    Index-aligned pairing of samples ``x`` with labels ``y``.

    When ``transform`` is given it is applied to the sample (never to the
    label) each time an item is fetched.
    """

    def __init__(self, x, y, transform=None):
        self.x, self.y, self.transform = x, y, transform

    def __len__(self):
        # The length is defined by the sample container.
        return len(self.x)

    def __getitem__(self, index):
        sample, label = self.x[index], self.y[index]
        if self.transform is None:
            return sample, label
        return self.transform(sample), label
class ConvolutionalNeuralNetwork(nn.Module):
    """
    Small CNN classifier producing log-probabilities over 10 classes.

    Three stages of two 3x3 convolutions (padding 1, ReLU) are each followed
    by 2x2 max pooling; the result is flattened and passed through two fully
    connected layers with dropout in between. ``fc1`` expects
    128 * 4 * 4 features, i.e. a 3-channel 32x32 input.
    """

    def __init__(self):
        super(ConvolutionalNeuralNetwork, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        # 128 channels on a 4x4 map (32 halved by three poolings).
        self.fc1 = nn.Linear(128 * 4 * 4, 256)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        """
        Compute class log-probabilities for a batch.

        :param x: input batch of shape (N, 3, 32, 32)
        :return: tensor of shape (N, 10) holding log-softmax scores
        """
        h = F.relu(self.conv1(x))
        h = F.relu(self.conv2(h))
        h = F.max_pool2d(h, 2)
        h = F.relu(self.conv3(h))
        h = F.relu(self.conv4(h))
        h = F.max_pool2d(h, 2)
        h = F.relu(self.conv5(h))
        h = F.relu(self.conv6(h))
        h = F.max_pool2d(h, 2)
        # Keep the batch dimension fixed so a wrongly sized input fails in
        # fc1 instead of being silently folded into a different batch size.
        h = h.view(h.size(0), -1)
        h = F.relu(self.fc1(h))
        # Dropout is only active while the module is in training mode.
        h = F.dropout(h, training=self.training)
        # Normalise over the class dimension explicitly; relying on the
        # implicit dimension is deprecated and emits a warning.
        return F.log_softmax(self.fc2(h), dim=1)
# ---------------------------------------------------------------------------
# Script body: load CIFAR-10, build the model, then train and evaluate.
# Uses the tensor API of PyTorch >= 0.4 (``.item()``, ``torch.no_grad()``).
# ---------------------------------------------------------------------------
train_X, train_y, test_X, test_y = load_cirar10("cifar-10-batches-py")
model = ConvolutionalNeuralNetwork()
# Use the module-level ``momentum`` constant rather than a duplicated literal.
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=1e-6)
epochs = 100

train_d_loader = DataLoader(
    DataSet(
        x=train_X,
        y=train_y,
        transform=transforms.Compose([transforms.ToTensor()]),
    ),
    batch_size=32,
    shuffle=True,
)
test_d_loader = DataLoader(
    DataSet(
        x=test_X,
        y=test_y,
        transform=transforms.Compose([transforms.ToTensor()]),
    ),
    batch_size=32,
    shuffle=False,
)

for epoch in range(epochs):
    # ---- training pass ----
    model.train()
    train_loss = 0.0
    for batch_train_x, batch_train_y in train_d_loader:
        output = model(batch_train_x)
        # nll_loss needs int64 class indices; the cast is a no-op when the
        # labels are already collated as long tensors.
        loss = F.nll_loss(output, batch_train_y.long())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # ``.item()`` extracts the Python float; indexing a 0-dim tensor
        # (``loss.data[0]``) raises on current PyTorch.
        train_loss += loss.item()
    # Reported loss is the mean of the per-batch mean losses.
    print("training epoch: {} loss: {}".format(epoch, train_loss / len(train_d_loader)))

    # ---- evaluation pass ----
    model.eval()
    test_loss = 0.0
    correct = 0
    # No gradients are needed for evaluation; skip building autograd graphs.
    with torch.no_grad():
        for batch_test_x, batch_test_y in test_d_loader:
            batch_test_y = batch_test_y.long()
            output = model(batch_test_x)
            test_loss += F.nll_loss(output, batch_test_y).item()
            # Index of the highest log-probability per sample.
            pred = output.max(1)[1]
            correct += pred.eq(batch_test_y).sum().item()
    print("testing epoch: {} loss: {} accuracy: {}".format(
        epoch,
        test_loss / len(test_d_loader),
        float(correct) / float(len(test_d_loader.dataset))))
# (GitHub page footer captured with this snippet, not part of the program:
# "Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment")