# LSTM C-to-C (character-to-character) model, for IEEE Blog reference
# Getting started: first we load our text file and encode the text as integers.
# The imports below cover everything used in this gist; to_categorical comes from
# the keras library and is used later for one-hot encoding.
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn, optim
from keras.utils import to_categorical

with open('./The Outcasts.txt', 'r') as f:
    text = f.read()

characters = tuple(set(text))
int2char = dict(enumerate(characters))  # enumerate assigns each character an integer value
char2int = {char: index for index, char in int2char.items()}  # dictionary from characters to their assigned integers
encoded = np.array([char2int[char] for char in text])  # encode the text using the character-to-integer dictionary
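
# A quick illustrative sketch (not part of the original pipeline, throwaway names):
# the same encoding applied to a toy string. The exact integer assignments depend on
# the order in which set() happens to return the characters.
toy_text = "hello world"
toy_chars = tuple(set(toy_text))
toy_int2char = dict(enumerate(toy_chars))
toy_char2int = {char: index for index, char in toy_int2char.items()}
toy_encoded = np.array([toy_char2int[char] for char in toy_text])
print(toy_encoded.shape)   # (11,) -- one integer per character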
# Next, we need an algorithm to make batches that can be fed to our model, so we write
# a batching function. It also builds the targets, shifted by one character, which is
# what the model learns to predict. A small usage sketch follows the function.
def get_batches(arr, n_seqs, n_characters):
    '''
    arr: array to make batches from
    n_seqs: number of sequences per batch
    n_characters: number of characters (sequence steps) per sequence
    '''
    batch_size = n_seqs * n_characters
    n_batches = len(arr)//batch_size
    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size]
    # Reshape into n_seqs rows
    arr = arr.reshape((n_seqs, -1))
    for n in range(0, arr.shape[1], n_characters):
        # The features
        x = arr[:, n:n+n_characters]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+n_characters]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y
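
# A small sanity check on get_batches (illustrative only, with made-up sizes):
# each yielded batch of features x and targets y has shape (n_seqs, n_characters),
# and y is x shifted left by one step along the sequence axis.
demo_arr = np.arange(1000)
demo_x, demo_y = next(get_batches(demo_arr, n_seqs=4, n_characters=10))
print(demo_x.shape, demo_y.shape)   # (4, 10) (4, 10)
print(demo_x[0, :5])                # [0 1 2 3 4]
print(demo_y[0, :5])                # [1 2 3 4 5]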
# Now we build our model by defining an LSTM cell class for the two layers in our LSTM
# network. We also initialize the hidden/activation and memory cell states to tensors of
# zeros, which are passed to the first LSTM cell in the sequence. For this we use
# PyTorch's nn module. The class also implements the forward propagation.
class CharLSTM(nn.ModuleList):
    def __init__(self, sequence_len, vocab_size, hidden_dim, batch_size):
        super(CharLSTM, self).__init__()
        # init the parameters
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        self.sequence_len = sequence_len
        self.vocab_size = vocab_size
        # first lstm cell
        self.lstm_1 = nn.LSTMCell(input_size=vocab_size, hidden_size=hidden_dim)
        # second lstm cell
        self.lstm_2 = nn.LSTMCell(input_size=hidden_dim, hidden_size=hidden_dim)
        # dropout layer for the output of the second lstm cell
        self.dropout = nn.Dropout(p=0.5)
        # this layer connects the output of the LSTM cell to the output layer;
        # it is named fc because it 'fully connects' the LSTM cell to the output
        self.fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)
    def forward(self, x, hc):
        '''
        x: input to the model
        hc: hidden/activation and memory cell states
        '''
        # empty tensor for the output
        output_seq = torch.empty((self.sequence_len, self.batch_size, self.vocab_size))
        # initialize both LSTM cells with zero hidden/activation and memory cell states
        hc_1, hc_2 = hc, hc
        # for every step in the sequence
        for t in range(self.sequence_len):
            # get the hidden and cell states from the first cell
            hc_1 = self.lstm_1(x[t], hc_1)
            # unpack from the first LSTM cell
            h_1, c_1 = hc_1
            # pass into the second LSTM cell
            hc_2 = self.lstm_2(h_1, hc_2)
            # unpack from the second cell
            h_2, c_2 = hc_2
            # form the output of the fully connected layer
            output_seq[t] = self.fc(self.dropout(h_2))
        # return the output sequence
        return output_seq.view((self.sequence_len * self.batch_size, -1))

    def init_hidden(self):
        # initialize the hidden state and the cell state to zeros
        return (torch.zeros(self.batch_size, self.hidden_dim), torch.zeros(self.batch_size, self.hidden_dim))
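
# A quick shape check (illustrative, using small made-up hyperparameters): a random
# batch run through the model comes back flattened to
# (sequence_len * batch_size, vocab_size), which is the shape CrossEntropyLoss expects.
tiny_net = CharLSTM(sequence_len=5, vocab_size=len(char2int), hidden_dim=16, batch_size=2)
tiny_x = torch.zeros(5, 2, len(char2int))   # (sequence_len, batch_size, vocab_size)
tiny_out = tiny_net(tiny_x, tiny_net.init_hidden())
print(tiny_out.shape)                       # torch.Size([10, vocab_size])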
# Now that the model is defined, we can train it on the loaded data. We also monitor
# the loss on a small held-out validation set.
# Here, net is the model object; we use the Adam optimizer and the cross-entropy loss function.
# You will also notice calls to contiguous(). A transposed tensor is a view whose memory is
# no longer laid out in one continuous block, and view() only works on contiguous memory,
# so we call contiguous() to copy the data into a single aligned block first.
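
# A small aside illustrating the contiguous() point above (toy tensor, not part of the
# training code): transposing returns a non-contiguous view of the same storage, and
# view() refuses to work on it until contiguous() copies it into one aligned block.
demo_t = torch.arange(6).reshape(2, 3)
transposed = demo_t.t()                     # non-contiguous view
print(transposed.is_contiguous())           # False
# transposed.view(6)                        # this would raise a RuntimeError
print(transposed.contiguous().view(6))      # tensor([0, 3, 1, 4, 2, 5])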
net = CharLSTM(sequence_len=128, vocab_size=len(char2int), hidden_dim=512, batch_size=128)
# define the loss and the optimizer
optimizer = optim.Adam(net.parameters(), lr=0.001)
lossfunc = nn.CrossEntropyLoss()
val_idx = int(len(encoded) * (1 - 0.1))
data, val_data = encoded[:val_idx], encoded[val_idx:]
# empty list for validation losses
val_losses = list()
# empty list for samples
samples = list()
for epoch in range(10):
    # re-initialize the hidden and memory cell states at the start of each epoch
    hc = net.init_hidden()
    for i, (x, y) in enumerate(get_batches(data, 128, 128)):
        # get the torch tensors from the one-hot encoding of the training data
        # also transpose the axes so the sequence dimension comes first
        x_train = torch.from_numpy(to_categorical(x, num_classes=net.vocab_size).transpose([1, 0, 2]))
        targets = torch.from_numpy(y.T).type(torch.LongTensor)  # tensor of the targets
        # zero out the gradient values
        optimizer.zero_grad()
        # get the output sequence from the input, activation and memory cell states
        output = net(x_train, hc)
        # calculate the loss across all batches
        loss = lossfunc(output, targets.contiguous().view(128*128))
        # calculate the gradients
        loss.backward()
        # update the parameters of the model
        optimizer.step()
        # feedback every 10 batches
        if i % 10 == 0:
            # initialize the validation hidden state and cell state
            val_h, val_c = net.init_hidden()
            for val_x, val_y in get_batches(val_data, 128, 128):
                # prepare the validation inputs and targets
                val_x = torch.from_numpy(to_categorical(val_x, num_classes=net.vocab_size).transpose([1, 0, 2]))
                val_y = torch.from_numpy(val_y.T).type(torch.LongTensor).contiguous().view(128*128)
                # get the validation output
                val_output = net(val_x, (val_h, val_c))
                # get the validation loss
                val_loss = lossfunc(val_output, val_y)
                # append the validation loss
                val_losses.append(val_loss.item())
            # sample 1024 characters from the model
            samples.append(''.join([int2char[int_] for int_ in net.predict("A", seq_len=1024)]))
            # predict() switches the network to eval mode, so switch back to training mode
            net.train()
            print("Epoch: {}, Batch: {}, Train Loss: {:.6f}, Validation Loss: {:.6f}".format(epoch, i, loss.item(), val_loss.item()))
# Our predict function, given a seed character, predicts the next character and feeds it
# back in, repeating for seq_len steps; this gives us the generated output of our model.
# The two functions below are methods of CharLSTM (note the self argument); they are shown
# separately here in blog order and attached to the class at the end. In a top-to-bottom
# run, execute this block before the training loop above, since the loop calls net.predict(...).
def init_hidden_predict(self):
    # initialize the hidden/activation and memory cell states to zeros
    # the batch size is 1 when generating text
    return (torch.zeros(1, self.hidden_dim), torch.zeros(1, self.hidden_dim))

def predict(self, char, top_k=5, seq_len=128):
    self.eval()
    # placeholder for the generated sequence of character indices
    # (integers, so they can be looked up in int2char afterwards)
    seq = np.empty(seq_len + 1, dtype=int)
    seq[0] = char2int[char]
    hc = self.init_hidden_predict()
    # encode the character as a one-hot vector of shape (1, vocab_size);
    # to_categorical from the keras library does the one-hot encoding
    char = to_categorical(char2int[char], num_classes=self.vocab_size)
    # add the batch dimension
    char = torch.from_numpy(char).unsqueeze(0)
    # now we pass the character through the two LSTM cells to obtain the
    # predictions for the next character
    hc_1, hc_2 = hc, hc
    for t in range(seq_len):
        # get the hidden/activation and memory states from the first and second LSTM cells
        hc_1 = self.lstm_1(char, hc_1)
        h_1, _ = hc_1
        hc_2 = self.lstm_2(h_1, hc_2)
        h_2, _ = hc_2
        # pass the output of the second cell through the fully connected layer
        h_2 = self.fc(h_2)
        # apply softmax to get the probabilities of the characters
        h_2 = F.softmax(h_2, dim=1)
        # h_2 now holds the vector of predictions with shape (1, vocab_size)
        # we want to sample from the top k characters
        p, top_char = h_2.topk(top_k)
        # get the top k character indices
        top_char = top_char.squeeze().numpy()
        # sample a character using the (renormalized) top-k probabilities
        p = p.detach().squeeze().numpy()
        char = np.random.choice(top_char, p=p/p.sum())
        # append the character to the output sequence
        seq[t+1] = char
        # prepare the sampled character as the input for the next time step
        char = to_categorical(char, num_classes=self.vocab_size)
        char = torch.from_numpy(char).unsqueeze(0)
    return seq

# attach the methods to the class so that net.predict(...) and net.init_hidden_predict()
# work on the model object used in the training loop
CharLSTM.init_hidden_predict = init_hidden_predict
CharLSTM.predict = predict
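
# Once training has finished, generating text is just a matter of calling predict and
# decoding the returned integer indices with int2char (the same pattern used for the
# samples collected during training). The seed character "A" and the length are arbitrary.
generated = net.predict("A", top_k=5, seq_len=512)
print(''.join(int2char[int_] for int_ in generated))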