Created
November 7, 2021 16:16
-
-
Save logankilpatrick/74b57e2b92e31159cd922869985cad38 to your computer and use it in GitHub Desktop.
Train an LSTM/GRU/SimpleRNN model with a TimeDistributed CNN feature extractor on a speech-recognition dataset in TensorFlow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --- Standard library ---
import os
import sys

# --- Third party ---
import numpy as np
import tensorflow as tf
from tensorflow.keras import models, layers, callbacks
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import mnist

# Change this to the location of the database directories
DB_DIR = os.path.dirname(os.path.realpath(__file__))

# Import databases: make the db helper module importable regardless of the
# current working directory by putting this script's directory on sys.path.
sys.path.insert(1, DB_DIR)
from db_utils import get_imdb_dataset, get_speech_dataset, get_single_digit_dataset
class generic_vns_function(tf.keras.Model):
    """CNN feature extractor: stacked Conv2D + MaxPool blocks, then Flatten
    and a 1024-unit ReLU Dense head.

    Args:
        num_cnn_layers: number of conv/pool stages to stack.
        filter_size: per-stage filter counts (indexed 0..num_cnn_layers-1).
        kernel_size: per-stage kernel sizes (same indexing).
    """

    def __init__(self, num_cnn_layers, filter_size, kernel_size):
        super().__init__()
        # One Conv2D (ReLU) followed by a 2x2 MaxPool per requested stage.
        # Indexing (rather than zip) keeps the original contract: lists
        # shorter than num_cnn_layers raise IndexError.
        self.cnn_layers = []
        for idx in range(num_cnn_layers):
            conv = tf.keras.layers.Conv2D(
                filter_size[idx], kernel_size[idx], activation="relu")
            self.cnn_layers.append(conv)
            self.cnn_layers.append(tf.keras.layers.MaxPooling2D((2, 2)))
        self.flatten = tf.keras.layers.Flatten()
        self.dense1 = tf.keras.layers.Dense(1024, activation="relu")

    def call(self, x):
        """Run x through every conv/pool stage, flatten, and project to 1024 units."""
        for stage in self.cnn_layers:
            x = stage(x)
        return self.dense1(self.flatten(x))
class LSTMmodel(tf.keras.Model):
    """Sequence classifier: per-timestep CNN features -> GRU -> softmax.

    Args:
        cnn_model: a Keras model/layer applied to each timestep via
            TimeDistributed (here: generic_vns_function).
        num_class: number of output classes for the softmax head.
    """

    def __init__(self, cnn_model, num_class):
        super().__init__()
        self.cnn_model = cnn_model
        # Fix: build the TimeDistributed wrapper ONCE here instead of inside
        # call(). The original allocated a fresh wrapper object on every
        # forward pass, which defeats tf.function tracing/caching and goes
        # against Keras subclassing guidelines (weights were still shared via
        # cnn_model, so the numerical behavior is unchanged).
        self.time_distributed = tf.keras.layers.TimeDistributed(cnn_model)
        # Swap for LSTM/SimpleRNN here to compare recurrent cell types.
        self.gru = tf.keras.layers.GRU(units=64, return_state=True, dropout=0.3)
        self.dense = tf.keras.layers.Dense(num_class, activation="softmax")

    def call(self, inputs):
        # inputs assumed to be (batch, windows, time, freq, 1) from main()'s
        # reshape — TODO confirm against the dataset loader.
        x = self.time_distributed(inputs)
        # return_state=True -> (last output, final state); the state is unused.
        x, _ = self.gru(x)
        return self.dense(x)
def train_model(model, epochs, batch_size, X_train, y_train, X_test, y_test):
    """Fit an already-compiled Keras model with early stopping and report test error.

    Args:
        model: compiled tf.keras model (must be compiled before this call).
        epochs: maximum number of training epochs.
        batch_size: minibatch size for fit().
        X_train, y_train: training features and one-hot labels.
        X_test, y_test: held-out features/labels; also used as the
            validation set that drives early stopping.

    Returns:
        The trained model.
    """
    # Stop when validation loss has not improved for 3 consecutive epochs.
    cb = [callbacks.EarlyStopping(monitor='val_loss', patience=3)]
    model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=epochs,
              batch_size=batch_size, verbose=1, callbacks=cb)
    # scores[1] is the first compiled metric — assumed to be accuracy
    # (main() compiles with metrics=['accuracy']); scores[0] is the loss.
    scores = model.evaluate(X_test, y_test, verbose=2)
    print("Baseline Error: %.2f%%" % (100 - scores[1] * 100))
    return model
def choose_dataset(dataset_type):
    """Load, normalize, and label-encode the dataset named by dataset_type.

    Supported values: "nlp", "computer_vision", "speech_recognition".
    The NLP (IMDB) set is returned untouched; the other two are normalized
    and their labels one-hot encoded.

    Raises:
        ValueError: for any unrecognized dataset_type.
    """
    # Text data takes a different path entirely — no image-style processing.
    if dataset_type == "nlp":
        return get_imdb_dataset(dir=DB_DIR)

    if dataset_type == "computer_vision":
        (X_train, y_train), (X_test, y_test) = mnist.load_data()
    elif dataset_type == "speech_recognition":
        # Full speech set; get_single_digit_dataset(1) is the smaller
        # single-digit alternative if a quick run is needed.
        (X_train, y_train), (X_test, y_test), (_, _) = get_speech_dataset()
    else:
        raise ValueError("Couldn't find dataset.")

    X_train, X_test = normalize_dataset(dataset_type, X_train, X_test)
    return reshape_dataset(X_train, y_train, X_test, y_test)
def normalize_dataset(string, X_train, X_test):
    """Scale features for the computer-vision or speech datasets.

    Computer-vision data (uint8 pixels) is scaled into [0, 1]; any other
    dataset is standardized (zero mean, unit variance) using the TRAINING
    set's statistics — the test set reuses them to avoid data leakage.

    Args:
        string: dataset type tag ("computer_vision" selects pixel scaling).
        X_train, X_test: numeric arrays to normalize.

    Returns:
        Tuple (X_train, X_test) of normalized arrays.
    """
    if string == "computer_vision":
        X_train = X_train / 255
        X_test = X_test / 255
    else:
        mean = np.mean(X_train)
        std = np.std(X_train)
        # Bug fix: the original computed (X - std) / mean, swapping the two
        # statistics. Standardization is (X - mean) / std.
        X_train = (X_train - mean) / std
        X_test = (X_test - mean) / std
    return (X_train, X_test)
def reshape_dataset(X_train, y_train, X_test, y_test):
    """One-hot encode the label vectors; features pass through unchanged.

    (Despite the name, no feature reshaping happens here — only
    to_categorical on y_train/y_test.)
    """
    return (X_train, to_categorical(y_train)), (X_test, to_categorical(y_test))
def main():
    """Train the CNN + GRU speech-recognition model end to end.

    Returns:
        The trained Keras model (callers that ignored the previous None
        return are unaffected).
    """
    # Hyperparameters.
    # Removed: `layers = 3` (unused AND shadowed the imported
    # tensorflow.keras.layers module) and `layer_units = 200` (unused).
    windows = 3          # time windows per clip; must divide the time axis (multiple of 99)
    epochs = 10
    batch_size = 500
    lr = 0.0001

    # Dataset: "computer_vision" or "speech_recognition"
    dataset = "speech_recognition"
    (X_train, y_train), (X_test, y_test) = choose_dataset(dataset)

    # Split each sample's time axis into `windows` chunks and add a channel
    # dimension: (N, T, F) -> (N, windows, T // windows, F, 1).
    X_train = X_train.reshape(X_train.shape[0], windows,
                              int(X_train.shape[1] / windows),
                              X_train.shape[2], 1).astype('float32')
    X_test = X_test.reshape(X_test.shape[0], windows,
                            int(X_test.shape[1] / windows),
                            X_test.shape[2], 1).astype('float32')
    num_class = y_train.shape[1]

    # Build and train: 3 conv/pool stages feeding a GRU classifier.
    cnn_model = generic_vns_function(3, [128, 64, 32], [5, 5, 5])
    lstm_model = LSTMmodel(cnn_model, num_class)
    # `learning_rate` replaces the deprecated `lr` keyword argument.
    opt = Adam(learning_rate=lr)
    lstm_model.compile(loss='categorical_crossentropy', optimizer=opt,
                       metrics=['accuracy'], run_eagerly=True)
    trained_model = train_model(lstm_model, epochs, batch_size,
                                X_train, y_train, X_test, y_test)
    # To persist: trained_model.save('models/model_%s_a3.h5' % dataset)
    return trained_model
# Run training only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment