Sentiment RNN for Udacity course
#!/usr/bin/env python
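"""Sentiment analysis RNN for the Udacity Deep Learning course.

Reads reviews from data/reviews.txt and their labels from data/labels.txt,
encodes each review as a zero-padded sequence of word indices, splits the
data into train/validation/test DataLoaders, and defines an embedding +
LSTM SentimentRNN model (the model is built and printed, not trained here).
"""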
import matplotlib.pyplot as plt
import numpy as np
import os
import torch
from collections import Counter
from os.path import exists as file_exists
from sklearn.model_selection import train_test_split
from string import punctuation
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
def load_reviews_and_labels():
    print(f'\nload_reviews_and_labels()')
    with open('data/reviews.txt', 'r') as f:
        reviews = f.read()
    with open('data/labels.txt', 'r') as f:
        labels = f.read()
    print(f'reviews loaded: {len(reviews):8d} Bytes')
    print(f'labels loaded:  {len(labels):8d} Bytes')
    return reviews, labels

def data_preprocessing(reviews):
    print(f'\ndata_preprocessing({reviews[:20]})')
    # lowercase everything and strip punctuation
    reviews = reviews.lower()
    all_text = ''.join([c for c in reviews if c not in punctuation])
    print(f'all_text: {all_text[:200]}')
    # split into individual reviews (one per line) and into words
    reviews_split = all_text.split('\n')
    all_text = ' '.join(reviews_split)
    words = all_text.split()
    print(f'words:  {words[:20]}')
    print(f'#words: {len(words):7d}')
    print(f'#uniq:  {len(set(words)):7d}')
    return reviews_split, words

def enconding_words(reviews_split, words):
    print(f'\nenconding_words({reviews_split[:1]}, {words[:20]})')
    # map each unique word to an integer, reserving 0 for padding
    vocab_to_int = {word: idx for idx, word in enumerate(set(words), start=1)}
    reviews_ints = [[vocab_to_int[word]
                     for word in review.split()]
                    for review in reviews_split]
    print(f'Encoded dict size: {len(vocab_to_int)}')
    print(f'Tokenized review:  {reviews_ints[:1]}')
    return reviews_ints, vocab_to_int

def enconding_labels(labels):
    print(f'\nenconding_labels({labels[:20]})')
    # 1=positive, 0=negative label conversion
    labels = labels.split('\n')
    encoded_labels = [1 if label == 'positive' else 0 for label in labels]
    print(f'labels[:10]:         {labels[:10]}')
    print(f'encoded_labels[:10]: {encoded_labels[:10]}')
    return encoded_labels

def visualize_data(reviews_ints):
    print(f'\nvisualize_data({reviews_ints[:1]})')
    review_lens = Counter([len(x) for x in reviews_ints])
    print("Zero-length reviews: {}".format(review_lens[0]))
    print("Maximum review length: {}".format(max(review_lens)))
    # histogram of the review lengths themselves (not of the Counter keys)
    fig, ax = plt.subplots()
    ax.hist([len(x) for x in reviews_ints], bins=1000, linewidth=0.5, edgecolor="white")
    plt.show()

def remove_outliers(reviews_ints, encoded_labels):
    print(f'\nremove_outliers({reviews_ints[:1]}, {encoded_labels[:10]})')
    print('Number of reviews before removing outliers: ', len(reviews_ints))
    # iterate in reverse so popping does not shift the indices still to be visited
    for idx, review in reversed(list(enumerate(reviews_ints))):
        if len(review) == 0:
            reviews_ints.pop(idx)
            encoded_labels.pop(idx)
    print('Number of reviews after removing outliers:  ', len(reviews_ints))
    return reviews_ints, encoded_labels

def pad_features(reviews_ints, seq_length):
    ''' Return features of review_ints, where each review is padded with 0's
        or truncated to the input seq_length.
    '''
    print(f'\npad_features({reviews_ints[:1]}, {seq_length})')
    # int64 so the word indices fit and can feed nn.Embedding directly
    features = np.empty((0, seq_length), dtype=np.int64)
    for review_int in reviews_ints:
        if len(review_int) > seq_length:
            # truncate long reviews to the first seq_length tokens
            features = np.append(features, [review_int[:seq_length]], axis=0)
        else:
            # left-pad short reviews with zeros
            features = np.append(features, [np.pad(review_int, (seq_length - len(review_int), 0), 'constant', constant_values=(0, 0))], axis=0)
    print(f'first 10 values of first 5 batches\n'
          f'{features[:5, :10]}')
    return features

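# Illustrative behaviour of pad_features (sketch, not part of the pipeline):
# short reviews are left-padded with zeros, long ones truncated, e.g.
#   pad_features([[1, 2, 3]], seq_length=5)            -> [[0, 0, 1, 2, 3]]
#   pad_features([[1, 2, 3, 4, 5, 6]], seq_length=5)   -> [[1, 2, 3, 4, 5]]
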
def train_valid_test_dataset(features, encoded_labels, train_data_frac=0.8):
    print(f'\ntrain_valid_test_dataset({features[:5, :10]}, {encoded_labels[:5]}, {train_data_frac})')
    X = features
    y = np.array(encoded_labels)
    ## split data into training, validation, and test data (features and labels, x and y)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=1 - train_data_frac)
    X_test, X_valid, y_test, y_valid = train_test_split(X_test, y_test, test_size=0.5)
    ## print out the shapes of your resultant feature data
    print(f'            Feature Shapes:')
    print(f'Train set:      {X_train.shape} {len(y_train)}')
    print(f'Validation set: {X_valid.shape} {len(y_valid)}')
    print(f'Test set:       {X_test.shape} {len(y_test)}')
    # create Tensor datasets
    train_data = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
    valid_data = TensorDataset(torch.from_numpy(X_valid), torch.from_numpy(y_valid))
    test_data = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))
    return train_data, valid_data, test_data

def load_preprocess_data(data_directory, dataset_filename, visualize):
    print(f'\nload_preprocess_data({data_directory}, {dataset_filename}, {visualize})')
    if file_exists(f'{data_directory}/{dataset_filename}'):
        # load the cached, already-padded dataset
        reviews_data = torch.load(f'{data_directory}/{dataset_filename}')
        features, encoded_labels = reviews_data.tensors
        features = features.numpy()
        encoded_labels = encoded_labels.numpy()
    else:
        reviews, labels = load_reviews_and_labels()
        reviews_split, words = data_preprocessing(reviews)
        reviews_ints, vocab_to_int = enconding_words(reviews_split, words)
        encoded_labels = enconding_labels(labels)
        if visualize:
            visualize_data(reviews_ints)
        reviews_ints, encoded_labels = remove_outliers(reviews_ints, encoded_labels)
        seq_length = 200
        features = pad_features(reviews_ints, seq_length=seq_length)
        assert len(features) == len(reviews_ints), "Your features should have as many rows as reviews."
        assert len(features[0]) == seq_length, "Each feature row should contain seq_length values."
        encoded_labels = np.array(encoded_labels)
        # cache the processed dataset so later runs can skip preprocessing
        reviews_data = TensorDataset(torch.from_numpy(features), torch.from_numpy(encoded_labels))
        torch.save(reviews_data, f'{data_directory}/{dataset_filename}')
    return features, encoded_labels

def get_data_loaders(features, encoded_labels, train_data_frac, batch_size):
    print(f'\nget_data_loaders({features}, {encoded_labels}, {train_data_frac}, {batch_size})')
    train_data, valid_data, test_data = \
        train_valid_test_dataset(features, encoded_labels, train_data_frac=train_data_frac)
    train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
    valid_loader = DataLoader(valid_data, shuffle=True, batch_size=batch_size)
    test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)
    # obtain one batch of training data
    dataiter = iter(train_loader)
    sample_x, sample_y = next(dataiter)
    print('\nSample input size: ', sample_x.size())  # batch_size, seq_length
    print('Sample input: \n', sample_x)
    print()
    print('Sample label size: ', sample_y.size())  # batch_size
    print('Sample label: \n', sample_y)
    return train_loader, valid_loader, test_loader

class SentimentRNN(nn.Module):
    """
    The RNN model that will be used to perform Sentiment analysis.
    """

    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5):
        """
        Initialize the model by setting up the layers.
        """
        print(f'\nSentimentRNN.__init__('
              f'{vocab_size}, {output_size}, {embedding_dim}, {hidden_dim}, {n_layers}, {drop_prob})')
        super(SentimentRNN, self).__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        # define all layers
        # embedding: maps word indices (0 is the padding value) to dense vectors
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # stacked LSTM over the embedded sequence
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
                            dropout=drop_prob, batch_first=True)
        # dropout applied to the LSTM outputs
        self.dropout = nn.Dropout(drop_prob)
        # final, fully-connected output layer followed by a sigmoid
        self.fc = nn.Linear(hidden_dim, output_size)
        self.sig = nn.Sigmoid()
    def forward(self, x, hidden):
        """
        Perform a forward pass of our model on some input and hidden state.
        """
        print(f'\nforward({x}, {hidden})')
        batch_size = x.size(0)
        print(f'batch_size = {batch_size}')
        # (batch_size, seq_length) -> (batch_size, seq_length, embedding_dim)
        emb_x = self.embedding(x)
        print(f'emb_x = {emb_x}')
        # lstm_out: (batch_size, seq_length, hidden_dim)
        lstm_out, hidden = self.lstm(emb_x, hidden)
        print(f'lstm_out = {lstm_out}')
        print(f'hidden = {hidden}')
        # stack up the LSTM outputs: (batch_size * seq_length, hidden_dim)
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
        print(f'contiguous.out = {lstm_out}')
        out = self.dropout(lstm_out)
        print(f'dropout.out = {out}')
        out = self.fc(out)
        print(f'fc.out = {out}')
        sig_out = self.sig(out)
        print(f'sig.out = {sig_out}')
        # reshape to (batch_size, seq_length) and keep only the last time step
        sig_out = sig_out.view(batch_size, -1)
        print(f'sig_out.view = {sig_out}')
        sig_out = sig_out[:, -1]
        print(f'sig_out[:, -1] = {sig_out}')
        return sig_out, hidden
    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x hidden_dim,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        if torch.cuda.is_available():
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_())
        return hidden

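# Minimal usage sketch for SentimentRNN (illustrative only; the toy vocab size,
# batch size, and sequence length below are assumptions, not values from main()):
#
#   model = SentimentRNN(vocab_size=1000, output_size=1,
#                        embedding_dim=400, hidden_dim=256, n_layers=2)
#   hidden = model.init_hidden(batch_size=4)
#   dummy_batch = torch.randint(1, 1000, (4, 200))   # LongTensor (batch, seq_length)
#   probs, hidden = model(dummy_batch, hidden)       # probs: shape (4,), values in (0, 1)
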
def main():
    data_directory = os.getenv('DATA_DIRECTORY', 'data')
    dataset_filename = os.getenv('DATASET_FILENAME', 'reviews.pt')
    visualize_data = os.getenv('VISUALIZE_DATA', 'FALSE').upper() == 'TRUE'
    features, encoded_labels = load_preprocess_data(data_directory, dataset_filename, visualize_data)
    train_data_frac = 0.8
    batch_size = 50
    train_loader, valid_loader, test_loader = get_data_loaders(features, encoded_labels, train_data_frac, batch_size)
    # First checking if GPU is available
    train_on_gpu = torch.cuda.is_available()
    print()
    if train_on_gpu:
        print('Training on GPU.')
    else:
        print('No GPU available, training on CPU.')
    # rebuild the vocabulary to size the embedding layer (+1 for the 0 padding token)
    reviews, _ = load_reviews_and_labels()
    reviews_split, words = data_preprocessing(reviews)
    _, vocab_to_int = enconding_words(reviews_split, words)
    vocab_size = len(vocab_to_int) + 1
    output_size = 1
    embedding_dim = 400
    hidden_dim = 256
    n_layers = 2
    srnn = SentimentRNN(vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5)
    print(srnn)


if __name__ == "__main__":
    main()
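
# Example invocation using the environment variables read in main()
# (the script filename is assumed; the values shown are the defaults):
#   DATA_DIRECTORY=data DATASET_FILENAME=reviews.pt VISUALIZE_DATA=FALSE python sentiment_rnn.py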