Last active June 7, 2017 22:50
Pytorch char rnn as a script, based on examples from Kyle McDonald, Laurent Dinh, and Sean Robertson
# Special thanks to Kyle McDonald, this is based on his example
# Thanks to Laurent Dinh for examples of parameter saving/loading in PyTorch
# Thanks to Sean Robertson for
from torch.autograd import Variable
import torch.nn as nn
import torch
import numpy as np
import time
import math
import os
import argparse
parser = argparse.ArgumentParser(description="PyTorch char-rnn")
parser.add_argument("--mode", "-m", type=int, default=0,
help="0 is evaluate only, 1 is train")
args = parser.parse_args()
use_cuda = torch.cuda.is_available()
# try to get deterministic runs
random_state = np.random.RandomState(1999)
# from
seq_length = 50
minibatch_size = 50
hidden_size = 128
epoch_count = 10
n_layers = 2
lr = 2e-3
input_filename = "tiny-shakespeare.txt"
with open(input_filename, "r") as f:
text =
param_path = "params.npz"
final_param_path = "params_final.npz"
chars = set(text)
chars_len = len(chars)
char_to_index = {}
index_to_char = {}
for i, c in enumerate(chars):
char_to_index[c] = i
index_to_char[i] = c
def time_since(since):
s = time.time() - since
m = math.floor(s / 60)
s -= m * 60
return "%dm %ds" % (m, s)
def chunks(l, n):
#print(list(chunks(range(11), 3)))
for i in range(0, len(l) - n, n):
yield l[i:i + n]
def index_to_tensor(index):
tensor = torch.zeros(1, 1).long()
tensor[0,0] = index
return Variable(tensor)
# convert all characters to indices
batches = [char_to_index[char] for char in text]
# chunk into sequences of length seq_length + 1
batches = list(chunks(batches, seq_length + 1))
# chunk sequences into batches
batches = list(chunks(batches, minibatch_size))
# convert batches to tensors and transpose
batches = [torch.LongTensor(batch).transpose_(0, 1) for batch in batches]
# each batch is (sequence_length + 1) x batch_size
class RNN(nn.Module):
def __init__(self, input_size, hidden_size, output_size, n_layers, batch_size):
super(RNN, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.output_size = output_size
self.n_layers = n_layers
self.minibatch_size = minibatch_size
self.encoder = nn.Embedding(input_size, hidden_size)
self.cells = nn.GRU(hidden_size, hidden_size, n_layers)
self.decoder = nn.Linear(hidden_size, output_size)
def forward(self, input, hidden):
input = self.encoder(input)
output, hidden = self.cells(input, hidden)
output = self.decoder(output.view(output.size(0) * output.size(1), output.size(2)))
return output, hidden
def create_hidden(self):
# should this be small random instead of zeros
# should this also be stored in the class rather than being passed around?
return torch.zeros(self.n_layers, self.minibatch_size, self.hidden_size)
print_every = 1
model = RNN(chars_len, hidden_size, chars_len, n_layers, minibatch_size)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_function = nn.CrossEntropyLoss()
hidden = Variable(model.create_hidden())
if use_cuda:
model = model.cuda()
hidden = hidden.cuda()
def train():
if os.path.exists(param_path):
print("Parameters found at {}... loading".format(param_path))
params_val = np.load(param_path)
for key_, param in model.named_parameters(): = torch.Tensor(params_val[key_])
start = time.time()
all_losses = []
format_string = \
Duration: {duration}
Epoch: {epoch}/{epoch_count}
Batch: {batch}/{batch_count}, {batch_rate:.2f}/s
Loss: {loss:.2f}
for epoch in range(1, epoch_count + 1):
d = {key_: for (key_, val_) in model.named_parameters()}
with open(param_path, "w") as f:
np.savez(f, **d)
for batch, batch_tensor in enumerate(batches):
if use_cuda:
batch_tensor = batch_tensor.cuda()
# reset the model
# everything except the last
input_variable = Variable(batch_tensor[:-1])
# everything except the first, flattened
target_variable = Variable(batch_tensor[1:].contiguous().view(-1))
# prediction and calculate loss
output, _ = model(input_variable, hidden)
loss = loss_function(output, target_variable)
# backprop and optimize
loss =[0]
if print_every > 0 and batch % print_every == 0:
batch_count = len(batches)
batch_rate = ((batch_count * (epoch - 1)) + batch) / (time.time() - start)
except KeyboardInterrupt:
# final save
d = {key_: for (key_, val_) in model.named_parameters()}
with open(final_param_path, "w") as f:
np.savez(f, **d)
def evaluate(prime_str='A', predict_len=100, temperature=0.8):
if os.path.exists(final_param_path):
print("Final parameters found at {}... loading".format(final_param_path))
params_val = np.load(final_param_path)
for key_, param in model.named_parameters(): = torch.Tensor(params_val[key_])
raise ValueError("Training was not finalized, no file found at {}. Run with -m 1 first to train a model".format(final_param_path))
model.minibatch_size = 1
hidden = Variable(model.create_hidden(), volatile=True)
if use_cuda:
hidden = hidden.cuda()
prime_tensors = [index_to_tensor(char_to_index[char]) for char in prime_str]
if use_cuda:
prime_tensors = [tensor.cuda() for tensor in prime_tensors]
for prime_tensor in prime_tensors[-2:]:
_, hidden = model(prime_tensor, hidden)
inp = prime_tensors[-1]
predicted = prime_str
for p in range(predict_len):
if use_cuda:
inp = inp.cuda()
output, hidden = model(inp, hidden)
# Sample from the network as a multinomial distribution
output_dist =
# use numpy - torch output non-deterministic even with seeds
def rn(x):
return x / (np.sum(x) + .0001 * np.sum(x))
s = random_state.multinomial(1, rn(output_dist.numpy()))
top_i = int(np.where(s == 1)[0])
# not deterministic even with seed set
#top_i = torch.multinomial(output_dist, 1)[0]
# Add predicted character to string and use as next input
predicted_char = index_to_char[top_i]
predicted += predicted_char
inp = index_to_tensor(char_to_index[predicted_char])
return predicted
if args.mode == 0:
print(evaluate('Th', 500, temperature=0.8))
from IPython import embed; embed(); raise ValueError()
elif args.mode == 1:
