@lochbrunner · Last active September 4, 2022 09:11
Using PyTorch, LSTM, mini-batches and Datasets to train a toy model. This gist is inspired by https://gist.github.com/williamFalcon/f27c7b90e34b4ba88ced042d9ef33edd but tries to be complete, working, and a bit simpler than the original. Additionally, it uses torch Datasets.
#!/usr/bin/env python
import torch
import torch.nn as nn
import torch.nn.utils.rnn as rnn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F
import numpy as np
def memoize(function):
    '''Caches results per argument tuple, so each sample is only transformed once.'''
    memo = {}

    def wrapper(*args):
        if args in memo:
            return memo[args]
        else:
            rv = function(*args)
            memo[args] = rv
            return rv
    return wrapper
class Compose(object):
    '''Composes several transforms together.

    Transforms a generic tuple instead of a single value,
    as torchvision.transforms.Compose does.
    '''

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, *args):
        for t in self.transforms:
            args = t(*args)
        return args
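# Illustrative sketch (not part of the original gist): Compose threads the whole
# (x, y, l) tuple through every transform, unlike torchvision's Compose which
# passes a single value along.
assert Compose([lambda x, y, l: (x, y, l * 2)])('a', 'b', 1) == ('a', 'b', 2)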
class BieberDataset(Dataset):
    def __init__(self, transform=None):
        self.transform = transform
        sent_1_x = ['is', 'it', 'too', 'late', 'now', 'say', 'sorry']
        sent_1_y = ['VB', 'PRP', 'RB', 'RB', 'RB', 'VB', 'JJ']
        sent_2_x = ['ooh', 'ooh']
        sent_2_y = ['NNP', 'NNP']
        sent_3_x = ['sorry', 'yeah']
        sent_3_y = ['JJ', 'NNP']
        self.X = [sent_1_x, sent_2_x, sent_3_x]
        self.Y = [sent_1_y, sent_2_y, sent_3_y]
        X_lengths = [len(sentence) for sentence in self.X]
        self.longest_sent = max(X_lengths)
        self.words_count = np.unique([j for i in self.X for j in i]).size
        self.tags_count = np.unique([j for i in self.Y for j in i]).size + 1  # +1 for the <PAD> tag
        self.pad_token = torch.tensor(0)  # pylint: disable=not-callable

    @property
    def max_length(self):
        return self.longest_sent

    def __len__(self):
        return len(self.X)

    @memoize
    def __getitem__(self, index):
        print(f'requesting {index}')
        return self.transform(self.X[index], self.Y[index], len(self.X[index]))
class Embedder:
    def __init__(self):
        self.vocab = {'<PAD>': 0, 'is': 1, 'it': 2, 'too': 3, 'late': 4,
                      'now': 5, 'say': 6, 'sorry': 7, 'ooh': 8, 'yeah': 9}
        self.tags = {'<PAD>': 0, 'VB': 1, 'PRP': 2, 'RB': 3, 'JJ': 4, 'NNP': 5}

    def __call__(self, x, y, l):
        y = [self.tags[tag] for tag in y]
        x = [self.vocab[word] for word in x]
        return x, y, l
class Padder:
    def __init__(self, max_length, pad_token):
        self.max_length = max_length
        self.pad_token = pad_token

    def __call__(self, x, y, l):
        padded_x = np.ones((self.max_length)) * self.pad_token
        padded_x[0:len(x)] = x
        padded_y = np.ones((self.max_length)) * self.pad_token
        padded_y[0:len(y)] = y
        x = torch.as_tensor(padded_x, dtype=torch.long)  # pylint: disable=no-member
        y = torch.as_tensor(padded_y, dtype=torch.long)  # pylint: disable=no-member
        return x, y, l
class Uploader:
    def __init__(self, device):
        self.device = device

    def __call__(self, x, y, l):
        x = x.to(self.device)
        y = y.to(self.device)
        return x, y, l
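# Illustrative sketch (not part of the original gist): running the transform
# pipeline by hand on the shortest sentence shows what the DataLoader will see.
# The Uploader step is omitted so the example stays device-independent.
_x, _y, _l = Compose([Embedder(), Padder(7, 0)])(['ooh', 'ooh'], ['NNP', 'NNP'], 2)
assert _x.tolist() == [8, 8, 0, 0, 0, 0, 0]  # word ids, zero-padded to max_length
assert _y.tolist() == [5, 5, 0, 0, 0, 0, 0]  # tag ids, zero-padded to max_length
assert _l == 2  # the true (unpadded) length travels along with the sample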
class BieberLSTM(nn.Module):
    def __init__(self, words_size, tag_size, pad_token):
        super(BieberLSTM, self).__init__()
        embedding_dim = 8
        self.embedding = nn.Embedding(
            num_embeddings=words_size + 1,  # +1 for the <PAD> token
            embedding_dim=embedding_dim,
            padding_idx=pad_token
        )
        lstm_hidden_size = 100
        lstm_layers = 5
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=lstm_hidden_size,
            num_layers=lstm_layers,
            batch_first=True,
        )
        # Learnable initial hidden and cell states (one per layer), expanded over
        # the batch dimension in forward().
        lstm_h = torch.empty(lstm_layers, lstm_hidden_size)  # pylint: disable=no-member
        nn.init.uniform_(lstm_h, -1., 1.)
        self.lstm_h = nn.Parameter(lstm_h)
        lstm_c = torch.empty(lstm_layers, lstm_hidden_size)  # pylint: disable=no-member
        nn.init.uniform_(lstm_c, -1., 1.)
        self.lstm_c = nn.Parameter(lstm_c)
        self.hidden_to_tag = nn.Linear(lstm_hidden_size, tag_size)

    def forward(self, x, l):
        batch_size = l.size(0)
        x = self.embedding(x)
        # enforce_sorted=False breaks ONNX compatibility
        x = rnn.pack_padded_sequence(x, l, batch_first=True, enforce_sorted=False)
        lstm_h = self.lstm_h[:, None].expand(-1, batch_size, -1)
        lstm_c = self.lstm_c[:, None].expand(-1, batch_size, -1)
        x, _ = self.lstm(x, (lstm_h, lstm_c))
        x, _ = rnn.pad_packed_sequence(x, batch_first=True)
        x = self.hidden_to_tag(x)
        x = F.log_softmax(x, dim=2)
        return x  # batch_size x max_length x tag_size
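# Illustrative sketch (not part of the original gist): a quick shape check with a
# fake, already-padded batch of two sentences (lengths 7 and 2, vocabulary of 9
# words plus <PAD>, 6 tags including <PAD>).
_model = BieberLSTM(words_size=9, tag_size=6, pad_token=0)
_scores = _model(torch.zeros(2, 7, dtype=torch.long), torch.tensor([7, 2]))
assert _scores.shape == (2, 7, 6)  # batch_size x max_length x tag_size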
use_cuda = torch.cuda.is_available() and False  # force CPU; drop the 'and False' to enable CUDA
device = torch.device('cuda:0' if use_cuda else 'cpu')  # pylint: disable=no-member

# Training
params = {'batch_size': 3,
          'shuffle': True,
          'num_workers': 0,  # setting this > 0 causes problems with CUDA
          'pin_memory': True}
padding_index = 0
dataset = BieberDataset(transform=Compose([
    Embedder(),
    Padder(7, padding_index),
    Uploader(device)
]))
dataloader = DataLoader(dataset, **params)
model = BieberLSTM(words_size=dataset.words_count,
                   pad_token=dataset.pad_token,
                   tag_size=dataset.tags_count)
model.to(device)
model.train()
loss_function = nn.NLLLoss(ignore_index=dataset.pad_token)  # pylint: disable=no-member
loss_function.to(device)
optimizer = optim.SGD(model.parameters(), lr=0.2)
epoch_count = 20
for epoch in range(epoch_count):
    for x, y, l in dataloader:
        optimizer.zero_grad()
        x = model(x, l)
        # x: minibatch x sequence length x tags
        # y: minibatch x sequence length
        # NLLLoss expects
        #   input:  minibatch x classes x d_1
        #   target: minibatch x d_1
        loss = loss_function(x.transpose(1, 2), y)
        print(f'loss: {loss}')
        loss.backward()
        optimizer.step()
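# Illustrative sketch (not part of the original gist): greedy tag prediction with
# the trained model. `tag_names` mirrors the Embedder.tags mapping above and is
# only an assumption for pretty-printing.
model.eval()
tag_names = ['<PAD>', 'VB', 'PRP', 'RB', 'JJ', 'NNP']
with torch.no_grad():
    for x, y, l in dataloader:
        predictions = model(x, l).argmax(dim=2)  # minibatch x sequence length
        for sentence, length in zip(predictions, l):
            print([tag_names[tag] for tag in sentence[:length]])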