Last active
April 5, 2020 06:11
-
-
Save dcslin/237071ddc6db624967aaedc4722f1b64 to your computer and use it in GitHub Desktop.
MNIST CNN training example using the Apache SINGA deep-learning framework (autograd API).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import urllib.request | |
import gzip | |
import numpy as np | |
import codecs | |
from singa import device | |
from singa import tensor | |
from singa import opt | |
from singa import autograd | |
def load_dataset():
    """Download (if missing) and parse the four MNIST archives.

    Files are cached under /tmp via check_exist_or_download and decoded
    with read_image_file / read_label_file.

    Returns:
        (train_x, train_y, valid_x, valid_y) as float32 numpy arrays;
        image arrays are shaped (N, 1, rows, cols), labels are (N,).
    """
    urls_and_readers = (
        ('http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz',
         read_image_file),
        ('http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz',
         read_label_file),
        ('http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz',
         read_image_file),
        ('http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz',
         read_label_file),
    )
    datasets = [
        reader(check_exist_or_download(url)).astype(np.float32)
        for url, reader in urls_and_readers
    ]
    return tuple(datasets)
def check_exist_or_download(url):
    """Return a local path for *url*, downloading into /tmp on first use.

    Args:
        url: remote file URL; its basename becomes the cache filename.

    Returns:
        Path of the cached file under /tmp.
    """
    cache_path = os.path.join('/tmp/', url.rsplit('/', 1)[-1])
    if os.path.isfile(cache_path):
        return cache_path
    print("Downloading %s" % url)
    urllib.request.urlretrieve(url, cache_path)
    return cache_path
def read_label_file(path):
    """Parse a gzipped MNIST label archive (IDX1 format).

    Args:
        path: path to a *-labels-idx1-ubyte.gz file.

    Returns:
        uint8 numpy array of shape (N,) holding class labels.
    """
    with gzip.open(path, 'rb') as archive:
        raw = archive.read()
    # 2049 is the IDX magic number for label files.
    assert get_int(raw[:4]) == 2049
    count = get_int(raw[4:8])
    # Labels start right after the 8-byte header.
    return np.frombuffer(raw, dtype=np.uint8, offset=8).reshape(count)
def get_int(b):
    """Decode big-endian bytes *b* as an unsigned integer.

    IDX header fields are stored big-endian; the direct stdlib
    conversion replaces the roundabout hex-encode-then-parse idiom
    (``int(codecs.encode(b, 'hex'), 16)``).

    Args:
        b: bytes-like object (typically a 4-byte header slice).

    Returns:
        Non-negative int value of the bytes.
    """
    return int.from_bytes(b, byteorder='big')
def read_image_file(path):
    """Parse a gzipped MNIST image archive (IDX3 format).

    Args:
        path: path to a *-images-idx3-ubyte.gz file.

    Returns:
        uint8 numpy array of shape (N, 1, rows, cols).
    """
    with gzip.open(path, 'rb') as archive:
        raw = archive.read()
    # 2051 is the IDX magic number for image files.
    assert get_int(raw[:4]) == 2051
    count = get_int(raw[4:8])
    rows = get_int(raw[8:12])
    cols = get_int(raw[12:16])
    # Pixels start after the 16-byte header; the singleton axis is the
    # channel dimension expected by Conv2d.
    return np.frombuffer(raw, dtype=np.uint8,
                         offset=16).reshape(count, 1, rows, cols)
def to_categorical(y, num_classes):
    """One-hot encode a vector of class indices.

    Args:
        y: class vector to be converted into a matrix
            (integers from 0 to num_classes).
        num_classes: total number of classes.

    Returns:
        float32 array of shape (len(y), num_classes) with exactly one
        1.0 per row.
    """
    indices = np.array(y, dtype="int")
    count = indices.shape[0]
    one_hot = np.zeros((count, num_classes), dtype=np.float32)
    one_hot[np.arange(count), indices] = 1
    return one_hot
class CNN:
    """LeNet-style convolutional network for 28x28 single-channel input."""

    def __init__(self):
        # Two conv/pool stages followed by two fully connected layers;
        # 4*4*50 is the flattened feature size after both stages.
        self.conv1 = autograd.Conv2d(1, 20, 5, padding=0)
        self.conv2 = autograd.Conv2d(20, 50, 5, padding=0)
        self.linear1 = autograd.Linear(4 * 4 * 50, 500)
        self.linear2 = autograd.Linear(500, 10)
        self.pooling1 = autograd.MaxPool2d(2, 2, padding=0)
        self.pooling2 = autograd.MaxPool2d(2, 2, padding=0)

    def forward(self, x):
        """Run a forward pass over *x*; returns raw class scores (logits)."""
        hidden = self.pooling1(autograd.relu(self.conv1(x)))
        hidden = self.pooling2(autograd.relu(self.conv2(hidden)))
        flat = autograd.flatten(hidden)
        return self.linear2(autograd.relu(self.linear1(flat)))
def accuracy(pred, target):
    """Fraction of rows where the argmax of *pred* matches *target*.

    Args:
        pred: (N, C) score matrix.
        target: (N, C) one-hot label matrix.

    Returns:
        float in [0, 1].
    """
    predicted = np.argmax(pred, axis=1)
    expected = np.argmax(target, axis=1)
    hits = np.array(predicted == expected, "int").sum()
    return hits / float(len(expected))
def train(model,
          x,
          y,
          epochs=1,
          batch_size=64,
          dev=device.get_default_device()):
    """Train *model* with SGD on mini-batches drawn from (x, y).

    Fixes over the original: the first parameter was named ``train``
    (shadowing this function) and was ignored in favour of a global
    ``model`` -- it is now named and actually used (callers already pass
    the model positionally); the SGD optimizer is created once instead
    of once per batch; and the logging condition ``(i * b) % 200 == 0``
    -- which fired on every batch of epoch 0 because i == 0 -- now uses
    the global step count.

    Args:
        model: network exposing forward().
        x: float32 input images, shape (N, 1, rows, cols).
        y: float32 one-hot labels, shape (N, num_classes).
        epochs: number of passes over the data.
        batch_size: samples per update; the trailing partial batch is dropped.
        dev: singa device tensors are placed on.
    """
    batch_number = x.shape[0] // batch_size
    # One optimizer for the whole run, not a fresh one per batch.
    sgd = opt.SGD(lr=0.01)
    for i in range(epochs):
        for b in range(batch_number):
            l_idx = b * batch_size
            r_idx = (b + 1) * batch_size
            x_batch = tensor.Tensor(device=dev, data=x[l_idx:r_idx])
            target_batch = tensor.Tensor(device=dev, data=y[l_idx:r_idx])
            output_batch = model.forward(x_batch)
            loss = autograd.softmax_cross_entropy(output_batch, target_batch)
            accuracy_rate = accuracy(tensor.to_numpy(output_batch),
                                     tensor.to_numpy(target_batch))
            for p, gp in autograd.backward(loss):
                sgd.update(p, gp)
            sgd.step()
            # Log every 200 optimization steps across epochs.
            if (i * batch_number + b) % 200 == 0:
                print("acc %6.2f loss, %6.2f" %
                      (accuracy_rate, tensor.to_numpy(loss)[0]))
    print("training completed")
def test(model, x, y, batch_size=64, dev=device.get_default_device()):
    """Print the mean per-batch accuracy of *model* on (x, y).

    Args:
        model: network exposing forward().
        x: float32 images, shape (N, 1, rows, cols).
        y: float32 one-hot labels, shape (N, num_classes).
        batch_size: samples per batch; the trailing partial batch is dropped.
        dev: singa device tensors are placed on.
    """
    batch_number = x.shape[0] // batch_size
    total = 0
    for step in range(batch_number):
        start = step * batch_size
        stop = start + batch_size
        inputs = tensor.Tensor(device=dev, data=x[start:stop])
        labels = tensor.Tensor(device=dev, data=y[start:stop])
        scores = model.forward(inputs)
        total += accuracy(tensor.to_numpy(scores), tensor.to_numpy(labels))
    print("testing acc %6.2f" % (total / batch_number))
if __name__ == "__main__":
    # Place data and computation on the GPU
    # (swap for device.get_default_device() to run on CPU).
    dev = device.create_cuda_gpu()
    # Build the network.
    model = CNN()
    # Fetch and parse MNIST.
    train_x, train_y, valid_x, valid_y = load_dataset()
    # Scale pixel intensities from [0, 255] into [0, 1].
    train_x = train_x / 255
    valid_x = valid_x / 255
    # One-hot encode labels for softmax cross entropy.
    train_y = to_categorical(train_y, 10)
    valid_y = to_categorical(valid_y, 10)
    # Train with autograd's global training flag on, then evaluate with it off.
    autograd.training = True
    train(model, train_x, train_y, dev=dev)
    autograd.training = False
    test(model, valid_x, valid_y, dev=dev)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import urllib.request | |
import gzip | |
import numpy as np | |
import codecs | |
from singa import device | |
from singa import tensor | |
from singa import opt | |
from singa import autograd | |
def load_dataset():
    """Download (if missing) and parse the four MNIST archives.

    Files are cached under /tmp via check_exist_or_download and decoded
    with read_image_file / read_label_file.

    Returns:
        (train_x, train_y, valid_x, valid_y) as float32 numpy arrays;
        image arrays are shaped (N, 1, rows, cols), labels are (N,).
    """
    urls_and_readers = (
        ('http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz',
         read_image_file),
        ('http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz',
         read_label_file),
        ('http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz',
         read_image_file),
        ('http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz',
         read_label_file),
    )
    datasets = [
        reader(check_exist_or_download(url)).astype(np.float32)
        for url, reader in urls_and_readers
    ]
    return tuple(datasets)
def check_exist_or_download(url):
    """Return a local path for *url*, downloading into /tmp on first use.

    Args:
        url: remote file URL; its basename becomes the cache filename.

    Returns:
        Path of the cached file under /tmp.
    """
    cache_path = os.path.join('/tmp/', url.rsplit('/', 1)[-1])
    if os.path.isfile(cache_path):
        return cache_path
    print("Downloading %s" % url)
    urllib.request.urlretrieve(url, cache_path)
    return cache_path
def read_label_file(path):
    """Parse a gzipped MNIST label archive (IDX1 format).

    Args:
        path: path to a *-labels-idx1-ubyte.gz file.

    Returns:
        uint8 numpy array of shape (N,) holding class labels.
    """
    with gzip.open(path, 'rb') as archive:
        raw = archive.read()
    # 2049 is the IDX magic number for label files.
    assert get_int(raw[:4]) == 2049
    count = get_int(raw[4:8])
    # Labels start right after the 8-byte header.
    return np.frombuffer(raw, dtype=np.uint8, offset=8).reshape(count)
def get_int(b):
    """Decode big-endian bytes *b* as an unsigned integer.

    IDX header fields are stored big-endian; the direct stdlib
    conversion replaces the roundabout hex-encode-then-parse idiom
    (``int(codecs.encode(b, 'hex'), 16)``).

    Args:
        b: bytes-like object (typically a 4-byte header slice).

    Returns:
        Non-negative int value of the bytes.
    """
    return int.from_bytes(b, byteorder='big')
def read_image_file(path):
    """Parse a gzipped MNIST image archive (IDX3 format).

    Args:
        path: path to a *-images-idx3-ubyte.gz file.

    Returns:
        uint8 numpy array of shape (N, 1, rows, cols).
    """
    with gzip.open(path, 'rb') as archive:
        raw = archive.read()
    # 2051 is the IDX magic number for image files.
    assert get_int(raw[:4]) == 2051
    count = get_int(raw[4:8])
    rows = get_int(raw[8:12])
    cols = get_int(raw[12:16])
    # Pixels start after the 16-byte header; the singleton axis is the
    # channel dimension expected by Conv2d.
    return np.frombuffer(raw, dtype=np.uint8,
                         offset=16).reshape(count, 1, rows, cols)
def to_categorical(y, num_classes):
    """One-hot encode a vector of class indices.

    Args:
        y: class vector to be converted into a matrix
            (integers from 0 to num_classes).
        num_classes: total number of classes.

    Returns:
        float32 array of shape (len(y), num_classes) with exactly one
        1.0 per row.
    """
    indices = np.array(y, dtype="int")
    count = indices.shape[0]
    one_hot = np.zeros((count, num_classes), dtype=np.float32)
    one_hot[np.arange(count), indices] = 1
    return one_hot
class CNN:
    """LeNet-style convolutional network for 28x28 single-channel input."""

    def __init__(self):
        # Two conv/pool stages followed by two fully connected layers;
        # 4*4*50 is the flattened feature size after both stages.
        self.conv1 = autograd.Conv2d(1, 20, 5, padding=0)
        self.conv2 = autograd.Conv2d(20, 50, 5, padding=0)
        self.linear1 = autograd.Linear(4 * 4 * 50, 500)
        self.linear2 = autograd.Linear(500, 10)
        self.pooling1 = autograd.MaxPool2d(2, 2, padding=0)
        self.pooling2 = autograd.MaxPool2d(2, 2, padding=0)

    def forward(self, x):
        """Run a forward pass over *x*; returns raw class scores (logits)."""
        hidden = self.pooling1(autograd.relu(self.conv1(x)))
        hidden = self.pooling2(autograd.relu(self.conv2(hidden)))
        flat = autograd.flatten(hidden)
        return self.linear2(autograd.relu(self.linear1(flat)))
def accuracy(pred, target):
    """Fraction of rows where the argmax of *pred* matches *target*.

    Args:
        pred: (N, C) score matrix.
        target: (N, C) one-hot label matrix.

    Returns:
        float in [0, 1].
    """
    predicted = np.argmax(pred, axis=1)
    expected = np.argmax(target, axis=1)
    hits = np.array(predicted == expected, "int").sum()
    return hits / float(len(expected))
def train(model,
          x,
          y,
          epochs=1,
          batch_size=64,
          dev=device.get_default_device()):
    """Train *model* with SGD on mini-batches drawn from (x, y).

    Fixes over the original: the first parameter was named ``train``
    (shadowing this function) and was ignored in favour of a global
    ``model`` -- it is now named and actually used (callers already pass
    the model positionally); the SGD optimizer is created once instead
    of once per batch; and the logging condition ``(i * b) % 200 == 0``
    -- which fired on every batch of epoch 0 because i == 0 -- now uses
    the global step count.

    Args:
        model: network exposing forward().
        x: float32 input images, shape (N, 1, rows, cols).
        y: float32 one-hot labels, shape (N, num_classes).
        epochs: number of passes over the data.
        batch_size: samples per update; the trailing partial batch is dropped.
        dev: singa device tensors are placed on.
    """
    batch_number = x.shape[0] // batch_size
    # One optimizer for the whole run, not a fresh one per batch.
    sgd = opt.SGD(lr=0.01)
    for i in range(epochs):
        for b in range(batch_number):
            l_idx = b * batch_size
            r_idx = (b + 1) * batch_size
            x_batch = tensor.Tensor(device=dev, data=x[l_idx:r_idx])
            target_batch = tensor.Tensor(device=dev, data=y[l_idx:r_idx])
            output_batch = model.forward(x_batch)
            loss = autograd.softmax_cross_entropy(output_batch, target_batch)
            accuracy_rate = accuracy(tensor.to_numpy(output_batch),
                                     tensor.to_numpy(target_batch))
            for p, gp in autograd.backward(loss):
                sgd.update(p, gp)
            sgd.step()
            # Log every 200 optimization steps across epochs.
            if (i * batch_number + b) % 200 == 0:
                print("acc %6.2f loss, %6.2f" %
                      (accuracy_rate, tensor.to_numpy(loss)[0]))
    print("training completed")
def test(model, x, y, batch_size=64, dev=device.get_default_device()):
    """Print the mean per-batch accuracy of *model* on (x, y).

    Args:
        model: network exposing forward().
        x: float32 images, shape (N, 1, rows, cols).
        y: float32 one-hot labels, shape (N, num_classes).
        batch_size: samples per batch; the trailing partial batch is dropped.
        dev: singa device tensors are placed on.
    """
    batch_number = x.shape[0] // batch_size
    total = 0
    for step in range(batch_number):
        start = step * batch_size
        stop = start + batch_size
        inputs = tensor.Tensor(device=dev, data=x[start:stop])
        labels = tensor.Tensor(device=dev, data=y[start:stop])
        scores = model.forward(inputs)
        total += accuracy(tensor.to_numpy(scores), tensor.to_numpy(labels))
    print("testing acc %6.2f" % (total / batch_number))
if __name__ == "__main__":
    # Run on the default device (CPU unless SINGA picks otherwise).
    dev = device.get_default_device()
    # Build the network.
    model = CNN()
    # Fetch and parse MNIST.
    train_x, train_y, valid_x, valid_y = load_dataset()
    # Scale pixel intensities from [0, 255] into [0, 1].
    train_x = train_x / 255
    valid_x = valid_x / 255
    # One-hot encode labels for softmax cross entropy.
    train_y = to_categorical(train_y, 10)
    valid_y = to_categorical(valid_y, 10)
    # Train with autograd's global training flag on, then evaluate with it off.
    autograd.training = True
    train(model, train_x, train_y, dev=dev)
    autograd.training = False
    test(model, valid_x, valid_y, dev=dev)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment