Sentiment RNN for Udacity course
#!/usr/bin/env python
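"""Sentiment RNN for the Udacity course.

Pipeline sketch (all steps defined below): load raw reviews/labels, strip
punctuation and tokenize, encode words and labels as integers, drop
zero-length reviews, left-pad/truncate each review to a fixed length,
split 80/10/10 into train/validation/test DataLoaders, and build an
embedding + LSTM classifier (SentimentRNN).
"""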
import matplotlib.pyplot as plt
import numpy as np
import os
import torch
from collections import Counter
from os.path import exists as file_exists
from sklearn.model_selection import train_test_split
from string import punctuation
from torch import nn
from torch.utils.data import TensorDataset, DataLoader

# Check once whether a GPU is available; read by SentimentRNN.init_hidden()
# and by main(). Originally this was a local variable in main(), which made
# the reference inside init_hidden() a NameError.
train_on_gpu = torch.cuda.is_available()

def load_reviews_and_labels():
    print(f'\nload_reviews_and_labels()')
    with open('data/reviews.txt', 'r') as f:
        reviews = f.read()
    with open('data/labels.txt', 'r') as f:
        labels = f.read()
    print(f'reviews loaded: {len(reviews):8d} Bytes')
    print(f'labels loaded: {len(labels):8d} Bytes')
    return reviews, labels

def data_preprocessing(reviews):
    print(f'\ndata_preprocessing({reviews[:20]})')
    reviews = reviews.lower()
    all_text = ''.join([c for c in reviews if c not in punctuation])
    print(f'all_text: {all_text[:200]}')
    reviews_split = all_text.split('\n')
    all_text = ' '.join(reviews_split)
    words = all_text.split()
    print(f'words: {words[:20]}')
    print(f'#words: {len(words):7d}')
    print(f'#uniq: {len(set(words)):7d}')
    return reviews_split, words

def encoding_words(reviews_split, words):
    print(f'\nencoding_words({reviews_split[:1]}, {words[:20]})')
    # map each unique word to an integer, starting at 1 (0 is reserved for padding)
    vocab_to_int = {word: idx for idx, word in enumerate(set(words), start=1)}
    reviews_ints = [[vocab_to_int[word]
                     for word in review.split()]
                    for review in reviews_split]
    print(f'Encoded dict size: {len(vocab_to_int)}')
    print(f'Tokenized review: {reviews_ints[:1]}')
    return reviews_ints, vocab_to_int

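# Illustrative example of the word encoding (hypothetical integers -- the real
# mapping depends on the iteration order of set(words)):
#   vocab_to_int = {'the': 1, 'movie': 2, 'was': 3, 'great': 4, ...}
#   'the movie was great'  ->  [1, 2, 3, 4]
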
def encoding_labels(labels):
    print(f'\nencoding_labels({labels[:20]})')
    # 1=positive, 0=negative label conversion
    labels = labels.split('\n')
    encoded_labels = [1
                      if label == 'positive'
                      else 0
                      for label in labels]
    print(f'labels[:10]: {labels[:10]}')
    print(f'encoded_labels[:10]: {encoded_labels[:10]}')
    return encoded_labels

def visualize_data(reviews_ints):
    print(f'\nvisualize_data({reviews_ints[:1]})')
    review_lengths = [len(x) for x in reviews_ints]
    review_lens = Counter(review_lengths)
    print("Zero-length reviews: {}".format(review_lens[0]))
    print("Maximum review length: {}".format(max(review_lengths)))
    fig, ax = plt.subplots()
    # histogram over the list of lengths; passing the Counter itself would
    # plot only the distinct length values, not their frequencies
    ax.hist(review_lengths, bins=1000, linewidth=0.5, edgecolor="white")
    plt.show()

def remove_outliers(reviews_ints, encoded_labels):
    print(f'\nremove_outliers({reviews_ints[:1]}, {encoded_labels[:10]})')
    print('Number of reviews before removing outliers: ', len(reviews_ints))
    # iterate in reverse so pop() does not shift the indices still to be visited
    for idx, review in reversed(list(enumerate(reviews_ints))):
        if len(review) == 0:
            reviews_ints.pop(idx)
            encoded_labels.pop(idx)
    print('Number of reviews after removing outliers: ', len(reviews_ints))
    return reviews_ints, encoded_labels

def pad_features(reviews_ints, seq_length):
    ''' Return features of reviews_ints, where each review is left-padded with 0's
        or truncated to the input seq_length.
    '''
    print(f'\npad_features({reviews_ints[:1]}, {seq_length})')
    # preallocate the result (np.append in a loop copies the whole array on
    # every iteration); use int64 so torch.from_numpy later yields the
    # LongTensor that nn.Embedding expects
    features = np.zeros((len(reviews_ints), seq_length), dtype=np.int64)
    for i, review_int in enumerate(reviews_ints):
        features[i, -len(review_int):] = review_int[:seq_length]
    print(f'first 10 values of first 5 batches\n'
          f'{features[:5,:10]}')
    return features

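# Worked example of the padding convention above (hypothetical token ids),
# with seq_length=5:
#   [7, 8, 9]          ->  [0, 0, 7, 8, 9]    (zero-padded on the left)
#   [1, 2, 3, 4, 5, 6] ->  [1, 2, 3, 4, 5]    (truncated to seq_length)
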
def train_valid_test_dataset(features, encoded_labels, train_data_frac=0.8):
    print(f'\ntrain_valid_test_dataset({features[:5, :10]}, {encoded_labels[:5]}, {train_data_frac})')
    X = features
    y = np.array(encoded_labels)
    ## split data into training, validation, and test data (features and labels, x and y);
    ## with train_data_frac=0.8, the remaining 20% is halved into 10% validation and 10% test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=1-train_data_frac)
    X_test, X_valid, y_test, y_valid = train_test_split(X_test, y_test, test_size=0.5)
    ## print out the shapes of your resultant feature data
    print(f' Feature Shapes:')
    print(f'Train set: {X_train.shape} {len(y_train)}')
    print(f'Validation set: {X_valid.shape} {len(y_valid)}')
    print(f'Test set: {X_test.shape} {len(y_test)}')
    # create Tensor datasets
    train_data = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
    valid_data = TensorDataset(torch.from_numpy(X_valid), torch.from_numpy(y_valid))
    test_data = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))
    return train_data, valid_data, test_data

def load_preprocess_data(data_directory, dataset_filename, show_plots):
    print(f'\nload_preprocess_data({data_directory}, {dataset_filename}, {show_plots})')
    if file_exists(f'{data_directory}/{dataset_filename}'):
        reviews_data = torch.load(f'{data_directory}/{dataset_filename}')
        features, encoded_labels = reviews_data.tensors
        features = features.numpy()
        encoded_labels = encoded_labels.numpy()
    else:
        reviews, labels = load_reviews_and_labels()
        reviews_split, words = data_preprocessing(reviews)
        reviews_ints, vocab_to_int = encoding_words(reviews_split, words)
        encoded_labels = encoding_labels(labels)
        # the flag must not shadow the visualize_data() function,
        # otherwise the call below would try to call a bool
        if show_plots:
            visualize_data(reviews_ints)
        reviews_ints, encoded_labels = remove_outliers(reviews_ints, encoded_labels)
        seq_length = 200
        features = pad_features(reviews_ints, seq_length=seq_length)
        assert len(features)==len(reviews_ints), "Your features should have as many rows as reviews."
        assert len(features[0])==seq_length, "Each feature row should contain seq_length values."
        encoded_labels = np.array(encoded_labels)
        reviews_data = TensorDataset(torch.from_numpy(features), torch.from_numpy(encoded_labels))
        torch.save(reviews_data, f'{data_directory}/{dataset_filename}')
    return features, encoded_labels

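# Caching behavior: the first run builds the TensorDataset from
# data/reviews.txt and data/labels.txt and saves it as
# {data_directory}/{dataset_filename}; subsequent runs load the cached file
# and skip the whole preprocessing branch.
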
def get_data_loaders(features, encoded_labels, train_data_frac, batch_size):
    print(f'\nget_data_loaders({features}, {encoded_labels}, {train_data_frac}, {batch_size})')
    train_data, valid_data, test_data = \
        train_valid_test_dataset(features, encoded_labels, train_data_frac=train_data_frac)
    train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
    valid_loader = DataLoader(valid_data, shuffle=True, batch_size=batch_size)
    test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)
    # obtain one batch of training data
    dataiter = iter(train_loader)
    sample_x, sample_y = next(dataiter)  # dataiter.next() was removed in recent PyTorch
    print('\nSample input size: ', sample_x.size())  # batch_size, seq_length
    print('Sample input: \n', sample_x)
    print()
    print('Sample label size: ', sample_y.size())  # batch_size
    print('Sample label: \n', sample_y)
    return train_loader, valid_loader, test_loader

class SentimentRNN(nn.Module):
    """
    The RNN model that will be used to perform Sentiment analysis.
    """

    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5):
        """
        Initialize the model by setting up the layers.
        """
        print(f'\nSentimentRNN.__init__('
              f'{vocab_size}, {output_size}, {embedding_dim}, {hidden_dim}, {n_layers}, {drop_prob})')
        super(SentimentRNN, self).__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        # define all layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # stacked LSTM with dropout between layers
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
                            dropout=drop_prob, batch_first=True)
        # dropout layer before the fully-connected output
        self.dropout = nn.Dropout(drop_prob)
        # final, fully-connected output layer followed by a sigmoid
        self.fc = nn.Linear(hidden_dim, output_size)
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden):
        """
        Perform a forward pass of our model on some input and hidden state.
        """
        print(f'\nforward({x}, {hidden})')
        batch_size = x.size(0)
        print(f'batch_size = {batch_size}')
        emb_x = self.embedding(x)
        print(f'emb_x = {emb_x}')
        lstm_out, hidden = self.lstm(emb_x, hidden)
        print(f'lstm_out = {lstm_out}')
        print(f'hidden = {hidden}')
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
        print(f'contiguous.out = {lstm_out}')
        out = self.dropout(lstm_out)
        print(f'dropout.out = {out}')
        out = self.fc(out)
        print(f'fc.out = {out}')
        sig_out = self.sig(out)
        print(f'fc.sig_out = {sig_out}')
        sig_out = sig_out.view(batch_size, -1)
        print(f'fc.sig_out = {sig_out}')
        sig_out = sig_out[:, -1]
        print(f'fc.sig_out = {sig_out}')
        return sig_out, hidden

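    # Shape walk-through of forward() (batch_first=True), using the
    # hyperparameters set in main() (batch_size=50, seq_length=200,
    # embedding_dim=400, hidden_dim=256):
    #   x:        (50, 200)       token ids
    #   emb_x:    (50, 200, 400)  after nn.Embedding
    #   lstm_out: (50, 200, 256)  after nn.LSTM
    #   view:     (10000, 256)    after .contiguous().view(-1, hidden_dim)
    #   fc + sig: (10000, 1) -> view(50, 200) -> [:, -1] -> (50,)
    # i.e. only the sigmoid output at the last time step is kept per review.
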
    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x hidden_dim,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        if train_on_gpu:
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_())
        return hidden

def main():
    data_directory = os.getenv('DATA_DIRECTORY', 'data')
    dataset_filename = os.getenv('DATASET_FILENAME', 'reviews.pt')
    show_plots = os.getenv('VISUALIZE_DATA', 'FALSE').upper() == 'TRUE'
    features, encoded_labels = load_preprocess_data(data_directory, dataset_filename, show_plots)
    train_data_frac = 0.8
    batch_size = 50
    train_loader, valid_loader, test_loader = get_data_loaders(features, encoded_labels, train_data_frac, batch_size)
    # train_on_gpu is checked once at module level
    print()
    if train_on_gpu:
        print('Training on GPU.')
    else:
        print('No GPU available, training on CPU.')
    # reload the raw reviews only to rebuild vocab_to_int, which the cached
    # TensorDataset does not store; +1 accounts for the 0 padding token
    reviews, _ = load_reviews_and_labels()
    reviews_split, words = data_preprocessing(reviews)
    _, vocab_to_int = encoding_words(reviews_split, words)
    vocab_size = len(vocab_to_int)+1
    output_size = 1
    embedding_dim = 400
    hidden_dim = 256
    n_layers = 2
    srnn = SentimentRNN(vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5)
    print(srnn)

if __name__ == "__main__":
    main()
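
# Example invocation (assumes data/reviews.txt and data/labels.txt exist; the
# script filename is hypothetical):
#   DATA_DIRECTORY=data DATASET_FILENAME=reviews.pt VISUALIZE_DATA=TRUE python sentiment_rnn.py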