CIFAR-10 for Rafiki with train and serve
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
""" The VGG model is adapted from http://torch.ch/blog/2015/07/30/cifar.html.
"""
from singa import layer
from singa import initializer
from singa import metric
from singa import loss
from singa import net as ffnet
def ConvBnReLU(net, name, nb_filters, sample_shape=None):
    # add a 3x3 convolution followed by batch normalization and ReLU
    net.add(layer.Conv2D(name + '_1', nb_filters, 3, 1, pad=1,
                         input_sample_shape=sample_shape))
    net.add(layer.BatchNormalization(name + '_2'))
    net.add(layer.Activation(name + '_3'))
def create_net(use_cpu=False):
    if use_cpu:
        layer.engine = 'singacpp'
    net = ffnet.FeedForwardNet(loss.SoftmaxCrossEntropy(), metric.Accuracy())
    ConvBnReLU(net, 'conv1_1', 64, (3, 32, 32))
    net.add(layer.Dropout('drop1', 0.3))
    ConvBnReLU(net, 'conv1_2', 64)
    net.add(layer.MaxPooling2D('pool1', 2, 2, border_mode='valid'))
    ConvBnReLU(net, 'conv2_1', 128)
    net.add(layer.Dropout('drop2_1', 0.4))
    ConvBnReLU(net, 'conv2_2', 128)
    net.add(layer.MaxPooling2D('pool2', 2, 2, border_mode='valid'))
    ConvBnReLU(net, 'conv3_1', 256)
    net.add(layer.Dropout('drop3_1', 0.4))
    ConvBnReLU(net, 'conv3_2', 256)
    net.add(layer.Dropout('drop3_2', 0.4))
    ConvBnReLU(net, 'conv3_3', 256)
    net.add(layer.MaxPooling2D('pool3', 2, 2, border_mode='valid'))
    ConvBnReLU(net, 'conv4_1', 512)
    net.add(layer.Dropout('drop4_1', 0.4))
    ConvBnReLU(net, 'conv4_2', 512)
    net.add(layer.Dropout('drop4_2', 0.4))
    ConvBnReLU(net, 'conv4_3', 512)
    net.add(layer.MaxPooling2D('pool4', 2, 2, border_mode='valid'))
    ConvBnReLU(net, 'conv5_1', 512)
    net.add(layer.Dropout('drop5_1', 0.4))
    ConvBnReLU(net, 'conv5_2', 512)
    net.add(layer.Dropout('drop5_2', 0.4))
    ConvBnReLU(net, 'conv5_3', 512)
    net.add(layer.MaxPooling2D('pool5', 2, 2, border_mode='valid'))
    net.add(layer.Flatten('flat'))
    net.add(layer.Dropout('drop_flat', 0.5))
    net.add(layer.Dense('ip1', 512))
    net.add(layer.BatchNormalization('batchnorm_ip1'))
    net.add(layer.Activation('relu_ip1'))
    net.add(layer.Dropout('drop_ip2', 0.5))
    net.add(layer.Dense('ip2', 10))
    print 'Start initialization............'
    # initialize parameters: batchnorm means/offsets to 0, variances to 1,
    # scales uniformly in [0, 1], conv/dense weights with gaussian noise,
    # and the remaining biases to 0
    for (p, name) in zip(net.param_values(), net.param_names()):
        print name, p.shape
        if 'mean' in name or 'beta' in name:
            p.set_value(0.0)
        elif 'var' in name:
            p.set_value(1.0)
        elif 'gamma' in name:
            initializer.uniform(p, 0, 1)
        elif len(p.shape) > 1:
            if 'conv' in name:
                initializer.gaussian(p, 0, 3 * 3 * p.shape[0])
            else:
                p.gaussian(0, 0.02)
        else:
            p.set_value(0)
        print name, p.l1()
    return net
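

# A minimal usage sketch (added for illustration; not part of the original
# gist). It assumes this file is saved as model.py, since train.py and
# serve.py below import it as `model`.
if __name__ == '__main__':
    import numpy as np
    net = create_net(use_cpu=True)
    nparams = sum(int(np.prod(p.shape)) for p in net.param_values())
    print 'Total number of parameters: %d' % nparams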
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
'''Predict the labels of new images using the pre-trained VGG model.'''
import numpy as np
import argparse
import os
from singa import device
from singa import tensor
from rafiki import agent
from singa import image_tool
import model
rafiki = agent.Agent()
def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1] in \
        ["PNG", "png", "jpg", "JPG", "JPEG", "jpeg"]
def serve(net, mean, dev, label_map, topk=5):
    tool = image_tool.ImageTool()
    while True:
        try:
            key, val = rafiki.Pull()
            if key is agent.STOP:
                break
            image = val.files['image']
            if not image:
                rafiki.PushStatus(agent.ERROR, 'no image found')
                continue
            if not allowed_file(image.filename):
                rafiki.PushStatus(agent.ERROR, 'only jpg/png image is allowed')
                continue
            image = tool.load(image).resize_by_list([32]).get()[0]
            dat = np.array(image.convert('RGB'), dtype=np.float32)
            # HWC -> CHW to match the (3, 32, 32) input expected by the net;
            # the mean is assumed to be a scalar or a CHW-shaped array
            dat = dat.transpose(2, 0, 1)
            dat -= mean
            x = tensor.from_numpy(dat[np.newaxis, :])
            x.to_device(dev)
            y = net.predict(x)
            y.to_host()
            prob = tensor.to_numpy(y)[0]
            # sort class indices by descending probability
            labels = np.flipud(np.argsort(prob))
            rafiki.PushResponse('labels',
                                ' '.join(label_map[i] for i in labels[0:topk]))
        except Exception as e:
            rafiki.PushStatus(agent.ERROR, str(e))
    rafiki.PushStatus(agent.SUCCESS, 'Stopped the serving job')
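

# Worked example of the top-k lookup used above (illustration only, values are
# made up; requires only numpy):
#     prob = np.array([0.05, 0.6, 0.05, 0.1, 0.2])
#     np.flipud(np.argsort(prob))   # -> array([1, 4, 3, 2, 0])
#     # labels[0:3] would therefore name the 3 most likely classes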
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Image classification')
    parser.add_argument('--use_cpu', action='store_true')
    parser.add_argument('--mean_file', default='mean.npy')
    parser.add_argument('--topk', type=int, default=10)
    args = parser.parse_args()
    if not os.path.exists(args.mean_file):
        rafiki.PushStatus(agent.ERROR, 'Cannot find the mean file')
    net = model.create_net(args.use_cpu)
    net.load('model', 20)  # the checkpoint from train.py
    if args.use_cpu:
        dev = device.get_default_device()
    else:
        dev = device.create_cuda_gpu()
    net.to_device(dev)
    mean = np.load(args.mean_file)
    label_map = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog',
                 'horse', 'ship', 'truck']
    serve(net, mean, dev, label_map, args.topk)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
""" CIFAR10 dataset is at https://www.cs.toronto.edu/~kriz/cifar.html.
It includes 5 binary dataset, each contains 10000 images. 1 row (1 image)
includes 1 label & 3072 pixels. 3072 pixels are 3 channels of a 32x32 image
"""
import cPickle
import numpy as np
import os
import argparse
from singa import utils
from singa import optimizer
from singa import device
from singa import tensor
from singa.proto import core_pb2
from rafiki import agent
import model
rafiki = agent.Agent()
def load_dataset(filepath):
    print 'Loading data file %s' % filepath
    with open(filepath, 'rb') as fd:
        cifar10 = cPickle.load(fd)
    image = cifar10['data'].astype(dtype=np.uint8)
    image = image.reshape((-1, 3, 32, 32))
    label = np.asarray(cifar10['labels'], dtype=np.uint8)
    label = label.reshape(label.size, 1)
    return image, label


def load_train_data(dir_path, num_batches=5):
    labels = []
    batchsize = 10000
    images = np.empty((num_batches * batchsize, 3, 32, 32), dtype=np.uint8)
    for did in range(1, num_batches + 1):
        fname_train_data = dir_path + "/data_batch_{}".format(did)
        image, label = load_dataset(fname_train_data)
        images[(did - 1) * batchsize:did * batchsize] = image
        labels.extend(label)
    images = np.array(images, dtype=np.float32)
    labels = np.array(labels, dtype=np.int32)
    return images, labels


def load_test_data(dir_path):
    images, labels = load_dataset(dir_path + "/test_batch")
    return np.array(images, dtype=np.float32), np.array(labels, dtype=np.int32)


def normalize_for_vgg(train_x, test_x):
    mean = train_x.mean()
    std = train_x.std()
    train_x -= mean
    test_x -= mean
    train_x /= std
    test_x /= std
    return train_x, test_x


def vgg_lr(epoch):
    return 0.1 / float(1 << ((epoch / 25)))
def train(data, net, max_epoch, get_lr, weight_decay, batch_size=100,
          use_cpu=False):
    print 'Start initialization............'
    if use_cpu:
        print 'Using CPU'
        dev = device.get_default_device()
    else:
        print 'Using GPU'
        dev = device.create_cuda_gpu()
    net.to_device(dev)
    opt = optimizer.SGD(momentum=0.9, weight_decay=weight_decay)
    for (p, specs) in zip(net.param_names(), net.param_specs()):
        opt.register(p, specs)
    # placeholder tensors for one mini-batch of images and labels
    tx = tensor.Tensor((batch_size, 3, 32, 32), dev)
    ty = tensor.Tensor((batch_size,), dev, core_pb2.kInt)
    train_x, train_y, test_x, test_y = data
    num_train_batch = train_x.shape[0] / batch_size
    num_test_batch = test_x.shape[0] / batch_size
    idx = np.arange(train_x.shape[0], dtype=np.int32)
    for epoch in range(max_epoch):
        np.random.shuffle(idx)
        loss, acc = 0.0, 0.0
        print 'Epoch %d' % epoch
        for b in range(num_train_batch):
            x = train_x[idx[b * batch_size: (b + 1) * batch_size]]
            y = train_y[idx[b * batch_size: (b + 1) * batch_size]]
            tx.copy_from_numpy(x)
            ty.copy_from_numpy(y)
            grads, (l, a) = net.train(tx, ty)
            loss += l
            acc += a
            for (s, p, g) in zip(net.param_names(), net.param_values(), grads):
                opt.apply_with_lr(epoch, get_lr(epoch), g, p, str(s), b)
            # update progress bar
            utils.update_progress(b * 1.0 / num_train_batch,
                                  'training loss = %f, accuracy = %f' % (l, a))
        # report the epoch's average training accuracy and loss to the agent
        rafiki.PushTrainAccuracy(epoch, acc / num_train_batch)
        rafiki.PushTrainAccuracy(epoch, loss / num_train_batch)
        loss, acc = 0.0, 0.0
        for b in range(num_test_batch):
            x = test_x[b * batch_size: (b + 1) * batch_size]
            y = test_y[b * batch_size: (b + 1) * batch_size]
            tx.copy_from_numpy(x)
            ty.copy_from_numpy(y)
            l, a = net.evaluate(tx, ty)
            loss += l
            acc += a
        # report the epoch's average test accuracy and loss to the agent
        rafiki.PushTrainAccuracy(epoch, acc / num_test_batch)
        rafiki.PushTrainAccuracy(epoch, loss / num_test_batch)
    net.save('model', 20)  # save model params into checkpoint file
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Train dcnn for cifar10')
    parser.add_argument('--data', default='cifar-10-batches-py')
    parser.add_argument('--use_cpu', action='store_true')
    args = parser.parse_args()
    if not os.path.exists(args.data):
        rafiki.PushStatus(agent.ERROR, 'Cannot find the dataset')
    print 'Loading data ..................'
    train_x, train_y = load_train_data(args.data)
    test_x, test_y = load_test_data(args.data)
    train_x, test_x = normalize_for_vgg(train_x, test_x)
    net = model.create_net(args.use_cpu)
    train((train_x, train_y, test_x, test_y), net, 200, vgg_lr, 0.0005,
          use_cpu=args.use_cpu)
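

# Typical workflow (illustration; the file names model.py, train.py and
# serve.py are assumed from the imports and comments above):
#     python train.py --data cifar-10-batches-py      # writes the 'model' checkpoint
#     python serve.py --mean_file mean.npy --topk 5   # loads the checkpoint and serves predictions
# Note that mean.npy is read by serve.py but is not written by this training
# script, so it has to be produced separately.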