@sunil-at-gh
Last active November 6, 2016 13:35
Faster RNN in Keras
"""
Run-time Performance test of RNN and Streamlined RNN.
"""
import sys
import time

import numpy as np

from keras.models import Sequential
from keras.layers import TimeDistributedDense
from keras.optimizers import SGD

# Original Keras implementations:
from keras.layers.recurrent_0 import SimpleRNN, GRU, LSTM
# Streamlined implementations:
from keras.layers.recurrent import (
    SimpleRNN as StreamlinedRNN,
    GRU as StreamlinedGRU,
    LSTM as StreamlinedLSTM,
)
def build_model(n_seqs, x_width, nhidden, batch_size,
                use_keras_rnn=True, rnn_type='rnn', p_dropout=0,
                lr=0.01, is_stateful=False):
    """Build a one-recurrent-layer model, using either the original Keras
    layers or their streamlined counterparts."""
    assert rnn_type in ('rnn', 'gru', 'lstm')
    np.random.seed(123)

    model = Sequential()

    if is_stateful:
        kwargs = {'batch_input_shape': (batch_size, n_seqs, x_width)}
    else:
        kwargs = {'input_shape': (n_seqs, x_width)}
    if 1.0 > p_dropout > 0:
        kwargs['dropout_W'] = p_dropout
        kwargs['dropout_U'] = p_dropout

    if use_keras_rnn:
        if rnn_type == 'rnn':
            model.add(SimpleRNN(nhidden, return_sequences=True, stateful=is_stateful, **kwargs))
        elif rnn_type == 'lstm':
            model.add(LSTM(nhidden, return_sequences=True, stateful=is_stateful, **kwargs))
        else:
            model.add(GRU(nhidden, return_sequences=True, stateful=is_stateful, **kwargs))
    else:
        if rnn_type == 'rnn':
            model.add(StreamlinedRNN(nhidden, return_sequences=True, stateful=is_stateful, **kwargs))
        elif rnn_type == 'lstm':
            model.add(StreamlinedLSTM(nhidden, return_sequences=True, stateful=is_stateful, **kwargs))
        else:
            model.add(StreamlinedGRU(nhidden, return_sequences=True, stateful=is_stateful, **kwargs))

    model.add(TimeDistributedDense(output_dim=1, activation='sigmoid'))

    opt = SGD(lr=lr, clipvalue=1.0)
    model.compile(loss='mse', optimizer=opt)
    return model
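# For illustration, a stateless GRU model with 20% dropout (the values here
# are arbitrary examples, not part of the benchmark) could be built as:
#
#     model = build_model(n_seqs=20, x_width=1000, nhidden=50, batch_size=100,
#                         use_keras_rnn=True, rnn_type='gru', p_dropout=0.2)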
def get_trng_data(n_samples, n_seqs, x_width):
    """Generate synthetic training data: random inputs in [0, 2), all-zero targets."""
    x = np.random.rand(n_samples, n_seqs, x_width) * 2.0
    y = np.zeros((n_samples, n_seqs, 1))
    return x, y
def pp_history(mdlnm, history, verbose=False):
    if verbose:
        print("History from:", mdlnm)
        print("  epoch =", history.epoch)
        print("  history =", history.history)
        print()
    else:
        # Loss after last epoch
        print("Loss for", mdlnm, "=", history.history["loss"][-1])
    return
def build_and_run(n_epochs, n_seqs, x_width, rnn_type='rnn', p_dropout=0):
    """Train matched models with the original and streamlined layers;
    return their wall-clock training times."""
    nhidden = 50
    verbosity = 0
    n_trng = 10000
    batch_size = 100
    is_stateful = False

    print("Building models ...")
    model_k = build_model(n_seqs, x_width, nhidden, batch_size, lr=0.01, is_stateful=is_stateful,
                          use_keras_rnn=True, rnn_type=rnn_type, p_dropout=p_dropout)
    model_s = build_model(n_seqs, x_width, nhidden, batch_size, lr=0.01, is_stateful=is_stateful,
                          use_keras_rnn=False, rnn_type=rnn_type, p_dropout=p_dropout)

    x, y = get_trng_data(n_trng, n_seqs, x_width)

    # (The show_accuracy argument to fit() is a Keras 0.x leftover that was
    # dropped in Keras 1.x; only the loss is tracked here.)
    print("Training model_k for {} epochs ...".format(n_epochs))
    time_start = time.time()
    history_k = model_k.fit(x, y, batch_size=batch_size, nb_epoch=n_epochs, shuffle=False,
                            verbose=verbosity)
    time_k = time.time() - time_start

    print("Training model_s for {} epochs ...".format(n_epochs))
    time_start = time.time()
    history_s = model_s.fit(x, y, batch_size=batch_size, nb_epoch=n_epochs, shuffle=False,
                            verbose=verbosity)
    time_s = time.time() - time_start

    pp_history("model_k", history_k)
    pp_history("model_s", history_s)
    return time_k, time_s
def run_experiment(rnn_type, p_dropout):
    print("Running Training-time test for {} with dropout = {} ...".format(rnn_type.upper(), p_dropout))
    nepochs = 10
    n_seqs = 20
    x_width = 1000
    print("nbr time steps = ", n_seqs, ", width of input to RNN at each step = ", x_width, sep="")

    time_k, time_s = build_and_run(nepochs, n_seqs, x_width, rnn_type, p_dropout)

    print("Run times for {} epochs, dropout = {}:".format(nepochs, p_dropout))
    print("  model with original    {}: {:.1f} sec".format(rnn_type.upper(), time_k))
    print("  model with Streamlined {}: {:.1f} sec".format(rnn_type.upper(), time_s))
    print("  run-time improvement = {:.1f}%".format((time_k - time_s) / time_k * 100.0))
    print()
    return
if __name__ == '__main__':
    if len(sys.argv) > 1:
        rnn_type = sys.argv[1]
        p_dropout = float(sys.argv[2]) if len(sys.argv) > 2 else 0
        run_experiment(rnn_type, p_dropout)
    else:
        print("Running all configurations ...\n")
        run_experiment('rnn', 0)
        run_experiment('rnn', 0.7)
        run_experiment('gru', 0)
        run_experiment('gru', 0.7)
        run_experiment('lstm', 0)
        run_experiment('lstm', 0.7)
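# Example invocations (assuming the script is saved as rnn_speed_test.py,
# a hypothetical file name):
#
#     python rnn_speed_test.py lstm 0.5   # time LSTMs with 50% dropout
#     python rnn_speed_test.py            # run all six configurations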