MLP sample
"""
Learn arbitrary function from training data
Architecture: MLP
Number of hidden layers: You can configure via `N_neurons_in_layers`
"""
import math
import time
from inspect import signature
import numpy as np
import tensorflow as tf
"""
List of functions to learn
"""
class FunctionCollection:
def __init__(self, n):
self.func = eval('self.%s%s'%('sample_func', n))
def __call__(self, n):
self.func = eval('self.%s%s'%('sample_func', n))
def number_of_arguments(self):
return len(signature(self.func).parameters)
def number_of_output(self):
return len(signature(self.func).return_annotation)
def get_func(self):
return self.func
@staticmethod
def sample_func1(x: float, y: float) -> [float]:
return [x * y * math.sin(x + y)]
@staticmethod
def sample_func2(x: float, y: float) -> [float]:
return [0.2 + 0.4 * x * y + 0.3 * x * math.sin(15 * y) + 0.05 * math.cos(50 * x)]
@staticmethod
def sample_func3(x: float, y: float) -> [float]:
return [3.0]
@staticmethod
def sample_func4(x: float, y: float) -> [float]:
if x >= 0:
if y >= 0:
return [1]
else:
return [-1]
else:
if y >= 0:
return [-1]
else:
return [1]
@staticmethod
def sample_func5(x:float, y: float) -> [float, float]:
return [x + y, x**2-y**2]
@staticmethod
def sample_func6(x:float) -> [float]:
return [math.sin(x)]
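
# Usage sketch (illustrative values): select sample_func6 and inspect it.
#   fc = FunctionCollection(6)   # binds fc.func to sample_func6
#   fc.number_of_arguments()     # -> 1, from signature(...).parameters
#   fc.number_of_output()        # -> 1, the length of the [float] annotation
#   fc.get_func()(0.5)           # -> [math.sin(0.5)]
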
class MLP:
    def __init__(self):
        """
        Configurable DNN parameters
        """
        self.N_neurons_in_layers = [10, 10]
        self.N_samples = 1000
        self.training_ratio = 0.05
        self.total_epochs = 500
        self.learning_rate = 0.01
        self.activation = tf.nn.tanh
        self.target_function_index = 6
        self.log_dir = '/tmp/test'
        """
        Parameters whose values are derived once configuration is done
        """
        self.session = tf.InteractiveSession()
        self.f_collection = None
        self.target_func = None
        self.N_input_parameters = None
        self.N_output_parameters = None
        self.supervised_data_input = None
        self.supervised_data_output = None
        self.N_training_data = None
        self.N_test_data = None
        self.training_data_input = None
        self.training_data_output = None
        self.test_data_input = None
        self.test_data_output = None
        self.N_layers = None
        self.W = []
        self.b = []
        self.input_data = None
        self.output_data = None
        self.output = None
        self.loss_mae = None
        self.loss_mse = None
        self.loss_mape = None
        self.trainer = None
        self.merged_train = None
        self.merged_test = None
        self.writer = None
    def configure(self, *, neurons: list=None, n_samples: int=None, training_ratio: float=None,
                  total_epochs: int=None, learning_rate: float=None, activation=None,
                  target_function_index: int=None, log_dir: str=None):
        if neurons is not None:
            self.N_neurons_in_layers = neurons
        if n_samples is not None:
            self.N_samples = n_samples
        if training_ratio is not None:
            self.training_ratio = training_ratio
        if total_epochs is not None:
            self.total_epochs = total_epochs
        if learning_rate is not None:
            self.learning_rate = learning_rate
        if activation is not None:
            self.activation = activation
        if target_function_index is not None:
            self.target_function_index = target_function_index
        if log_dir is not None:
            self.log_dir = log_dir
        self.setup_parameters()
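
    # Configuration sketch (values are illustrative, not tuned):
    #   mlp = MLP()
    #   mlp.configure(neurons=[20, 20], n_samples=2000, training_ratio=0.1,
    #                 learning_rate=0.005, activation=tf.nn.relu,
    #                 target_function_index=1, log_dir='/tmp/test')
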
    def setup_parameters(self):
        """
        Calculate dependent parameters
        """
        self.f_collection = FunctionCollection(self.target_function_index)
        self.target_func = self.f_collection.get_func()
        self.N_input_parameters = self.f_collection.number_of_arguments()
        self.N_output_parameters = self.f_collection.number_of_output()
        self.supervised_data_input = np.ndarray([self.N_samples, self.N_input_parameters, 1])
        self.supervised_data_output = np.ndarray([self.N_samples, self.N_output_parameters, 1])
        self.N_training_data = math.floor(self.N_samples * self.training_ratio)
        self.N_test_data = self.N_samples - self.N_training_data
        # The training/test arrays are numpy views into the supervised data, so
        # they pick up the values filled in later by prepare_supervised_data().
        self.training_data_input = self.supervised_data_input[0:self.N_training_data]
        self.training_data_output = self.supervised_data_output[0:self.N_training_data]
        self.test_data_input = self.supervised_data_input[self.N_training_data:]
        self.test_data_output = self.supervised_data_output[self.N_training_data:]
        self.N_layers = len(self.N_neurons_in_layers)
    def setup_variables(self):
        with tf.name_scope("Variables"):
            for i in range(self.N_layers):
                self.b.append(tf.Variable(tf.random_uniform([self.N_neurons_in_layers[i], 1]), dtype=tf.float32))
                if i == 0:
                    self.W.append(
                        tf.Variable(tf.random_uniform([self.N_neurons_in_layers[i], self.N_input_parameters]),
                                    dtype=tf.float32))
                else:
                    self.W.append(tf.Variable(tf.random_uniform([self.N_neurons_in_layers[i],
                                                                 self.N_neurons_in_layers[i - 1]]),
                                              dtype=tf.float32))
            self.W.append(tf.Variable(tf.random_uniform([self.N_output_parameters,
                                                         self.N_neurons_in_layers[self.N_layers - 1]]),
                                      dtype=tf.float32))
            self.b.append(tf.Variable(tf.random_uniform([self.N_output_parameters, 1]), dtype=tf.float32))
        self.session.run(tf.global_variables_initializer())
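
    # Shape check for the default config (N_neurons_in_layers=[10, 10] and
    # sample_func6, i.e. 1 input and 1 output), the variables created above are:
    #   W: (10, 1), (10, 10), (1, 10)    b: (10, 1), (10, 1), (1, 1)
    # Weights are stored as (fan_out, fan_in); data flows as column vectors.
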
    def setup_placeholders(self):
        with tf.name_scope("Input-Layer"):
            self.input_data = tf.placeholder(tf.float32, shape=[self.N_input_parameters, 1])
        with tf.name_scope("True-Output"):
            self.output_data = tf.placeholder(tf.float32, shape=[self.N_output_parameters, 1])
    def setup_layers(self):
        with tf.name_scope("Hidden-Layers"):
            layer = []
            for i in range(self.N_layers):
                if i == 0:
                    z = tf.matmul(self.W[i], self.input_data) + self.b[i]
                else:
                    z = tf.matmul(self.W[i], layer[i - 1]) + self.b[i]
                layer.append(self.activation(z))
        with tf.name_scope("Output-Layer"):
            # Linear output layer (no activation), as usual for regression
            z = tf.matmul(self.W[self.N_layers], layer[self.N_layers - 1]) + self.b[self.N_layers]
            self.output = z
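
    # The graph built above computes, layer by layer:
    #   a_0 = activation(W_0 x + b_0)
    #   a_i = activation(W_i a_{i-1} + b_i)   for hidden layers i >= 1
    #   output = W_L a_{L-1} + b_L            (linear)
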
    def setup_loss(self):
        with tf.name_scope("Loss"):
            self.loss_mse = tf.reduce_mean(tf.square(self.output - self.output_data))
            self.loss_mae = tf.reduce_mean(tf.abs(self.output - self.output_data))
            # Note: MAPE blows up when the true output is near zero
            self.loss_mape = tf.reduce_mean(tf.abs(self.output / self.output_data - 1.0)) * 100.0
    def setup_trainer(self):
        with tf.name_scope("Train"):
            self.trainer = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss_mse)
    def setup_summary_for_tensorboard(self):
        with tf.name_scope("Summary"):
            # Clear the log dir if it already exists
            if tf.gfile.Exists(self.log_dir):
                tf.gfile.DeleteRecursively(self.log_dir)
            self.writer = tf.summary.FileWriter(self.log_dir, self.session.graph)
            mse_training = tf.summary.scalar('MSE(Training)', self.loss_mse)
            mae_training = tf.summary.scalar('MAE(Training)', self.loss_mae)
            mape_training = tf.summary.scalar('MAPE(Training)', self.loss_mape)
            self.merged_train = tf.summary.merge([mse_training, mae_training, mape_training])
            mse_test = tf.summary.scalar('MSE(Test)', self.loss_mse)
            mae_test = tf.summary.scalar('MAE(Test)', self.loss_mae)
            mape_test = tf.summary.scalar('MAPE(Test)', self.loss_mape)
            self.merged_test = tf.summary.merge([mse_test, mae_test, mape_test])
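
    # Once training has written summaries, view them with the TensorBoard CLI
    # (the path below is this script's default log_dir):
    #   tensorboard --logdir /tmp/test
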
    def prepare_supervised_data(self):
        for i in range(self.N_samples):
            self.supervised_data_input[i], self.supervised_data_output[i] = self.generate_data()
    def train(self):
        for epoch in range(self.total_epochs):
            mae_tr = 0
            mse_tr = 0
            mape_tr = 0
            mae_ts = 0
            mse_ts = 0
            mape_ts = 0
            for step in range(self.N_training_data):
                _, mae, mse, mape = self.session.run([self.trainer, self.loss_mae, self.loss_mse, self.loss_mape],
                                                     feed_dict={self.input_data: self.training_data_input[step],
                                                                self.output_data: self.training_data_output[step]})
                mae_tr += mae
                mse_tr += mse
                mape_tr += mape
                if self.merged_train is not None:
                    summary = self.session.run(self.merged_train,
                                               feed_dict={self.input_data: self.training_data_input[step],
                                                          self.output_data: self.training_data_output[step]})
                    self.writer.add_summary(summary, epoch)
            mae_tr = mae_tr / self.N_training_data
            mse_tr = mse_tr / self.N_training_data
            mape_tr = mape_tr / self.N_training_data
            for step in range(self.N_test_data):
                mae, mse, mape = self.session.run([self.loss_mae, self.loss_mse, self.loss_mape],
                                                  feed_dict={self.input_data: self.test_data_input[step],
                                                             self.output_data: self.test_data_output[step]})
                mae_ts += mae
                mse_ts += mse
                mape_ts += mape
                if self.merged_test is not None:
                    summary = self.session.run(self.merged_test,
                                               feed_dict={self.input_data: self.test_data_input[step],
                                                          self.output_data: self.test_data_output[step]})
                    self.writer.add_summary(summary, epoch)
            mae_ts = mae_ts / self.N_test_data
            mse_ts = mse_ts / self.N_test_data
            mape_ts = mape_ts / self.N_test_data
            if epoch % 10 == 0:
                print('%s %s %s %s %s %s %s' % (
                    'Epoch'.ljust(6), 'MSE(training)'.ljust(15), 'MAE(training)'.ljust(15), 'MAPE(training)'.ljust(16),
                    'MSE(test)'.ljust(15), 'MAE(test)'.ljust(15), 'MAPE(test)'.ljust(16)))
                self.writer.flush()
            print('%s %s %s %s %s %s %s' % (
                ('%d' % (epoch + 1)).ljust(6), ('%.6f' % mse_tr).ljust(15), ('%.6f' % mae_tr).ljust(15),
                ('%.6f' % mape_tr).ljust(16),
                ('%.6f' % mse_ts).ljust(15), ('%.6f' % mae_ts).ljust(15), ('%.6f' % mape_ts).ljust(16)))
    def generate_data(self):
        # Draw each input uniformly from [-1, 1)
        inputs = np.ndarray([self.N_input_parameters, 1])
        for j in range(len(inputs)):
            inputs[j] = (np.random.rand() - 0.5) * 2.0
        outputs = self.target_func(*inputs)
        return [inputs, outputs]
    def get_output(self, inputs):
        # `inputs` is a nested list shaped like the input placeholder;
        # sum(inputs, []) flattens it into positional arguments
        true_output = self.target_func(*sum(inputs, []))
        computed_output = self.session.run(self.output, feed_dict={self.input_data: inputs})
        return [inputs, true_output, computed_output]
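
    # Query sketch (hypothetical point; assumes the default sample_func6,
    # i.e. 1 input and 1 output, so `inputs` is a 1x1 nested list):
    #   mlp.get_output([[0.5]])
    #   # -> [[[0.5]], [math.sin(0.5)], <network prediction as a (1, 1) array>]
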
def main():
    mlp = MLP()
    mlp.setup_parameters()
    mlp.setup_variables()
    mlp.setup_placeholders()
    mlp.setup_layers()
    mlp.setup_loss()
    mlp.setup_trainer()
    mlp.setup_summary_for_tensorboard()
    mlp.prepare_supervised_data()
    start_time = time.time()
    mlp.train()
    interval = int(time.time() - start_time)
    print("Elapsed time: {} sec".format(interval))


if __name__ == "__main__":
    main()