Last active
April 15, 2017 20:43
-
-
Save Hinaser/6eb8ea4fc9c11316753d8ba9ad398f5c to your computer and use it in GitHub Desktop.
MLP sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Learn arbitrary function from training data | |
Architecture: MLP | |
Number of hidden layers: You can configure via `N_neurons_in_layers` | |
""" | |
import math | |
import time | |
from inspect import signature | |
import numpy as np | |
import tensorflow as tf | |
""" | |
List of functions to learn | |
""" | |
class FunctionCollection:
    """
    Registry of sample target functions for the MLP to learn.

    Selects one of the ``sample_func<N>`` static methods by its integer index
    and exposes its arity (argument/output counts) via ``inspect.signature``.
    """
    def __init__(self, n):
        """Select sample function number *n* (e.g. n=1 -> sample_func1)."""
        self.func = self._lookup(n)
    def __call__(self, n):
        """Re-select sample function number *n* on an existing instance."""
        self.func = self._lookup(n)
    def _lookup(self, n):
        # getattr replaces the original eval('self.sample_func<n>'):
        # identical behavior for valid indices, no arbitrary-code-execution risk.
        return getattr(self, 'sample_func%s' % n)
    def number_of_arguments(self):
        """Number of input parameters the selected function takes."""
        return len(signature(self.func).parameters)
    def number_of_output(self):
        """Number of output values, read from the list-style return annotation
        (e.g. ``-> [float, float]`` yields 2)."""
        return len(signature(self.func).return_annotation)
    def get_func(self):
        """Return the currently selected function object."""
        return self.func
    @staticmethod
    def sample_func1(x: float, y: float) -> [float]:
        return [x * y * math.sin(x + y)]
    @staticmethod
    def sample_func2(x: float, y: float) -> [float]:
        return [0.2 + 0.4 * x * y + 0.3 * x * math.sin(15 * y) + 0.05 * math.cos(50 * x)]
    @staticmethod
    def sample_func3(x: float, y: float) -> [float]:
        # Constant function: useful as a trivial sanity-check target.
        return [3.0]
    @staticmethod
    def sample_func4(x: float, y: float) -> [float]:
        # Sign-of-quadrant step function: +1 when x and y share a sign, -1 otherwise.
        if x >= 0:
            if y >= 0:
                return [1]
            else:
                return [-1]
        else:
            if y >= 0:
                return [-1]
            else:
                return [1]
    @staticmethod
    def sample_func5(x: float, y: float) -> [float, float]:
        # Two-output example: sum and difference of squares.
        return [x + y, x**2 - y**2]
    @staticmethod
    def sample_func6(x: float) -> [float]:
        # Single-input example.
        return [math.sin(x)]
class MLP:
    """
    Multi-layer perceptron that learns a FunctionCollection target function by
    per-sample gradient descent, using the TensorFlow 1.x graph/session API.

    Expected call order (as in main()): setup_parameters() (or configure()),
    setup_variables(), setup_placeholders(), setup_layers(), setup_loss(),
    setup_trainer(), setup_summary_for_tensorboard(), prepare_supervised_data(),
    then train().
    """
    def __init__(self):
        """
        Configurable dnn parameters
        """
        self.N_neurons_in_layers = [10, 10]  # width of each hidden layer; length = number of hidden layers
        self.N_samples = 1000  # total supervised samples (training + test)
        self.training_ratio = 0.05  # fraction of samples used for training; remainder is test data
        self.total_epochs = 500
        self.learning_rate = 0.01
        self.activation = tf.nn.tanh  # hidden-layer activation; output layer stays linear
        self.target_function_index = 6  # selects FunctionCollection.sample_func<N>
        self.log_dir = '/tmp/test'  # TensorBoard summary directory; wiped on setup
        """
        Parameters whose values are decided after config has done
        """
        # NOTE(review): TF1-only API; creating the session in __init__ means one
        # global default graph/session per process.
        self.session = tf.InteractiveSession()
        self.f_collection = None
        self.target_func = None
        self.N_input_parameters = None  # arity of the target function
        self.N_output_parameters = None  # number of values the target function returns
        self.supervised_data_input = None  # shape (N_samples, N_input_parameters, 1)
        self.supervised_data_output = None  # shape (N_samples, N_output_parameters, 1)
        self.N_training_data = None
        self.N_test_data = None
        self.training_data_input = None  # views (slices) into the supervised arrays
        self.training_data_output = None
        self.test_data_input = None
        self.test_data_output = None
        self.N_layers = None  # number of hidden layers
        self.W = []  # weight matrices: one per hidden layer, plus the output layer
        self.b = []  # bias column vectors, parallel to self.W
        self.input_data = None  # placeholder: one input sample as a column vector
        self.output_data = None  # placeholder: the matching true output
        self.output = None  # network output tensor
        self.loss_mae = None  # mean absolute error
        self.loss_mse = None  # mean squared error (the trained objective)
        self.loss_mape = None  # mean absolute percentage error
        self.trainer = None
        self.merged_train = None  # merged training summaries for TensorBoard
        self.merged_test = None  # merged test summaries for TensorBoard
        self.writer = None
    def configure(self, *, neurons: list=None, n_samples: int=None, training_ratio: float=None,
                  total_epochs: int=None, learning_rate: float=None, activation=None,
                  target_function_index: int=None, log_dir: str=None):
        """
        Override any subset of the configurable parameters, then recompute the
        dependent ones. All arguments are keyword-only; None means keep current value.
        """
        if neurons is not None:
            self.N_neurons_in_layers = neurons
        if n_samples is not None:
            self.N_samples = n_samples
        if training_ratio is not None:
            self.training_ratio = training_ratio
        if total_epochs is not None:
            self.total_epochs = total_epochs
        if learning_rate is not None:
            self.learning_rate = learning_rate
        if activation is not None:
            self.activation = activation
        if target_function_index is not None:
            self.target_function_index = target_function_index
        if log_dir is not None:
            self.log_dir = log_dir
        self.setup_parameters()
    def setup_parameters(self):
        """
        Calculate dependent parameters
        """
        self.f_collection = FunctionCollection(self.target_function_index)
        self.target_func = self.f_collection.get_func()
        self.N_input_parameters = self.f_collection.number_of_arguments()
        self.N_output_parameters = self.f_collection.number_of_output()
        # np.ndarray(...) allocates uninitialized storage; it is filled later by
        # prepare_supervised_data().
        self.supervised_data_input = np.ndarray([self.N_samples, self.N_input_parameters, 1])
        self.supervised_data_output = np.ndarray([self.N_samples, self.N_output_parameters, 1])
        self.N_training_data = math.floor(self.N_samples * self.training_ratio)
        self.N_test_data = self.N_samples - self.N_training_data
        # These are numpy views, not copies: the first slice trains, the rest tests.
        self.training_data_input = self.supervised_data_input[0:self.N_training_data]
        self.training_data_output = self.supervised_data_output[0:self.N_training_data]
        self.test_data_input = self.supervised_data_input[self.N_training_data:]
        self.test_data_output = self.supervised_data_output[self.N_training_data:]
        self.N_layers = len(self.N_neurons_in_layers)
    def setup_variables(self):
        """Create weight and bias Variables for every hidden layer plus the
        output layer, uniformly initialized, then run the initializer."""
        with tf.name_scope("Variables"):
            for i in range(self.N_layers):
                self.b.append(tf.Variable(tf.random_uniform([self.N_neurons_in_layers[i], 1]), dtype=tf.float32))
                if i == 0:
                    # First hidden layer maps the input vector to layer width.
                    self.W.append(
                        tf.Variable(tf.random_uniform([self.N_neurons_in_layers[i], self.N_input_parameters]),
                                    dtype=tf.float32))
                else:
                    # Subsequent layers map previous layer width to this one.
                    self.W.append(tf.Variable(tf.random_uniform([self.N_neurons_in_layers[i],
                                                                 self.N_neurons_in_layers[i - 1]]),
                                              dtype=tf.float32))
            # Output layer: last hidden width -> number of output parameters.
            self.W.append(tf.Variable(tf.random_uniform([self.N_output_parameters,
                                                         self.N_neurons_in_layers[self.N_layers - 1]]),
                                      dtype=tf.float32))
            self.b.append(tf.Variable(tf.random_uniform([self.N_output_parameters, 1]), dtype=tf.float32))
            self.session.run(tf.global_variables_initializer())
    def setup_placeholders(self):
        """Create feed placeholders for one sample: input and true output,
        each as a column vector (no batch dimension — training is per sample)."""
        with tf.name_scope("Input-Layer"):
            self.input_data = tf.placeholder(tf.float32, shape=[self.N_input_parameters, 1])
        with tf.name_scope("True-Output"):
            self.output_data = tf.placeholder(tf.float32, shape=[self.N_output_parameters, 1])
    def setup_layers(self):
        """Wire the forward pass: activation(W·x + b) per hidden layer, then a
        linear output layer; the result tensor is stored in self.output."""
        with tf.name_scope("Hidden-Layers"):
            layer = []
            z = None
            a = None
            for i in range(self.N_layers):
                if i == 0:
                    z = tf.matmul(self.W[i], self.input_data) + self.b[i]
                    a = self.activation(z)
                else:
                    z = tf.matmul(self.W[i], layer[i - 1]) + self.b[i]
                    a = self.activation(z)
                layer.append(a)
        with tf.name_scope("Output-Layer"):
            # Linear output: no activation, so arbitrary real values can be produced.
            z = tf.matmul(self.W[self.N_layers], layer[self.N_layers - 1]) + self.b[self.N_layers]
            a = z
            self.output = a
    def setup_loss(self):
        """Define MSE (training objective), MAE, and MAPE loss tensors."""
        with tf.name_scope("Loss"):
            self.loss_mse = tf.reduce_mean(tf.square(self.output - self.output_data))
            self.loss_mae = tf.reduce_mean(tf.abs(self.output - self.output_data))
            # NOTE(review): MAPE divides by the true output — produces inf/nan
            # when the target value is 0 (e.g. sample_func6 at x=0).
            self.loss_mape = tf.reduce_mean(tf.abs(self.output / self.output_data - 1.0)) * 100.0
    def setup_trainer(self):
        """Create the SGD training op minimizing MSE."""
        with tf.name_scope("Train"):
            self.trainer = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss_mse)
    def setup_summary_for_tensorboard(self):
        """Set up TensorBoard scalar summaries (MSE/MAE/MAPE) for training and
        test, writing to a freshly cleared log_dir."""
        with tf.name_scope("Summary"):
            # Clear logdir if it already exists
            if tf.gfile.Exists(self.log_dir):
                tf.gfile.DeleteRecursively(self.log_dir)
            self.writer = tf.summary.FileWriter(self.log_dir, self.session.graph)
            mse_training = tf.summary.scalar('MSE(Training)', self.loss_mse)
            mae_training = tf.summary.scalar('MAE(Training)', self.loss_mae)
            mape_training = tf.summary.scalar('MAPE(Training)', self.loss_mape)
            self.merged_train = tf.summary.merge([mse_training, mae_training, mape_training])
            mse_test = tf.summary.scalar('MSE(Test)', self.loss_mse)
            mae_test = tf.summary.scalar('MAE(Test)', self.loss_mae)
            mape_test = tf.summary.scalar('MAPE(Test)', self.loss_mape)
            self.merged_test = tf.summary.merge([mse_test, mae_test, mape_test])
    def prepare_supervised_data(self):
        """Fill the supervised input/output arrays with random samples of the
        target function (training/test slices are views into these arrays)."""
        for i in range(self.N_samples):
            self.supervised_data_input[i], self.supervised_data_output[i] = self.generate_data()
    def train(self):
        """Run SGD for total_epochs epochs, one sample per step, accumulating
        per-epoch average losses and writing TensorBoard summaries; prints a
        progress row every epoch (header every 10 epochs)."""
        for epoch in range(self.total_epochs):
            mae_tr = 0
            mse_tr = 0
            mape_tr = 0
            mae_ts = 0
            mse_ts = 0
            mape_ts = 0
            for step in range(self.N_training_data):
                _, mae, mse, mape = self.session.run([self.trainer, self.loss_mae, self.loss_mse, self.loss_mape],
                                                     feed_dict={self.input_data: self.training_data_input[step],
                                                                self.output_data: self.training_data_output[step]})
                mae_tr += mae
                mse_tr += mse
                mape_tr += mape
                if self.merged_train is not None:
                    # NOTE(review): summaries are tagged with the epoch index, so
                    # every step within an epoch shares one x-coordinate in TensorBoard.
                    summary = self.session.run(self.merged_train,
                                               feed_dict={self.input_data: self.training_data_input[step],
                                                          self.output_data: self.training_data_output[step]})
                    self.writer.add_summary(summary, epoch)
            mae_tr = mae_tr / self.N_training_data
            mse_tr = mse_tr / self.N_training_data
            mape_tr = mape_tr / self.N_training_data
            for step in range(self.N_test_data):
                # Evaluation only: the trainer op is not run on test data.
                mae, mse, mape = self.session.run([self.loss_mae, self.loss_mse, self.loss_mape],
                                                  feed_dict={self.input_data: self.test_data_input[step],
                                                             self.output_data: self.test_data_output[step]})
                mae_ts += mae
                mse_ts += mse
                mape_ts += mape
                if self.merged_test is not None:
                    summary = self.session.run(self.merged_test,
                                               feed_dict={self.input_data: self.test_data_input[step],
                                                          self.output_data: self.test_data_output[step]})
                    self.writer.add_summary(summary, epoch)
            mae_ts = mae_ts / self.N_test_data
            mse_ts = mse_ts / self.N_test_data
            mape_ts = mape_ts / self.N_test_data
            if epoch % 10 == 0:
                # Re-print the column header and flush summaries every 10 epochs.
                print('%s %s %s %s %s %s %s' % (
                    'Epoch'.ljust(6), 'MSE(training)'.ljust(15), 'MAE(training)'.ljust(15), 'MAPE(training)'.ljust(16),
                    'MSE(test)'.ljust(15), 'MAE(test)'.ljust(15), 'MAPE(test)'.ljust(15)))
                self.writer.flush()
            print('%s %s %s %s %s %s %s' % (
                ('%d' % (epoch + 1)).ljust(6), ('%.6f' % mse_tr).ljust(15), ('%.6f' % mae_tr).ljust(15),
                ('%.6f' % mape_tr).ljust(16),
                ('%.6f' % mse_ts).ljust(15), ('%.6f' % mae_ts).ljust(15), ('%.6f' % mape_ts).ljust(16)))
    def generate_data(self):
        """Draw one random sample: inputs uniform in (-1, 1), outputs from the
        target function. Returns [inputs, outputs] as column-shaped arrays."""
        inputs = np.ndarray([self.N_input_parameters, 1])
        for j in range(len(inputs)):
            inputs[j] = (np.random.rand() - 0.5) * 2.0
        # Each inputs[j] is a length-1 array; the target function receives them
        # unpacked as its positional arguments.
        outputs = self.target_func(*inputs)
        return [inputs, outputs]
    def get_output(self, input):
        """Return [input, true_output, computed_output] for one input column
        vector (a list of single-element lists, e.g. [[0.5]])."""
        # Flatten the nested list so the target function gets scalar arguments.
        true_output = self.target_func(*sum(input, []))
        computed_output = self.session.run(self.output, feed_dict={self.input_data: input})
        return [input, true_output, computed_output]
def main():
    """Build an MLP with its default configuration, train it, and report the
    elapsed wall-clock time."""
    net = MLP()
    setup_steps = (
        net.setup_parameters,
        net.setup_variables,
        net.setup_placeholders,
        net.setup_layers,
        net.setup_loss,
        net.setup_trainer,
        net.setup_summary_for_tensorboard,
        net.prepare_supervised_data,
    )
    # Graph construction and data preparation must happen in this order.
    for step in setup_steps:
        step()
    started_at = time.time()
    net.train()
    elapsed = int(time.time() - started_at)
    print("Elapsed time: {} sec".format(elapsed))
# Run the training script only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment