Last active
September 4, 2022 09:11
-
-
Save lochbrunner/d9c77c3fbfd5301154c070561e294ed7 to your computer and use it in GitHub Desktop.
Using pytorch, LSTM, mini-batches and DataSets to train a toy model. This GIST is inspired by https://gist.github.com/williamFalcon/f27c7b90e34b4ba88ced042d9ef33edd but trying to be complete, working and a bit more simpler than the orig. Additionaly it uses torch datasets.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import torch | |
import torch.nn as nn | |
import torch.nn.utils.rnn as rnn | |
import torch | |
import torch.optim as optim | |
from torch.utils.data import Dataset, DataLoader | |
from torch.nn import functional as F | |
import numpy as np | |
def memoize(function): | |
memo = {} | |
def wrapper(*args): | |
if args in memo: | |
return memo[args] | |
else: | |
rv = function(*args) | |
memo[args] = rv | |
return rv | |
return wrapper | |
class Compose(object): | |
'''Composes several transforms together. | |
Transforms on a generic tuple instead of on value | |
as torchvision.transforms.Compose does. | |
''' | |
def __init__(self, transforms): | |
self.transforms = transforms | |
def __call__(self, *args): | |
for t in self.transforms: | |
args = t(*args) | |
return args | |
class BieberDataset(Dataset): | |
def __init__(self, transform=None): | |
self.transform = transform | |
sent_1_x = ['is', 'it', 'too', 'late', 'now', 'say', 'sorry'] | |
sent_1_y = ['VB', 'PRP', 'RB', 'RB', 'RB', 'VB', 'JJ'] | |
sent_2_x = ['ooh', 'ooh'] | |
sent_2_y = ['NNP', 'NNP'] | |
sent_3_x = ['sorry', 'yeah'] | |
sent_3_y = ['JJ', 'NNP'] | |
self.X = [sent_1_x, sent_2_x, sent_3_x] | |
self.Y = [sent_1_y, sent_2_y, sent_3_y] | |
X_lengths = [len(sentence) for sentence in self.X] | |
self.longest_sent = max(X_lengths) | |
self.words_count = np.unique([j for i in self.X for j in i]).size | |
self.tags_count = np.unique([j for i in self.Y for j in i]).size + 1 | |
self.pad_token = torch.tensor(0) # pylint: disable=not-callable | |
@property | |
def max_length(self): | |
return self.longest_sent | |
def __len__(self): | |
return len(self.X) | |
@memoize | |
def __getitem__(self, index): | |
print(f'requesting {index}') | |
return self.transform(self.X[index], self.Y[index], len(self.X[index])) | |
class Embedder: | |
def __init__(self): | |
self.vocab = {'<PAD>': 0, 'is': 1, 'it': 2, 'too': 3, 'late': 4, | |
'now': 5, 'say': 6, 'sorry': 7, 'ooh': 8, 'yeah': 9} | |
self.tags = {'<PAD>': 0, 'VB': 1, 'PRP': 2, 'RB': 3, 'JJ': 4, 'NNP': 5} | |
def __call__(self, x, y, l): | |
y = [self.tags[tag] for tag in y] | |
x = [self.vocab[tag] for tag in x] | |
return x, y, l | |
class Padder: | |
def __init__(self, max_length, pad_token): | |
self.max_length = max_length | |
self.pad_token = pad_token | |
def __call__(self, x, y, l): | |
padded_x = np.ones((self.max_length))*self.pad_token | |
padded_x[0:len(x)] = x | |
padded_y = np.ones((self.max_length))*self.pad_token | |
padded_y[0:len(y)] = y | |
x = torch.as_tensor(padded_x, dtype=torch.long) # pylint: disable=no-member | |
y = torch.as_tensor(padded_y, dtype=torch.long) # pylint: disable=no-member | |
return x, y, l | |
class Uploader: | |
def __init__(self, device): | |
self.device = device | |
def __call__(self, x, y, l): | |
x = x.to(self.device) | |
y = y.to(self.device) | |
return x, y, l | |
class BieberLSTM(nn.Module): | |
def __init__(self, words_size, tag_size, pad_token): | |
super(BieberLSTM, self).__init__() | |
embedding_dim = 8 | |
self.embedding = nn.Embedding( | |
num_embeddings=words_size+1, | |
embedding_dim=embedding_dim, | |
padding_idx=pad_token | |
) | |
lstm_hidden_size = 100 | |
lstm_layers = 5 | |
self.lstm = nn.LSTM( | |
input_size=embedding_dim, | |
hidden_size=lstm_hidden_size, | |
num_layers=lstm_layers, | |
batch_first=True, | |
) | |
lstm_h = torch.empty(lstm_layers, lstm_hidden_size) # pylint: disable=no-member | |
nn.init.uniform_(lstm_h, -1., 1.) | |
self.lstm_h = nn.Parameter(lstm_h) | |
lstm_c = torch.empty(lstm_layers, lstm_hidden_size) # pylint: disable=no-member | |
nn.init.uniform_(lstm_c, -1., 1.) | |
self.lstm_c = nn.Parameter(lstm_c) | |
self.hidden_to_tag = nn.Linear(lstm_hidden_size, tag_size) | |
def forward(self, x, l): | |
batch_size = l.size(0) | |
x = self.embedding(x) | |
# enforce_sorted=False breaks ONNX compatibility | |
x = rnn.pack_padded_sequence(x, l, batch_first=True, enforce_sorted=False) | |
lstm_h = self.lstm_h[:, None].expand(-1, batch_size, -1) | |
lstm_c = self.lstm_c[:, None].expand(-1, batch_size, -1) | |
x, _ = self.lstm(x, (lstm_h, lstm_c)) | |
x, _ = rnn.pad_packed_sequence(x, batch_first=True) | |
x = self.hidden_to_tag(x) | |
x = F.log_softmax(x, dim=2) | |
return x # batch_size x max_length x tag_size | |
use_cuda = torch.cuda.is_available() and False | |
device = torch.device('cuda:0' if use_cuda else 'cpu') # pylint: disable=no-member | |
# Training | |
params = {'batch_size': 3, | |
'shuffle': True, | |
'num_workers': 0, 'pin_memory': True} # Setting > 0 causes problems with CUDA | |
padding_index = 0 | |
dataset = BieberDataset(transform=Compose([ | |
Embedder(), | |
Padder(7, padding_index), | |
Uploader(device) | |
])) | |
dataloader = DataLoader(dataset, **params) | |
model = BieberLSTM(words_size=dataset.words_count, | |
pad_token=dataset.pad_token, | |
tag_size=dataset.tags_count) | |
model.to(device) | |
model.train() | |
loss_function = nn.NLLLoss(ignore_index=dataset.pad_token) # pylint: disable=no-member | |
loss_function.to(device) | |
optimizer = optim.SGD(model.parameters(), lr=0.2) | |
epoch_count = 20 | |
for epoch in range(epoch_count): | |
for x, y, l in dataloader: | |
optimizer.zero_grad() | |
x = model(x, l) | |
# y: minibatch x sequence length x tags | |
# x: minibatch x sequence length | |
# Loss expects | |
# x: minibatch x classes x d_1 | |
# x: minibatch x d_1 | |
loss = loss_function(x.transpose(1, 2), y) | |
print(f'loss: {loss}') | |
loss.backward() | |
optimizer.step() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment