LSTM C-to-C Model for IEEE Blog reference
# Getting started: first we load our text file and encode the text with integers.
import numpy as np
import torch
from torch import nn, optim
import torch.nn.functional as F
from keras.utils import to_categorical   # used below for one-hot encoding characters

with open('./The Outcasts.txt', 'r') as f:
    text = f.read()

characters = tuple(set(text))                                  # the unique characters in the corpus
int2char = dict(enumerate(characters))                         # assign each character an integer value
char2int = {char: index for index, char in int2char.items()}   # reverse mapping from characters to their integers
encoded = np.array([char2int[char] for char in text])          # encode the text using the character-to-integer mapping
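# As a quick optional check (a small sketch; `snippet` and `codes` are just illustrative names),
# encoding a short slice of the text and decoding it again should recover the original characters.
snippet = text[:20]
codes = [char2int[c] for c in snippet]
assert ''.join(int2char[i] for i in codes) == snippet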
# Next, we need a function that splits the encoded text into batches we can feed to the model.
# The targets are the inputs shifted by one step, since the model learns to predict the next character.
def get_batches(arr, n_seqs, n_characters):
    '''
    arr: array to make batches from
    n_seqs: number of sequences per batch
    n_characters: number of characters (sequence steps) per sequence
    '''
    batch_size = n_seqs * n_characters
    n_batches = len(arr)//batch_size
    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size]
    # Reshape into n_seqs rows
    arr = arr.reshape((n_seqs, -1))
    for n in range(0, arr.shape[1], n_characters):
        # The features
        x = arr[:, n:n+n_characters]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+n_characters]
        except IndexError:
            # at the end of the array, wrap the final target around to the first column
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y
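# A quick look at one batch (an optional sketch; `x_demo` and `y_demo` are illustrative names,
# and this assumes the corpus is long enough for at least one full 128 x 128 batch, matching the
# sizes used in training below). x and y both have shape (n_seqs, n_characters), with y being x
# shifted by one step.
x_demo, y_demo = next(get_batches(encoded, 128, 128))
print(x_demo.shape, y_demo.shape)   # (128, 128) (128, 128)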
# Now we build the model by defining a class with an LSTM cell for each of the two layers,
# using PyTorch's nn module. The class also provides a method that initializes the
# hidden/activation and memory cell states to tensors of zeros, which are passed to the first
# LSTM cell in the sequence, and it implements the forward propagation.
class CharLSTM(nn.ModuleList):
    def __init__(self, sequence_len, vocab_size, hidden_dim, batch_size):
        super(CharLSTM, self).__init__()
        # init the parameters
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.sequence_len = sequence_len
        self.vocab_size = vocab_size
        # first LSTM cell
        self.lstm_1 = nn.LSTMCell(input_size=vocab_size, hidden_size=hidden_dim)
        # second LSTM cell
        self.lstm_2 = nn.LSTMCell(input_size=hidden_dim, hidden_size=hidden_dim)
        # dropout layer applied to the output of the second LSTM cell
        self.dropout = nn.Dropout(p=0.5)
        # the fully connected (fc) layer maps the output of the second LSTM cell to vocabulary-sized scores
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

    def forward(self, x, hc):
        '''
        x: input to the model, shape (sequence_len, batch_size, vocab_size)
        hc: hidden/activation and memory cell states
        '''
        # empty tensor for the output
        output_seq = torch.empty((self.sequence_len, self.batch_size, self.vocab_size))
        # both LSTM cells start from the given (zero) hidden/activation and memory cell states
        hc_1, hc_2 = hc, hc
        # for every step in the sequence
        for t in range(self.sequence_len):
            # get the hidden and cell states from the first cell
            hc_1 = self.lstm_1(x[t], hc_1)
            # unpack the states of the first LSTM cell
            h_1, c_1 = hc_1
            # pass the hidden state into the second LSTM cell
            hc_2 = self.lstm_2(h_1, hc_2)
            # unpack the states of the second cell
            h_2, c_2 = hc_2
            # form the output of the fully connected layer
            output_seq[t] = self.fc(self.dropout(h_2))
        # return the output sequence, flattened to (sequence_len * batch_size, vocab_size)
        return output_seq.view((self.sequence_len * self.batch_size, -1))

    def init_hidden(self):
        # initialize the hidden state and the cell state to zeros
        return (torch.zeros(self.batch_size, self.hidden_dim),
                torch.zeros(self.batch_size, self.hidden_dim))
# Now that the model is defined, we train it on the loaded data, monitoring the loss on a
# small validation set held out from the text.
# Here, net is the model object; we use the Adam optimizer and the cross-entropy loss function.
# You will also notice calls to contiguous(): view() needs the tensor's data to sit in one
# contiguous block of memory, and operations such as transposing can leave the memory layout
# non-contiguous, so contiguous() copies the data into a single block before view() is applied.
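# A small illustrative sketch of that behaviour (`t` and `flat` are only example names):
t = torch.arange(6).reshape(2, 3).t()   # transposing makes the underlying memory non-contiguous
# t.view(6) would raise a RuntimeError here; contiguous() first copies the data into one block
flat = t.contiguous().view(6)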
net = CharLSTM(sequence_len=128, vocab_size=len(char2int), hidden_dim=512, batch_size=128)
# define the loss function and the optimizer
optimizer = optim.Adam(net.parameters(), lr=0.001)
lossfunc = nn.CrossEntropyLoss()
# hold out the last 10% of the encoded text as a validation set
val_idx = int(len(encoded) * (1 - 0.1))
data, val_data = encoded[:val_idx], encoded[val_idx:]
# empty list for the validation losses
val_losses = list()
# empty list for the generated samples
samples = list()
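# Optional sanity check (a small sketch reusing the helpers above; `x_check` is just an
# illustrative name): push one batch of one-hot characters through the untrained model and
# confirm the output has shape (sequence_len * batch_size, vocab_size), i.e. 128*128 rows.
x_check, _ = next(get_batches(data, 128, 128))
x_check = torch.from_numpy(to_categorical(x_check, num_classes=net.vocab_size).transpose([1, 0, 2]))
print(net(x_check, net.init_hidden()).shape)   # expected: torch.Size([16384, len(char2int)])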
for epoch in range(10):
    hc = net.init_hidden()
    for i, (x, y) in enumerate(get_batches(data, 128, 128)):
        # get the torch tensors from the one-hot encoding of the training data,
        # transposing the axes so the sequence dimension comes first
        x_train = torch.from_numpy(to_categorical(x, num_classes=net.vocab_size).transpose([1, 0, 2]))
        targets = torch.from_numpy(y.T).type(torch.LongTensor)   # tensor of the targets
        # zero out the gradient values
        optimizer.zero_grad()
        # get the output sequence from the input and the activation/memory cell states
        output = net(x_train, hc)
        # calculate the loss across the whole batch
        loss = lossfunc(output, targets.contiguous().view(128*128))
        # calculate the gradients
        loss.backward()
        # update the parameters of the model
        optimizer.step()
        # feedback every 10 batches
        if i % 10 == 0:
            # disable dropout while validating and sampling
            net.eval()
            # initialize the validation hidden state and cell state
            val_h, val_c = net.init_hidden()
            for val_x, val_y in get_batches(val_data, 128, 128):
                # prepare the validation inputs and targets
                val_x = torch.from_numpy(to_categorical(val_x, num_classes=net.vocab_size).transpose([1, 0, 2]))
                val_y = torch.from_numpy(val_y.T).type(torch.LongTensor).contiguous().view(128*128)
                # get the validation output
                val_output = net(val_x, (val_h, val_c))
                # get the validation loss
                val_loss = lossfunc(val_output, val_y)
                # append the validation loss
                val_losses.append(val_loss.item())
            # sample 1024 characters from the current model
            samples.append(''.join([int2char[int_] for int_ in net.predict("A", seq_len=1024)]))
            # predict() puts the model in eval mode, so switch back to training mode
            net.train()
            print("Epoch: {}, Batch: {}, Train Loss: {:.6f}, Validation Loss: {:.6f}".format(epoch, i, loss.item(), val_loss.item()))
# Our predict function, given a starting character, repeatedly predicts and samples the next
# character, carrying the hidden/activation and memory cell states forward at each step; this
# produces the model's generated output.
# The two functions below take self because they are methods of CharLSTM: they are shown here
# after the training loop for reference, but they must be attached to the class (done at the
# end of this file) for net.predict(...) above to work.
def init_hidden_predict(self):
    # initialize the hidden/activation and memory cell states to zeros
    # the batch size is 1 while predicting
    return (torch.zeros(1, self.hidden_dim), torch.zeros(1, self.hidden_dim))

def predict(self, char, top_k=5, seq_len=128):
    # put the model in evaluation mode (disables dropout)
    self.eval()
    # placeholder for the generated sequence of character indices
    seq = np.empty(seq_len+1, dtype=int)
    seq[0] = char2int[char]
    hc = self.init_hidden_predict()
    # encode the character as a one-hot vector of shape (1, vocab_size);
    # to_categorical from the keras library does the one-hot encoding
    char = to_categorical(char2int[char], num_classes=self.vocab_size)
    # add the batch dimension
    char = torch.from_numpy(char).unsqueeze(0)
    # pass the character through both LSTM cells to obtain the predictions for the next character
    hc_1, hc_2 = hc, hc
    for t in range(seq_len):
        # get the hidden/activation and memory states from the first and second LSTM cells
        hc_1 = self.lstm_1(char, hc_1)
        h_1, _ = hc_1
        hc_2 = self.lstm_2(h_1, hc_2)
        h_2, _ = hc_2
        # pass the output of the second cell through the fully connected layer
        h_2 = self.fc(h_2)
        # apply the softmax to get a probability distribution over the characters
        h_2 = F.softmax(h_2, dim=1)
        # h_2 now holds the vector of predictions with shape (1, vocab_size);
        # we keep only the top_k most likely characters
        p, top_char = h_2.topk(top_k)
        top_char = top_char.squeeze().numpy()
        # sample one of the top k characters using their (renormalized) probabilities
        p = p.detach().squeeze().numpy()
        char = np.random.choice(top_char, p=p/p.sum())
        # append the character to the output sequence
        seq[t+1] = char
        # one-hot encode the sampled character so it can be fed to the next step
        char = to_categorical(char, num_classes=self.vocab_size)
        char = torch.from_numpy(char).unsqueeze(0)
    return seq

# Attach the two functions above as methods of CharLSTM so that net.predict(...) in the
# training loop works; in a single runnable script they would be defined inside the class.
CharLSTM.init_hidden_predict = init_hidden_predict
CharLSTM.predict = predict
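# Once the pieces above are assembled and the model is trained, generating text is just a matter
# of decoding the integer sequence returned by predict (a small optional sketch; `generated` is
# only an illustrative name, and the seed character must appear in the corpus), exactly as the
# training loop does when collecting samples:
generated = net.predict("A", seq_len=256)
print(''.join(int2char[int_] for int_ in generated))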