"""Delayed sin echo prediction with TensorFlow and Keras.

Detailed discussion at:
https://stackoverflow.com/questions/46937898/delayed-echo-of-sin-cannot-reproduce-tensorflow-result-in-keras
"""

import argparse
import gc
import sys
from typing import Optional, Tuple

import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.contrib import rnn

import keras.backend as K
import keras.optimizers
from keras.models import Sequential
from keras.layers import Dense, LSTM, TimeDistributed


def generate_sample(f: Optional[float] = 1.0,
                    t0: Optional[float] = None,
                    batch_size: int = 1,
                    samples: int = 200,
                    delay: int = 20) -> Tuple[np.ndarray, np.ndarray,
                                              np.ndarray, np.ndarray]:
"""
Generates data samples.
:param f: The frequency to use for all time series or None to randomize.
:param t0: The time offset to use for all time series or None to randomize.
:param batch_size: The number of time series to generate.
:param predict: The number of future samples to generate.
:param samples: The number of past (and current) samples to generate.
:return: Tuple that contains the past times and values as well as the future times and values. In all outputs,
each row represents one time series of the batch.
"""
    Fs = 100
    T = np.empty((batch_size, samples))
    Y = np.empty((batch_size, samples))
    FT = np.empty((batch_size, samples))
    FY = np.empty((batch_size, samples))
    _t0 = t0
    for i in range(batch_size):
        t = np.arange(0, samples + delay) / Fs
        if _t0 is None:
            t0 = np.random.rand() * 2 * np.pi
        else:
            t0 = _t0 + i / float(batch_size)
        freq = f
        if freq is None:
            freq = np.random.rand() * 3.5 + 0.5
        y = np.sin(2 * np.pi * freq * (t + t0))
        # (T, Y) is the network input; (FT, FY) is the same signal shifted
        # back by `delay` samples, i.e. the delayed echo the network must learn.
        T[i, :] = t[delay:]
        Y[i, :] = y[delay:]
        FT[i, :] = t[:samples]
        FY[i, :] = y[:samples]
    return T, Y, FT, FY
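

# Illustrative sanity check of generate_sample (a hypothetical usage sketch;
# never called by the script): with the default delay of 20 samples, the
# target FY is the input Y delayed by 20 steps.
def _check_generate_sample():
    T, Y, FT, FY = generate_sample(f=1.0, t0=0.0, batch_size=1, samples=100)
    assert T.shape == Y.shape == FT.shape == FY.shape == (1, 100)
    assert np.allclose(FY[0, 20:], Y[0, :-20])  # FY[k] == Y[k - 20] for k >= 20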


# Training Parameters
learning_rate = 0.001
training_iters = 650000
batch_size = 50
display_step = 100  # print the loss every `display_step` iterations
log_step = 10       # record the loss every `log_step` iterations

# Network Parameters
n_input = 1    # input is sin(x), a scalar
n_steps = 100  # timesteps
n_hidden = 32  # hidden layer num of features

SEED = 34
model_choices_ = ['keras', 'tensorflow']


def custom_loss(y_true, y_pred):
    # Sum the squared error over the time axis, then average over the batch,
    # mirroring the loss of the tensorflow branch in do_model below.
    y_true = K.squeeze(y_true, axis=2)
    y_pred = K.squeeze(y_pred, axis=2)
    sq_diff = K.square(y_true - y_pred)
    return K.mean(K.sum(sq_diff, axis=1), axis=0)
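

# NumPy equivalent of custom_loss (an illustrative sketch, unused by the
# script): for (batch, steps) arrays, the loss is the batch mean of the
# per-sequence sum of squared errors.
def _custom_loss_numpy(y_true, y_pred):
    return np.mean(np.sum(np.square(y_true - y_pred), axis=1))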


def do_model(lib):
    assert lib in model_choices_
    if lib == 'keras':
        model = Sequential()
        model.add(LSTM(n_hidden,
                       input_shape=(n_steps, n_input),
                       return_sequences=True))
        # A shared linear readout is applied to the hidden state of every
        # timestep.
        model.add(TimeDistributed(Dense(n_input, activation='linear')))
        model.compile(loss=custom_loss,
                      optimizer=keras.optimizers.Adam(lr=learning_rate),
                      metrics=[])
        return model
    elif lib == 'tensorflow':
        x = tf.placeholder(tf.float32, [None, n_steps, n_input])
        y = tf.placeholder(tf.float32, [None, n_steps])
        weights = {
            'out': tf.Variable(tf.random_normal([n_hidden, n_steps], seed=SEED))
        }
        biases = {
            'out': tf.Variable(tf.random_normal([n_steps], seed=SEED))
        }
        lstm = rnn.LSTMCell(n_hidden, forget_bias=1.0)
        outputs, states = tf.nn.dynamic_rnn(lstm, inputs=x,
                                            dtype=tf.float32,
                                            time_major=False)
        # Project only the hidden state of the last timestep to all n_steps
        # outputs at once.
        h = tf.transpose(outputs, [1, 0, 2])
        pred = tf.nn.bias_add(tf.matmul(h[-1], weights['out']), biases['out'])
        individual_losses = tf.reduce_sum(tf.squared_difference(pred, y),
                                          axis=1)
        loss = tf.reduce_mean(individual_losses)
        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate) \
            .minimize(loss)
        return {'optimizer': optimizer,
                'pred': pred,
                'loss': loss,
                'x': x,
                'y': y}
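

# Note that the two branches are not architecturally equivalent: the keras
# model predicts each timestep from that timestep's hidden state, while the
# tensorflow model predicts all n_steps values from the final hidden state
# through a single (n_hidden, n_steps) projection (see the Stack Overflow
# question linked above for the resulting discrepancy).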


def do_one_iter(session, model, lib, batch_x, batch_y):
    assert lib in model_choices_
    if lib == 'keras':
        batch_x = batch_x.reshape((batch_size, n_steps, n_input))
        batch_y = batch_y.reshape((batch_size, n_steps, n_input))
        return model.train_on_batch(batch_x, batch_y)
    elif lib == 'tensorflow':
        batch_x = batch_x.reshape((batch_size, n_steps, n_input))
        batch_y = batch_y.reshape((batch_size, n_steps))
        x = model['x']
        y = model['y']
        optimizer = model['optimizer']
        loss = model['loss']
        session.run(optimizer, feed_dict={x: batch_x, y: batch_y})
        # The loss is evaluated in a second run, i.e. after the weight update.
        return session.run(loss, feed_dict={x: batch_x, y: batch_y})


def do_prediction(session, model, lib, test_input):
    assert lib in model_choices_
    if lib == 'keras':
        return model.predict_on_batch(test_input)
    elif lib == 'tensorflow':
        x = model['x']
        pred = model['pred']
        return session.run(pred, feed_dict={x: test_input})
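

# Hypothetical usage sketch (not executed as written): for a single test
# series of shape (1, n_steps, n_input), the keras branch returns an array of
# shape (1, n_steps, 1) and the tensorflow branch one of shape (1, n_steps);
# ravel() flattens either to (n_steps,).
#
#   prediction = do_prediction(sess, model, lib, test_input)
#   assert prediction.ravel().shape == (n_steps,)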


def main(lib):
    np.random.seed(SEED)
    tf.reset_default_graph()
    graph_level_seed = SEED
    tf.set_random_seed(graph_level_seed)
    model = do_model(lib)
    config = tf.ConfigProto()
    config.gpu_options.per_process_gpu_memory_fraction = 0.1
    init = tf.global_variables_initializer()
    with tf.Session(config=config) as sess:
        sess.run(init)
        step = 1
        loss_values = [np.inf]
        loss_steps = [0.]
        target_loss = 0.15
        # Keep training until we reach max iterations or the target loss.
        while step * batch_size < training_iters \
                and loss_values[-1] > target_loss:
            _, batch_x, __, batch_y = generate_sample(f=None, t0=None,
                                                      batch_size=batch_size,
                                                      samples=n_steps)
            loss_value = do_one_iter(sess, model, lib, batch_x, batch_y)
            if step % log_step == 0:
                loss_values.append(loss_value)
                loss_steps.append(step)
            if step % display_step == 0:
                print("Iter " + str(step) +
                      ", Minibatch Loss= {:.6f}".format(loss_value))
            step += 1
        print("Optimization Finished!")

        # Plot the logged training loss.
        plt.plot(loss_steps[1:], loss_values[1:])
        plt.yscale('log')
        plt.show()

        # Test the prediction
        n_tests = 3
        for i in range(1, n_tests + 1):
            plt.subplot(n_tests, 1, i)
            t, y, delayed_t, delayed_y = \
                generate_sample(f=i, t0=None, samples=n_steps)
            test_input = y.reshape((1, n_steps, n_input))
            prediction = do_prediction(sess, model, lib, test_input)
            t = t.ravel()
            y = y.ravel()
            delayed_t = delayed_t.ravel()
            delayed_y = delayed_y.ravel()
            prediction = prediction.ravel()
            plt.plot(t, y, color='black')
            plt.plot(delayed_t, delayed_y, color='green', linestyle=':',
                     linewidth=3)
            plt.plot(delayed_t, prediction, color='red')
            plt.ylim([-1., 1.])
            plt.xlabel('time [t]')
            plt.ylabel('signal')
        plt.show()
    gc.collect()


if __name__ == '__main__':
    if sys.version_info[0] < 3:
        raise ValueError('tested on python3 only')
    else:
        parser = argparse.ArgumentParser()
        parser.add_argument('-l', '--lib',
                            choices=model_choices_,
                            required=True)
        main(**vars(parser.parse_args()))
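
# Example invocation (assuming the script is saved as delayed_echo.py, a
# hypothetical filename):
#   python3 delayed_echo.py --lib keras
#   python3 delayed_echo.py --lib tensorflow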