Created
February 6, 2017 19:49
-
-
Save igormq/5e1f28ada658963e1c889a78073677be to your computer and use it in GitHub Desktop.
An example of classification using RNN
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Compatibility imports
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

import tensorflow as tf
import scipy.io.wavfile as wav
import numpy as np

from six.moves import xrange as range

# The CTC ops moved between TensorFlow releases; try the newer location first.
try:
    from tensorflow.python.ops import ctc_ops
except ImportError:
    from tensorflow.contrib.ctc import ctc_ops

try:
    from python_speech_features import mfcc
except ImportError:
    print("Failed to import python_speech_features.\n Try pip install python_speech_features.")
    # Re-raise the original error so its message and traceback are preserved.
    raise

from utils import maybe_download as maybe_download
from utils import sparse_tuple_from as sparse_tuple_from
# Some configs
# Number of features per input frame.
num_features = 13
# Number of output classes.
# NOTE(review): with a single class the softmax output is always 1, so the
# cross-entropy loss is trivially 0; raise this once real labels are used.
num_classes = 1

# Hyper-parameters
num_epochs = 200
num_hidden = 50
num_layers = 1
batch_size = 1
initial_learning_rate = 1e-2
momentum = 0.9

num_examples = 1
# Floor division keeps this an exact integer (no float round-trip).
num_batches_per_epoch = num_examples // batch_size
# Loading the data
audio_filename = maybe_download('LDC93S1.wav', 93638)
# NOTE(review): the original script never defined `target_filename`. The file
# name and expected size below are assumed to match the audio sample's
# transcript -- confirm against utils.maybe_download and the dataset.
target_filename = maybe_download('LDC93S1.txt', 62)

fs, audio = wav.read(audio_filename)
inputs = mfcc(audio, samplerate=fs)

# Transform into a 3D array by adding a leading axis
# (presumably [batch, time, features] -- TODO confirm mfcc's output layout).
train_inputs = np.asarray(inputs[np.newaxis, :])
# Normalize with the global mean and standard deviation of the features.
train_inputs = (train_inputs - np.mean(train_inputs)) / np.std(train_inputs)
train_seq_len = [train_inputs.shape[1]]

# Reading targets
with open(target_filename, 'r') as f:
    # Only the last line is necessary
    line = f.readlines()[-1]

# Drop the first two space-separated fields, lowercase the rest and
# remove the periods.
original = ' '.join(line.strip().lower().split(' ')[2:]).replace('.', '')

# One-hot encoding of the label.
# NOTE(review): placeholder label -- the single example is assigned to class
# 0. Change `target_class` (and `num_classes`) for a real classification task.
target_class = 0
targets = np.zeros(num_classes, dtype=np.float32)
targets[target_class] = 1.0
# Add the batch axis: shape [1, num_classes].
train_targets = targets[np.newaxis, :]

# We don't have a validation dataset :(
val_inputs, val_targets, val_seq_len = (train_inputs, train_targets,
                                        train_seq_len)
# THE MAIN CODE!
graph = tf.Graph()
with graph.as_default():
    # e.g: log filter bank or MFCC features
    # Has size [batch_size, max_stepsize, num_features], but the
    # batch_size and max_stepsize can vary along each step
    inputs = tf.placeholder(tf.float32, [None, None, num_features])

    # One-hot encoding
    # Has size [batch_size, num_classes]
    targets = tf.placeholder(tf.float32, [None, num_classes])

    # 1d array of size [batch_size]
    seq_len = tf.placeholder(tf.int32, [None])

    # Defining the cells
    # Can be:
    #   tf.nn.rnn_cell.RNNCell
    #   tf.nn.rnn_cell.GRUCell
    # One distinct cell object per layer: `[cell] * num_layers` would reuse
    # the same object (and therefore the same variables) for every layer.
    cells = [tf.nn.rnn_cell.LSTMCell(num_hidden, state_is_tuple=True)
             for _ in range(num_layers)]

    # Stacking rnn cells
    stack = tf.nn.rnn_cell.MultiRNNCell(cells, state_is_tuple=True)

    # The second output is the last state and we will not use that
    outputs, _ = tf.nn.dynamic_rnn(stack, inputs, seq_len, dtype=tf.float32)

    shape = tf.shape(inputs)
    batch_s, max_timesteps = shape[0], shape[1]

    # Select the output at the last valid time step of every sequence.
    # The time dimension is unknown at graph-construction time, so a static
    # `int(outputs.get_shape()[0]) - 1` cannot be computed. Instead, flatten
    # to [batch_size * max_stepsize, num_hidden] and gather row
    # `b * max_stepsize + (seq_len[b] - 1)` for each batch entry b.
    flat_outputs = tf.reshape(outputs, [-1, num_hidden])
    last_index = tf.range(batch_s) * max_timesteps + (seq_len - 1)
    last = tf.gather(flat_outputs, last_index)

    # Truncated normal with mean 0 and stdev=0.1
    # Tip: Try another initialization
    # see https://www.tensorflow.org/versions/r0.9/api_docs/python/contrib.layers.html#initializers
    W = tf.Variable(tf.truncated_normal([num_hidden, num_classes],
                                        stddev=0.1))
    # Zero initialization
    # Tip: Is tf.zeros_initializer the same?
    b = tf.Variable(tf.constant(0., shape=[num_classes]))

    # Doing the affine projection
    logits = tf.matmul(last, W) + b

    # Keyword arguments avoid the logits/labels ordering ambiguity and are
    # required by TensorFlow 1.x.
    loss = tf.nn.softmax_cross_entropy_with_logits(logits=logits,
                                                   labels=targets)
    cost = tf.reduce_mean(loss)

    # Use the configured momentum instead of a duplicated literal.
    optimizer = tf.train.MomentumOptimizer(initial_learning_rate,
                                           momentum).minimize(cost)
with tf.Session(graph=graph) as session:
    # Initialize the weights and biases
    tf.global_variables_initializer().run()

    for curr_epoch in range(num_epochs):
        train_cost = 0
        start = time.time()

        for batch in range(num_batches_per_epoch):
            feed = {inputs: train_inputs,
                    targets: train_targets,
                    seq_len: train_seq_len}

            # Run one optimization step and fetch the batch cost.
            batch_cost, _ = session.run([cost, optimizer], feed)
            # Weight by batch size so the division below yields a
            # per-example average.
            train_cost += batch_cost * batch_size

        train_cost /= num_examples

        val_feed = {inputs: val_inputs,
                    targets: val_targets,
                    seq_len: val_seq_len}

        # Fetch the tensor directly (not a one-element list) so the result is
        # a scalar that can be formatted with {:.3f}.
        val_cost = session.run(cost, feed_dict=val_feed)

        log = "Epoch {}/{}, train_cost = {:.3f}, val_cost = {:.3f}, time = {:.3f}"
        print(log.format(curr_epoch+1, num_epochs, train_cost,
                         val_cost, time.time() - start))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment