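# Distributed TensorFlow example (TF 0.x API, Python 2): trains a small two-layer
# CNN to classify 200x200 JPEG coin images into two classes ("Decius" vs.
# "Marcus Aurelius") using between-graph replication with one parameter server
# and three workers. The hostnames and dataset paths below are specific to the
# original setup and will need to be adapted.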
import tensorflow as tf
import numpy as np
import random
import time
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
dataset_path = "/cs/home/is59/database/"
train_labels_file = "labels.csv"
#dataset_path = "/cs/home/is59/mnist/"
#test_labels_file = "test-labels.csv"
#train_labels_file = "train-labels.csv"
full_size = 1171 #1171
test_set_size = 100
logs_path = "/tmp/coins/1/"
IMAGE_HEIGHT = 200
IMAGE_WIDTH = 200
NUM_CHANNELS = 3
NUM_LABELS = 2
SEED = 123
TRAIN_BATCH_SIZE = 32
TEST_BATCH_SIZE = 32
# ---------------------------------------------------------------------------------
# cluster specification
parameter_servers = [ "pc3-013-l.cs.university.ac.uk:2222"]
workers = [ "pc3-002-l.cs.university.ac.uk:2222",
"pc3-012-l.cs.university.ac.uk:2222",
"pc3-007-l.cs.university.ac.uk:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})
# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
# start a server for a specific task
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
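# Example launch commands (illustrative; assumes this file is saved as coins.py
# and started manually on each machine listed in the cluster spec above):
#   python coins.py --job_name=ps --task_index=0       # on pc3-013-l
#   python coins.py --job_name=worker --task_index=0   # on pc3-002-l
#   python coins.py --job_name=worker --task_index=1   # on pc3-012-l
#   python coins.py --job_name=worker --task_index=2   # on pc3-007-l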
# ---------------------------------------------------------------------------------
def encode_label(label):
    if "Decius" in label:
        return 0
    elif "Marcus Aurelius" in label:
        return 1
    else:
        assert False, "Invalid label string"
def read_label_file(file):
    f = open(file, "r")
    filepaths = []
    labels = []
    for line in f:
        filepath, label = line.split(",")
        filepaths.append(filepath)
        labels.append(encode_label(label))
    return filepaths, labels
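# Expected labels.csv format (one image per line; the file names here are made
# up for illustration): the part before the comma is a path relative to
# dataset_path + "images/", the part after must contain one of the two names.
#   coin_0001.jpg,Decius
#   coin_0002.jpg,Marcus Aurelius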
# reading labels and file path
train_filepaths, train_labels = read_label_file(dataset_path + train_labels_file)
#test_filepaths, test_labels = read_label_file(dataset_path + test_labels_file)
# transform relative path into full path
train_filepaths = [ dataset_path + "images/" + fp for fp in train_filepaths]
#train_filepaths = [ dataset_path + fp for fp in train_filepaths]
#test_filepaths = [ dataset_path + fp for fp in test_filepaths]
# for this example we will create our own test partition
#all_filepaths = train_filepaths + test_filepaths
#all_labels = train_labels + test_labels
#print "Filepaths loaded: ", len(all_filepaths)
all_filepaths = train_filepaths[:full_size]
all_labels = train_labels[:full_size]
# convert the string lists into tensors
all_images = ops.convert_to_tensor(all_filepaths, dtype=dtypes.string)
all_labels = tf.one_hot(all_labels,
depth=NUM_LABELS,
on_value=1,
off_value=0,
axis=-1)
#all_labels = ops.convert_to_tensor(all_labels, dtype=dtypes.int32)
# create a partition vector
partitions = [0] * len(all_filepaths)
partitions[:test_set_size] = [1] * test_set_size
random.shuffle(partitions)
# partition our data into a test and train set according to our partition vector
train_images, test_images = tf.dynamic_partition(all_images, partitions, 2)
train_labels, test_labels = tf.dynamic_partition(all_labels, partitions, 2)
# create input queues
train_input_queue = tf.train.slice_input_producer(
[train_images, train_labels],
shuffle=True)
test_input_queue = tf.train.slice_input_producer(
[test_images, test_labels],
shuffle=False)
# process path and string tensor into an image and a label
file_content = tf.read_file(train_input_queue[0])
train_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
train_label = train_input_queue[1]
file_content = tf.read_file(test_input_queue[0])
test_image = tf.image.decode_jpeg(file_content, channels=NUM_CHANNELS)
test_label = test_input_queue[1]
# define tensor shape
train_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
test_image.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS])
# collect batches of images before processing
train_image_batch, train_label_batch = tf.train.batch(
    [train_image, train_label],
    batch_size=TRAIN_BATCH_SIZE,
    num_threads=6)
test_image_batch, test_label_batch = tf.train.batch(
    [test_image, test_label],
    batch_size=TEST_BATCH_SIZE,
    num_threads=6)
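# tf.train.batch stacks single examples, so the image batches have shape
# [batch_size, IMAGE_HEIGHT, IMAGE_WIDTH, NUM_CHANNELS] and the one-hot label
# batches have shape [batch_size, NUM_LABELS]; six threads fill each queue.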
print "input pipeline ready"
# -------------------------------------------------------------------
# param and worker server
if FLAGS.job_name == "ps":
    server.join()
assert FLAGS.job_name == "worker", "Invalid job name."
# between-graph replication
with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
    # -------------------------------------------------------------------
    # variables
    with tf.name_scope("params"):
        conv1_weights = tf.Variable(tf.truncated_normal(
            [5, 5, NUM_CHANNELS, 16],
            stddev=1e-4,
            seed=SEED))
        conv1_biases = tf.Variable(tf.zeros([16]))
        conv2_weights = tf.Variable(tf.truncated_normal(
            [5, 5, 16, 16],
            stddev=1e-4,
            seed=SEED))
        conv2_biases = tf.Variable(tf.constant(0.1, shape=[16]))
        fc1_weights = tf.Variable(tf.truncated_normal(
            [(IMAGE_HEIGHT // 4) * (IMAGE_WIDTH // 4) * 16, 512],
            stddev=1e-4,
            seed=SEED))
        fc1_biases = tf.Variable(tf.constant(0.1, shape=[512]))
        fc2_weights = tf.Variable(tf.truncated_normal(
            [512, NUM_LABELS],
            stddev=1e-4,
            seed=SEED))
        fc2_biases = tf.Variable(tf.constant(0.1, shape=[NUM_LABELS]))
    # model definition (plain tf.nn ops)
    with tf.name_scope("model"):
        def model(data, train=False):
            # first conv -> relu -> pool -> local response norm block
            conv = tf.nn.conv2d(data,
                                conv1_weights,
                                strides=[1, 1, 1, 1],
                                padding="SAME")
            relu = tf.nn.relu(tf.nn.bias_add(conv, conv1_biases))
            pool = tf.nn.max_pool(relu,
                                  ksize=[1, 2, 2, 1],
                                  strides=[1, 2, 2, 1],
                                  padding="SAME")
            norm = tf.nn.lrn(pool, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm1')
            # second conv -> relu -> norm -> pool block
            conv = tf.nn.conv2d(norm,
                                conv2_weights,
                                strides=[1, 1, 1, 1],
                                padding='SAME')
            relu = tf.nn.relu(tf.nn.bias_add(conv, conv2_biases))
            norm = tf.nn.lrn(relu, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name='norm2')
            pool = tf.nn.max_pool(norm,
                                  ksize=[1, 2, 2, 1],
                                  strides=[1, 2, 2, 1],
                                  padding='SAME')
            # reshape the feature map for the fully connected layers
            pool_shape = pool.get_shape().as_list()
            reshape = tf.reshape(pool,
                                 [pool_shape[0], pool_shape[1] * pool_shape[2] * pool_shape[3]])
            hidden = tf.nn.relu(tf.matmul(reshape, fc1_weights) + fc1_biases)
            if train:
                # dropout only while training
                hidden = tf.nn.dropout(hidden, 0.5, seed=SEED)
            return tf.matmul(hidden, fc2_weights) + fc2_biases
    # ------------------------------------------------------------------------
    # train loss
    train_logits = model(tf.cast(train_image_batch, tf.float32), True)
    with tf.name_scope("train-loss"):
        train_cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
            train_logits,
            tf.cast(train_label_batch, tf.float32),
            name="train-cross-entropy")
        train_loss = tf.reduce_mean(train_cross_entropy, name='train-loss')
    # test loss (dropout disabled for evaluation)
    test_logits = model(tf.cast(test_image_batch, tf.float32), False)
    with tf.name_scope("test-loss"):
        test_cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
            test_logits,
            tf.cast(test_label_batch, tf.float32),
            name="test-cross-entropy")
        test_loss = tf.reduce_mean(test_cross_entropy, name='test-loss')
    # train accuracy
    train_top_1 = tf.nn.in_top_k(
        predictions=train_logits,
        targets=tf.argmax(train_label_batch, 1),
        k=1)
    # test accuracy
    test_top_1 = tf.nn.in_top_k(
        predictions=test_logits,
        targets=tf.argmax(test_label_batch, 1),
        k=1)
    # ---------------------------------------------------------------------
    # step variable shared by all workers
    global_step = tf.get_variable('global_step', [],
                                  initializer=tf.constant_initializer(0),
                                  trainable=False)
    # optimizer
    train_op = tf.train.AdamOptimizer(1e-4).minimize(
        train_loss,
        global_step=global_step)
    # summaries
    tf.scalar_summary("train-loss", train_loss)
    tf.scalar_summary("test-loss", test_loss)
    summary_op = tf.merge_all_summaries()
    # initialization
    init_op = tf.initialize_all_variables()
# ---------------------------------------------------------------------
def train_epoch(sess, batch_per_epoch, writer):
    errors = []
    times = []
    for i in range(batch_per_epoch):
        batch_start_time = time.time()
        _, loss_value, summary, step = sess.run([train_op, train_loss, summary_op, global_step])
        writer.add_summary(summary, step)
        assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
        errors.append(loss_value)
        times.append(time.time() - batch_start_time)
    return (np.mean(errors), np.mean(times))
def eval_train_accuracy(sess, number_of_batches):
    train_true_count = 0.0
    for i in range(number_of_batches):
        print "train acc ", i
        train_true_count += np.sum(sess.run(train_top_1))
    return train_true_count / (number_of_batches * TRAIN_BATCH_SIZE)
def eval_test_accuracy(sess, number_of_batches):
    test_true_count = 0.0
    for i in range(number_of_batches):
        print "test acc ", i
        test_true_count += np.sum(sess.run(test_top_1))
    return test_true_count / (number_of_batches * TEST_BATCH_SIZE)
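# The Supervisor makes the worker with task_index 0 the chief: only the chief
# runs init_op, while the other workers block in prepare_or_wait_for_session
# until the shared variables on the parameter server have been initialized.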
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                         global_step=global_step,
                         init_op=init_op)
with sv.prepare_or_wait_for_session(server.target) as sess:
    print "Session started!"
    start_session_time = time.time()
    # create log writer
    writer = tf.train.SummaryWriter(logs_path, graph=tf.get_default_graph())
    # initialize the queue threads to start to shovel data
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    for i in range(30):  # 30 partial epochs of 30 batches each
        train_avg_error, avg_time = train_epoch(sess, 30, writer)
        print "Partial-Epoch Avg Error: ", train_avg_error, " AvgTimePerBatch: %.2f s" % avg_time
    train_acc = eval_train_accuracy(sess, 40)
    test_acc = eval_test_accuracy(sess, 4)
    elapsed_session_time = (time.time() - start_session_time)
    print "Train Accuracy: ", train_acc
    print "Test Accuracy: ", test_acc
    print "Time needed: %.2f s" % elapsed_session_time
    # stop our queue threads and properly close the session
    coord.request_stop()
    coord.join(threads)
    #sess.close()
sv.stop()