MLP sample with optimized data-feed performance
"""
Learn an arbitrary function from training data
Architecture: MLP
Number of hidden layers: configurable via `N_neurons_in_layers`
"""
import math
import time
import datetime as dt
import copy
from inspect import signature
import numpy as np
import tensorflow as tf
from tensorflow.python.client import timeline
"""
List of functions to learn
"""
class Distributions:
def __init__(self, n):
self.func = getattr(self, 'sample_func%s' % n)
def __call__(self, n):
self.func = getattr(self, 'sample_func%s' % n)
def number_of_arguments(self):
return len(signature(self.func).parameters)
def number_of_output(self):
return len(signature(self.func).return_annotation)
def get_function(self):
return self.func
@staticmethod
def sample_func1(x: float, y: float) -> [float]:
return [x * y * math.sin(x + y)]
@staticmethod
def sample_func2(x: float, y: float) -> [float]:
return [0.2 + 0.4 * x * y + 0.3 * x * math.sin(15 * y) + 0.05 * math.cos(50 * x)]
@staticmethod
def sample_func3(x: float, y: float) -> [float]:
return [3.0]
@staticmethod
def sample_func4(x: float, y: float) -> [float]:
if x*y >= 0:
return [1.0]
else:
return [-1.0]
@staticmethod
def sample_func5(x:float, y: float) -> [float, float]:
return [x + y, x**2-y**2]
@staticmethod
def sample_func6(x:float) -> [float]:
return [math.sin(x)]
@staticmethod
def sample_func7(x:float, y:float) -> [float, float]:
if x**2 + y**2 < 0.5**2:
return [1, 0]
else:
return [0, 1]
@staticmethod
def sample_func8(x0, x1, x2, x3, x4, x5, x6, x7, x8, x9) -> [float, float, float, float, float, float, float, float, float, float]:
mx = max([x0, x1, x2, x3, x4, x5, x6, x7, x8, x9])
if x0 == mx: return [1,0,0,0,0,0,0,0,0,0]
if x1 == mx: return [0,1,0,0,0,0,0,0,0,0]
if x2 == mx: return [0,0,1,0,0,0,0,0,0,0]
if x3 == mx: return [0,0,0,1,0,0,0,0,0,0]
if x4 == mx: return [0,0,0,0,1,0,0,0,0,0]
if x5 == mx: return [0,0,0,0,0,1,0,0,0,0]
if x6 == mx: return [0,0,0,0,0,0,1,0,0,0]
if x7 == mx: return [0,0,0,0,0,0,0,1,0,0]
if x8 == mx: return [0,0,0,0,0,0,0,0,1,0]
if x9 == mx: return [0,0,0,0,0,0,0,0,0,1]
@staticmethod
def sample_func9(x0,x1,x2) -> [float, float, float]:
r,g,b = [0,0,0]
if x0 > 0: r = 1
if x1 > 0: g = 1
if x2 > 0: b = 1
return [r, g, b]
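# Usage sketch (illustrative only, not called by the script below): Distributions(n) binds
# `sample_funcN` as the target function, and its signature annotations drive the network shape.
#
#   dist = Distributions(6)                 # f(x) = [sin(x)]
#   f = dist.get_function()
#   dist.number_of_arguments()              # -> 1 (one input unit)
#   dist.number_of_output()                 # -> 1 (one output unit)
#   f(0.0)                                  # -> [0.0]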
class MLP:
def __init__(self):
"""
Configurable dnn parameters
"""
self.N_neurons_in_layers = [10, 10]
self.N_samples = 1000
self.training_ratio = 0.5
self.batch_size = 10
self.total_epochs = 500
self.learning_rate = 0.01
self.activation = tf.nn.tanh
self.classification_type = 'normal' # 'normal' (regression), 'mlc' (multi-label classification) or 'mcc' (multi-class classification)
self.optimizer = 'SGD' # Either 'SGD' or 'Adam'
self.log_dir = '/tmp/test/summary'
self.log_weight = False
# Layer indices to log, e.g. [1, 3]. Index 1 is the first hidden layer; 0 would be the input layer and cannot be specified.
self.log_weight_target = None
self.log_bias = False
# Layer indices to log, e.g. [1, 3]. Index 1 is the first hidden layer; 0 would be the input layer and cannot be specified.
self.log_bias_target = None
self.reg_term = 0.001
self.use_reg = None
self.nn_as_function = None
self.run_name = dt.datetime.now().strftime('%Y%m%d-%H%M%S')
self.log_metadata = False
self.log_metadata_frequency = 100 # log metadata every `log_metadata_frequency` epochs
"""
Parameters whose values are determined after configuration is done
"""
self.session = tf.Session()
self.saver = None
self.distribution = None
self.true_function = None
self.number_of_function_parameters = {'input': None, 'output': None}
self.number_of_data = {'training': None, 'test': None}
self.supervised_data_all = {'input': None, 'output': None}
self.data = {'training':{'input': None, 'output': None},
'test': {'input': None, 'output': None}}
data_feed_template = {'entry': {'input': None, 'output': None},
'data_set': {'input': None, 'output': None},
'data_over_entire_epoch': {'input': None, 'output': None},
'data_batch': {'input': None, 'output': None}}
self.data_feed = {'training': copy.deepcopy(data_feed_template), 'test': copy.deepcopy(data_feed_template)}
self.N_layers = None
self.W = []
self.b = []
self.output = {'training': None, 'test': None, 'normal': None}
self.loss = {'training': {'mae': None, 'mse': None, 'mape': None, 'ce': None},
'test': {'mae': None, 'mse': None, 'mape': None, 'ce': None}}
self.loss_target = None
self.accuracy = {'training': None, 'test': None}
self.train_op = None
self.merged_train = None
self.merged_test = None
self.merged_weight = None
self.merged_bias = None
self.writer = None
self.layer_formation = None
def __del__(self):
self.session.close()
def configure(self, **kwargs):
if 'nn_as_function' in kwargs.keys():
self.nn_as_function = kwargs['nn_as_function']
if 'neurons' in kwargs.keys():
self.N_neurons_in_layers = kwargs['neurons']
if 'n_samples' in kwargs.keys():
self.N_samples = kwargs['n_samples']
if 'training_ratio' in kwargs.keys():
self.training_ratio = kwargs['training_ratio']
if 'batch_size' in kwargs.keys():
self.batch_size = kwargs['batch_size']
if 'total_epochs' in kwargs.keys():
self.total_epochs = kwargs['total_epochs']
if 'learning_rate' in kwargs.keys():
self.learning_rate = kwargs['learning_rate']
if 'activation' in kwargs.keys():
self.activation = kwargs['activation']
if 'classification_type' in kwargs.keys():
self.classification_type = kwargs['classification_type']
if 'optimizer' in kwargs.keys():
self.optimizer = kwargs['optimizer']
if 'reg_term' in kwargs.keys():
self.reg_term = kwargs['reg_term']
if 'use_reg' in kwargs.keys():
self.use_reg = kwargs['use_reg']
if 'log_dir' in kwargs.keys():
self.log_dir = kwargs['log_dir']
if 'log_weight' in kwargs.keys():
self.log_weight = kwargs['log_weight']
if 'log_weight_target' in kwargs.keys():
self.log_weight_target = kwargs['log_weight_target']
if 'log_bias' in kwargs.keys():
self.log_bias = kwargs['log_bias']
if 'log_bias_target' in kwargs.keys():
self.log_bias_target = kwargs['log_bias_target']
if 'run_name' in kwargs.keys():
self.run_name = kwargs['run_name']
if 'log_metadata' in kwargs.keys():
self.log_metadata = kwargs['log_metadata']
if 'log_metadata_frequency' in kwargs.keys():
self.log_metadata_frequency = kwargs['log_metadata_frequency']
self.setup_parameters()
def make_computation_graph(self):
self.setup_parameters()
self.setup_variables()
self.setup_supervised_data()
self.allocate_data()
self.setup_layers(type='training')
self.setup_layers(type='test')
self.setup_layers(type='onetime')
self.setup_loss(type='training')
self.setup_loss(type='test')
self.setup_trainer()
self.setup_summary_for_tensorboard()
def setup_parameters(self):
"""
Calculate dependent parameters
"""
self.true_function = self.nn_as_function
self.number_of_function_parameters['input'] = self.number_of_input_units()
self.number_of_function_parameters['output'] = self.number_of_output_units()
self.supervised_data_all['input'] = np.ndarray([self.N_samples, self.number_of_function_parameters['input']])
self.supervised_data_all['output'] = np.ndarray([self.N_samples, self.number_of_function_parameters['output']])
self.number_of_data['training'] = math.floor(self.N_samples * self.training_ratio)
self.number_of_data['test'] = self.N_samples - self.number_of_data['training']
self.data['training']['input'] = self.supervised_data_all['input'][0:self.number_of_data['training']]
self.data['training']['output'] = self.supervised_data_all['output'][0:self.number_of_data['training']]
self.data['test']['input'] = self.supervised_data_all['input'][self.number_of_data['training']:]
self.data['test']['output'] = self.supervised_data_all['output'][self.number_of_data['training']:]
self.N_layers = len(self.N_neurons_in_layers)
self.layer_formation = [self.number_of_function_parameters['input'], *self.N_neurons_in_layers,
self.number_of_function_parameters['output']]
def setup_variables(self):
with tf.name_scope("Params-to-train"):
for i in range(self.N_layers):
with tf.name_scope("Bias-%s"%(i+1)):
with tf.device('/cpu:0'):
rand = tf.random_uniform([self.N_neurons_in_layers[i]])
self.b.append(tf.Variable(rand, dtype=tf.float32, name='biases-%s'%(i+1)))
with tf.name_scope("Weight-%s"%(i+1)):
if i == 0:
with tf.device('/cpu:0'):
rand = tf.random_uniform([self.number_of_function_parameters['input'], self.N_neurons_in_layers[i]])
self.W.append(tf.Variable(rand, dtype=tf.float32, name='weights-%s'%(i+1)))
else:
with tf.device('/cpu:0'):
rand = tf.random_uniform([self.N_neurons_in_layers[i - 1],self.N_neurons_in_layers[i]])
self.W.append(tf.Variable(rand, dtype=tf.float32, name='weights-%s'%(i+1)))
with tf.name_scope("Weight-%s"%(self.N_layers)):
with tf.device('/cpu:0'):
rand_W = tf.random_uniform([self.N_neurons_in_layers[self.N_layers - 1], self.number_of_function_parameters['output']])
self.W.append(tf.Variable(rand_W, dtype=tf.float32, name='weights-%s'%(self.N_layers+1)))
with tf.name_scope("Bias-%s"%(self.N_layers)):
with tf.device('/cpu:0'):
rand_b = tf.random_uniform([self.number_of_function_parameters['output']])
self.b.append(tf.Variable(rand_b, dtype=tf.float32, name='biases-%s'%(self.N_layers+1)))
with tf.device('/cpu:0'):
self.saver = tf.train.Saver(self.W + self.b)
def setup_supervised_data(self):
for i in range(self.N_samples):
self.supervised_data_all['input'][i],\
self.supervised_data_all['output'][i] = self.generate_data()
def setup_feed(data_type, io_type):
with tf.device('/cpu:0'):
self.data_feed[data_type]['entry'][io_type] = tf.placeholder(dtype=tf.float32, shape=self.data[data_type][io_type].shape)
self.data_feed[data_type]['data_set'][io_type] = tf.Variable(self.data_feed[data_type]['entry'][io_type], trainable=False, collections=[])
with tf.name_scope('Training-data'):
setup_feed('training', 'input')
setup_feed('training', 'output')
with tf.name_scope('Test-data'):
setup_feed('test', 'input')
setup_feed('test', 'output')
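# Note on the feed setup above: the placeholder / non-trainable Variable pair follows the
# TF 1.x "preloaded data" input pattern. `allocate_data` runs each Variable's initializer once,
# feeding the whole data set through the placeholder, so later training steps read the data
# from the graph instead of passing it through feed_dict on every single step.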
def allocate_data(self):
with tf.device('/cpu:0'):
self.session.run(self.data_feed['training']['data_set']['input'].initializer,
feed_dict={self.data_feed['training']['entry']['input']: self.data['training']['input']})
self.session.run(self.data_feed['training']['data_set']['output'].initializer,
feed_dict={self.data_feed['training']['entry']['output']: self.data['training']['output']})
self.session.run(self.data_feed['test']['data_set']['input'].initializer,
feed_dict={self.data_feed['test']['entry']['input']: self.data['test']['input']})
self.session.run(self.data_feed['test']['data_set']['output'].initializer,
feed_dict={self.data_feed['test']['entry']['output']: self.data['test']['output']})
with tf.name_scope('Epoch-wide-data-training'):
self.data_feed['training']['data_over_entire_epoch']['input'],\
self.data_feed['training']['data_over_entire_epoch']['output'] = \
tf.train.slice_input_producer([self.data_feed['training']['data_set']['input'],
self.data_feed['training']['data_set']['output']], shuffle=True, num_epochs=self.total_epochs)
with tf.name_scope('Epoch-wide-data-test'):
self.data_feed['test']['data_over_entire_epoch']['input'],\
self.data_feed['test']['data_over_entire_epoch']['output'] = \
tf.train.slice_input_producer([self.data_feed['test']['data_set']['input'],
self.data_feed['test']['data_set']['output']], shuffle=True, num_epochs=self.total_epochs)
self.session.run(tf.local_variables_initializer())
# Which device (CPU or GPU) runs the batching code below makes a significant difference in performance.
# If the code below were placed on the GPU, I expect the following would happen:
# (1) To slice the data into batch-sized chunks on the 'GPU', the data covering the entire epoch must first
# be transferred from the CPU controlling RAM
# (in this case `self.data_feed[xxxx]['data_over_entire_epoch'][xxxx]` is placed on the CPU).
# (2) Then the necessary part of that data is extracted on the GPU.
# (3) This cycle repeats for every single training step(!!).
# The cost of transferring the whole epoch's data on every step is large enough to degrade performance badly.
#
# If we put the code below on the CPU instead:
# (1) Slice the batch from the epoch data on the CPU. No data transfer is necessary at this point.
# (2) Send only the batch-sized data to the GPU. That data is extremely small compared to the entire epoch's data.
# (3) Repeat this cycle for every training step. Nothing to worry about performance-wise here.
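# Rough scale of the difference, assuming the default configuration (N_samples=1000,
# training_ratio=0.5, batch_size=10) and the 10-input/10-output sample_func8: CPU-side
# slicing ships a 10x10 batch (about a hundred floats per side) to the GPU each step,
# instead of pulling the full 500x10 training set onto the GPU every step just to cut a batch from it.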
with tf.name_scope('Input-Layer-training'):
self.data_feed['training']['data_batch']['input'], self.data_feed['training']['data_batch']['output'] = tf.train.batch([self.data_feed['training']['data_over_entire_epoch']['input'], self.data_feed['training']['data_over_entire_epoch']['output']], batch_size=self.batch_size)
with tf.name_scope('Input-Layer-test'):
self.data_feed['test']['data_batch']['input'], self.data_feed['test']['data_batch']['output'] = tf.train.batch([self.data_feed['test']['data_over_entire_epoch']['input'], self.data_feed['test']['data_over_entire_epoch']['output']], batch_size=self.number_of_data['test'])
def setup_layers(self, type: str = 'training'):
if type == 'training':
input_batch = self.data_feed['training']['data_batch']['input']
elif type == 'test':
input_batch = self.data_feed['test']['data_batch']['input']
else:
with tf.device('/cpu:0'):
with tf.name_scope('Validation-Input'):
input_batch = tf.placeholder(dtype=tf.float32, shape=[1, self.number_of_function_parameters['input']], name='input')
with tf.name_scope("Hidden-Layers-%s" % type):
layer = []
for i in range(self.N_layers):
with tf.name_scope('Hidden-Layer-%s' % (i+1)):
if i == 0:
z = tf.matmul(input_batch, self.W[i]) + self.b[i]
a = self.activation(z)
else:
z = tf.matmul(layer[i - 1], self.W[i]) + self.b[i]
a = self.activation(z)
layer.append(a)
with tf.name_scope("Output-Layer-%s" % type):
z = tf.matmul(layer[self.N_layers - 1], self.W[self.N_layers]) + self.b[self.N_layers]
if self.classification_type == 'mlc':
a = tf.sigmoid(z)
elif self.classification_type == 'mcc':
# Normally a = tf.nn.softmax(z) would be natural in this context,
# but for performance reasons the softmax is not applied here;
# the loss uses tf.nn.softmax_cross_entropy_with_logits on the raw logits instead.
# For details, see this issue:
# https://github.com/tensorflow/tensorflow/issues/2462
a = z
else:
a = z
if type == 'training':
self.output['training'] = a
elif type == 'test':
self.output['test'] = a
else:
if self.classification_type == 'mcc':
self.output['normal'] = tf.nn.softmax(a)
else:
self.output['normal'] = a
def setup_loss(self, type:str = 'training'):
if type != 'training' and type != 'test':
return
computed_output = self.output[type]
true_output = self.data_feed[type]['data_batch']['output']
with tf.name_scope("Loss-%s" % type):
loss_crossentropy = None
if self.classification_type in ['mcc', 'mlc']:
loss_crossentropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=computed_output, labels=true_output))
self.loss[type]['ce'] = loss_crossentropy
if self.classification_type == 'mcc':
correct_prediction = tf.equal(tf.argmax(computed_output, 1), tf.argmax(true_output, 1))
self.accuracy[type] = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) * 100.0
else:
correct_prediction = tf.equal(tf.round(computed_output), true_output)
self.accuracy[type] = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) * 100.0
if type == 'training':
self.loss_target = loss_crossentropy
else:
loss_mse = tf.reduce_mean(tf.square(computed_output - true_output))
loss_mae = tf.reduce_mean(tf.abs(computed_output - true_output))
loss_mape = tf.reduce_mean(tf.abs(computed_output / true_output - 1.0)) * 100.0
self.loss[type]['mse'] = loss_mse
self.loss[type]['mae'] = loss_mae
self.loss[type]['mape'] = loss_mape
if type == 'training':
self.loss_target = loss_mse
if type == 'training':
if self.use_reg is not None:
with tf.name_scope("Regularization"):
reg = tf.constant(0, dtype=tf.float32)
for i in range(self.N_layers):
if self.use_reg == 'L1':
reg = reg + tf.reduce_mean(tf.abs(self.W[i]))
elif self.use_reg == 'L2':
reg = reg + tf.reduce_mean(tf.square(self.W[i]))
reg = reg * self.reg_term
self.loss_target = self.loss_target + reg
def setup_trainer(self):
with tf.name_scope("Train"):
if self.optimizer == "SGD":
self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss_target)
elif self.optimizer == "Adam":
self.train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(self.loss_target)
self.session.run(tf.global_variables_initializer())
def setup_summary_for_tensorboard(self):
with tf.device('/cpu:0'):
with tf.name_scope("Summary"):
# Clear logdir if it already exists
#if tf.gfile.Exists(self.log_dir):
# tf.gfile.DeleteRecursively(self.log_dir)
self.writer = tf.summary.FileWriter(self.log_dir + '/' + self.run_name, self.session.graph)
if self.classification_type in ['mcc', 'mlc']:
ce_training = tf.summary.scalar('CrossEntropy(Training)', self.loss['training']['ce'])
acc_training = tf.summary.scalar('Accuracy(Training)', self.accuracy['training'])
self.merged_train = tf.summary.merge([ce_training, acc_training])
ce_test = tf.summary.scalar('CrossEntropy(Test)', self.loss['test']['ce'])
acc_test = tf.summary.scalar('Accuracy(Test)', self.accuracy['test'])
self.merged_test = tf.summary.merge([ce_test, acc_test])
else:
mse_training = tf.summary.scalar('MSE(Training)', self.loss['training']['mse'])
mae_training = tf.summary.scalar('MAE(Training)', self.loss['training']['mae'])
mape_training = tf.summary.scalar('MAPE(Training)', self.loss['training']['mape'])
self.merged_train = tf.summary.merge([mse_training, mae_training, mape_training])
mse_test = tf.summary.scalar('MSE(Test)', self.loss['test']['mse'])
mae_test = tf.summary.scalar('MAE(Test)', self.loss['test']['mae'])
mape_test = tf.summary.scalar('MAPE(Test)', self.loss['test']['mape'])
self.merged_test = tf.summary.merge([mse_test, mae_test, mape_test])
if self.log_weight:
with tf.name_scope("Weights"):
w_summaries = []
layer = 0
for w in self.W:
rows, columns = w.get_shape().as_list()
for c in range(columns):
for r in range(rows):
if self.log_weight_target is not None and (layer+1) in self.log_weight_target:
smry = tf.summary.scalar('W(%d,%d,%d)'%(layer + 1, c, r), self.W[layer][r, c])
w_summaries.append(smry)
layer += 1
self.merged_weight = tf.summary.merge(w_summaries)
if self.log_bias:
with tf.name_scope("Bias"):
b_summaries = []
layer = 0
for b in self.b:
columns = b.get_shape().as_list()
for c in range(*columns):
if self.log_bias_target is not None and (layer+1) in self.log_bias_target:
smry = tf.summary.scalar('b(%d,%d)' % (layer + 1, c), self.b[layer][c])
b_summaries.append(smry)
layer += 1
self.merged_bias = tf.summary.merge(b_summaries)
def read_training_vars(self, filepath: str):
self.saver.restore(self.session, filepath)
print('Read saved parameters from file: %s' % filepath)
def train(self):
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=self.session, coord=coord)
try:
if self.classification_type in ['mcc', 'mlc']:
self.__train_for_mcc_mlc(coord)
else:
self.__train_except_for_mcc_mlc(coord)
except tf.errors.OutOfRangeError:
print('Done training for %d epochs' % (self.total_epochs))
finally:
coord.request_stop()
coord.join(threads)
def save_training_vars(self, filepath: str):
with tf.device('/cpu:0'):
save_path = self.saver.save(self.session, filepath)
print('Trained parameters have been saved in file: %s' % filepath)
def __train_for_mcc_mlc(self, coord: tf.train.Coordinator):
print('%s %s %s %s %s %s %s %s %s' % (
'Epoch'.ljust(6), 'Step'.ljust(6), 'Cross Entropy(training)'.ljust(25), 'Acc(training)'.ljust(15),
'Cross Entropy(test)'.ljust(25), 'Acc(test)'.ljust(15), 'Sec'.ljust(8),
'Epoch/Sec'.ljust(10), 'Step/Sec'.ljust(10)))
start_time = time.time()
step = 0
current_epoch = 0
ce_tr = 0
acc_tr = 0
steps_per_epoch = self.number_of_data['training'] / self.batch_size
while not coord.should_stop():
# Record run metadata every `log_metadata_frequency` epochs
if self.log_metadata and ((step+1) * self.batch_size / self.number_of_data['training']) % self.log_metadata_frequency == 0:
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
_, ce, acc, summary_train = self.session.run([self.train_op, self.loss['training']['ce'],
self.accuracy['training'], self.merged_train], options=run_options, run_metadata=run_metadata)
self.writer.add_run_metadata(run_metadata, 'epoch%d-(train)' % (current_epoch+1))
if ((step+1) * self.batch_size / self.number_of_data['training']) == self.log_metadata_frequency:
self.log_timeline(run_metadata)
else:
_, ce, acc, summary_train = self.session.run([self.train_op, self.loss['training']['ce'],
self.accuracy['training'], self.merged_train])
step += 1
ce_tr += ce
acc_tr += acc
if (step * self.batch_size / self.number_of_data['training']) % 10 == 0:
print('%s %s %s %s %s %s %s %s %s' % (
'Epoch'.ljust(6), 'Step'.ljust(6), 'Cross Entropy(training)'.ljust(25), 'Acc(training)'.ljust(15),
'Cross Entropy(test)'.ljust(25), 'Acc(test)'.ljust(15), 'Sec'.ljust(8),
'Epoch/Sec'.ljust(10), 'Step/Sec'.ljust(10)))
self.writer.flush()
if (step * self.batch_size) % self.number_of_data['training'] == 0:
current_epoch += 1
if self.log_metadata and (current_epoch - 1) % 100 == 0:
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
ce, acc, summary_test = self.session.run([self.loss['test']['ce'], self.accuracy['test'],
self.merged_test], options=run_options, run_metadata=run_metadata)
self.writer.add_run_metadata(run_metadata, 'epoch%d-(test)' % (current_epoch - 1))
else:
ce, acc, summary_test = self.session.run(
[self.loss['test']['ce'], self.accuracy['test'], self.merged_test])
ce_ts = ce
acc_ts = acc
ce_tr = ce_tr / steps_per_epoch
acc_tr = acc_tr / steps_per_epoch
interval = time.time() - start_time
epoch_per_time = current_epoch / interval
step_per_time = step / interval
print('%s %s %s %s %s %s %s %s %s' % (
('%d' % (current_epoch)).ljust(6), ('%d' % (step)).ljust(6), ('%.6f' % ce_tr).ljust(25),
('%.2f%%' % acc_tr).ljust(15), ('%.6f' % ce_ts).ljust(25), ('%.2f%%' % acc_ts).ljust(15),
('%.2f' % interval).ljust(8), ('%.2f' % epoch_per_time).ljust(10), ('%.2f' % step_per_time).ljust(10)))
self.writer.add_summary(summary_train, step)
self.writer.add_summary(summary_test, step)
if self.log_weight:
summary_w = self.session.run(self.merged_weight)
self.writer.add_summary(summary_w, step)
if self.log_bias:
summary_b = self.session.run(self.merged_bias)
self.writer.add_summary(summary_b, step)
ce_tr = 0
acc_tr = 0
def __train_except_for_mcc_mlc(self, coord: tf.train.Coordinator):
print('%s %s %s %s %s %s %s %s %s %s %s' % (
'Epoch'.ljust(6), 'Step'.ljust(6), 'MSE(training)'.ljust(15), 'MAE(training)'.ljust(15),
'MAPE(training)'.ljust(15),
'MSE(test)'.ljust(15), 'MAE(test)'.ljust(15), 'MAPE(test)'.ljust(15), 'Sec'.ljust(8),
'Epoch/Sec'.ljust(10), 'Step/Sec'.ljust(10)))
start_time = time.time()
step = 0
current_epoch = 0
mae_tr = 0
mse_tr = 0
mape_tr = 0
steps_per_epoch = self.number_of_data['training'] / self.batch_size
while not coord.should_stop():
# Record run metadata every `log_metadata_frequency` epochs
if self.log_metadata and ((step+1) * self.batch_size / self.number_of_data['training']) % self.log_metadata_frequency == 0:
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
_, mae, mse, mape, summary_train = self.session.run([self.train_op, self.loss['training']['mae'],
self.loss['training']['mse'],
self.loss['training']['mape'],
self.merged_train], options=run_options,
run_metadata=run_metadata)
self.writer.add_run_metadata(run_metadata, 'epoch%d-(train)' % (current_epoch+1))
if ((step+1) * self.batch_size / self.number_of_data['training']) == self.log_metadata_frequency:
self.log_timeline(run_metadata)
else:
_, mae, mse, mape, summary_train = self.session.run([self.train_op, self.loss['training']['mae'],
self.loss['training']['mse'],
self.loss['training']['mape'],
self.merged_train])
step += 1
mae_tr += mae
mse_tr += mse
mape_tr += mape
if (step * self.batch_size / self.number_of_data['training']) % 10 == 0: # Every 10 epochs
print('%s %s %s %s %s %s %s %s %s %s %s' % (
'Epoch'.ljust(6), 'Step'.ljust(6), 'MSE(training)'.ljust(15), 'MAE(training)'.ljust(15),
'MAPE(training)'.ljust(15),
'MSE(test)'.ljust(15), 'MAE(test)'.ljust(15), 'MAPE(test)'.ljust(15), 'Sec'.ljust(8),
'Epoch/Sec'.ljust(10), 'Step/Sec'.ljust(10)))
self.writer.flush()
if (step * self.batch_size) % self.number_of_data['training'] == 0: # Every epoch
current_epoch += 1
if self.log_metadata and (current_epoch - 1) % 100 == 0:
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
mae, mse, mape, summary_test = self.session.run(
[self.loss['test']['mae'], self.loss['test']['mse'],
self.loss['test']['mape'], self.merged_test], options=run_options, run_metadata=run_metadata)
self.writer.add_run_metadata(run_metadata, 'epoch%d-(test)' % (current_epoch-1))
self.writer.add_summary(summary_test, step)
else:
mae, mse, mape, summary_test = self.session.run(
[self.loss['test']['mae'], self.loss['test']['mse'], self.loss['test']['mape'], self.merged_test])
self.writer.add_summary(summary_train, step)
self.writer.add_summary(summary_test, step)
mae_ts = mae
mse_ts = mse
mape_ts = mape
interval = time.time() - start_time
epoch_per_time = current_epoch / interval
step_per_time = step / interval
mae_tr = mae_tr / steps_per_epoch
mse_tr = mse_tr / steps_per_epoch
mape_tr = mape_tr / steps_per_epoch
print('%s %s %s %s %s %s %s %s %s %s %s' % (
('%d' % (current_epoch)).ljust(6), ('%d' % (step)).ljust(6), ('%.6f' % mse_tr).ljust(15),
('%.6f' % mae_tr).ljust(15), ('%.6f' % mape_tr).ljust(15), ('%.6f' % mse_ts).ljust(15),
('%.6f' % mae_ts).ljust(15), ('%.6f' % mape_ts).ljust(15), ('%.2f' % interval).ljust(8),
('%.2f' % epoch_per_time).ljust(10), ('%.2f' % step_per_time).ljust(10)))
if self.log_weight:
summary_w = self.session.run(self.merged_weight)
self.writer.add_summary(summary_w, step)
if self.log_bias:
summary_b = self.session.run(self.merged_bias)
self.writer.add_summary(summary_b, step)
mae_tr = 0
mse_tr = 0
mape_tr = 0
def generate_data(self):
inputs = np.ndarray([self.number_of_function_parameters['input']])
for j in range(len(inputs)):
inputs[j] = (np.random.randn(1).astype(np.float32)[0] - 0.5) * 2.0
outputs = self.true_function(*inputs)
return [inputs, outputs]
def get_output(self, input: list = None):
if input is not None:
input_size = len(input)
input = np.array(input).reshape([1, input_size])
input_list = self.flatten_2d_list(input.tolist())
true_output = self.true_function(*input_list)
true_output = np.array(true_output)
else:
input, true_output = self.generate_data()
input = input.reshape([1, -1])
input_list = self.flatten_2d_list(input.tolist())
computed_output = self.session.run(self.output['normal'], feed_dict={'Validation-Input/input:0': input})
computed_output = self.flatten_2d_list(computed_output.tolist())
computed_output = np.array(computed_output)
if self.classification_type == 'mcc':
# Accuracy
estimater = np.equal(np.argmax(computed_output, 0), np.argmax(true_output, 0))
elif self.classification_type == 'mlc':
estimater = np.mean((np.equal(np.around(computed_output), np.around(true_output))[0]).astype(float)) == 1.0
else:
# MAPE
estimater = (np.mean(np.abs(np.divide(computed_output, true_output) - 1.0)) * 100.0)
print('%s: %s\n%s: %s\n%s: %s\n%s: %s' % (
'Input',
('%s' % ', '.join('%.6f' % e for e in input_list)).ljust(15),
'Output(true)',
('%s' % ', '.join('%.6f' % e for e in true_output)).ljust(15),
'Output(calc)',
('%s' % ', '.join('%.6f' % e for e in computed_output)).ljust(16),
'Result' if self.classification_type in ['mlc', 'mcc'] else 'MAPE',
('%s' % estimater) if self.classification_type in ['mlc', 'mcc'] else ('%.2f%%' % estimater)))
return [input_list, true_output, computed_output, (0 if estimater else 100) if self.classification_type in ['mcc','mlc'] else estimater]
def print_config(self):
print('============ Information ============')
print('Layer formation: %s' % ('-'.join(str(e) for e in self.layer_formation)))
print('Number of whole samples: %d' % self.N_samples)
print('Number of training samples: %d' % self.number_of_data['training'])
print('Number of test samples: %d' % self.number_of_data['test'])
print('Batch size: %d' % self.batch_size)
print('Total epoch: %d' % self.total_epochs)
print('Steps per epoch: %d' % (self.number_of_data['training'] / self.batch_size))
print('Learning rate: %f' % self.learning_rate)
print('Regularization: %s' % self.use_reg)
print('Regularization term: %f' % self.reg_term)
print('Log dir: %s' % self.log_dir)
print()
def number_of_input_units(self):
return len(signature(self.nn_as_function).parameters)
def number_of_output_units(self):
return len(signature(self.nn_as_function).return_annotation)
def log_timeline(self, run_metadata):
step_stats = run_metadata.step_stats
tl = timeline.Timeline(step_stats)
ctf = tl.generate_chrome_trace_format(show_memory=False,
show_dataflow=True)
with open(self.log_dir + "/timeline.json", "w") as f:
f.write(ctf)
@staticmethod
def flatten_2d_list(l):
return sum(l, [])
def sin(x) -> [float]:
return [math.sin(x)]
def janken(guu, choki, paa) -> [float, float, float]:
mx = max([guu, choki, paa])
if guu == mx: return [0, 0, 1]
if choki == mx: return [1, 0, 0]
if paa == mx: return [0, 1, 0]
conf = {
'neurons': [20, 20, 20],
'optimizer': 'Adam',
'total_epochs': 2000,
'n_samples': 1000,
'training_ratio': 0.5,
'batch_size': 20,
#'activation': tf.nn.relu,
'learning_rate': 0.01,
'use_reg': 'L1',
'reg_term': 1.0,
'classification_type': 'mcc',
'log_weight': False,
'log_weight_target': [1],
#'nn_as_function': janken,
#'nn_as_function': sin,
'nn_as_function': Distributions(8).get_function(),
'log_dir': '/tmp/test/summary',
'log_metadata': True
}
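# With `nn_as_function` set to Distributions(8).get_function(), the target is sample_func8,
# which maps 10 inputs to a 10-class one-hot vector (argmax of the inputs), hence
# `classification_type` is 'mcc'. To try `janken` or `sin` instead (commented out above),
# adjust `classification_type` accordingly ('mcc' for janken, 'normal' for sin).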
def main(*, test_only=False, model_path):
if test_only:
test_network(model_path)
else:
train_network(model_path)
def train_network(model_path):
mlp = MLP()
mlp.configure(**conf)
mlp.make_computation_graph()
start_time = time.time()
mlp.train()
print("Elapsed time: {} sec".format(int(time.time() - start_time)))
print()
mlp.save_training_vars(model_path)
mlp.print_config()
print('======Let\'s see how network works====')
_, _1, _2, err = mlp.get_output()
if err < 0.1:
print('Wow! Great result!!')
elif err < 3.0:
print('Seems working good!')
elif err < 10.0:
print('Maybe need a bit more training')
else:
print('Something went wrong...')
def test_network(model_path):
mlp = MLP()
mlp.configure(**conf)
mlp.make_computation_graph()
mlp.read_training_vars(model_path)
for i in range(10):
mlp.get_output()
print()
if __name__ == "__main__":
main(test_only=False, model_path='/tmp/test/janken.ckpt')
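# To evaluate an already-trained model instead of training it again, call main() with
# test_only=True and the same checkpoint path, e.g.:
#   main(test_only=True, model_path='/tmp/test/janken.ckpt')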