#!/usr/bin/env python3
"""Experimental implementation of a contrastive Siamese network built from
hierarchical attention networks (HAN)."""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.autograd import Variable
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from .data import CharDocumentEncoder
# ------------------------------------------------------------------------------
# Experimental character HAN
# ------------------------------------------------------------------------------
class CharToWord(nn.Module):
    """
    The character to word-level module.
    """
    def __init__(self, num_chars, char_emb_size, word_hidden_size, context_vector_size,
                 dropout_p=0.0, projection_nonlinearity=nn.Tanh, rnn=nn.GRU, use_gpu=True):
        super(CharToWord, self).__init__()
        self.use_gpu = use_gpu
        # Character embeddings
        self.char_embeddings = nn.Embedding(num_chars, char_emb_size)
        # Dropout applied to the embeddings
        self.dropout = nn.Dropout(p=dropout_p)
        # Bidirectional RNN
        # Inputs: character embeddings
        # Outputs: word vector (word_hidden_size * 2)
        self.char_to_word = rnn(char_emb_size, word_hidden_size, bidirectional=True, batch_first=True)
        # Learnable context vector
        self.word_context = nn.Parameter(torch.Tensor(context_vector_size, 1).uniform_(-0.1, 0.1))
        # Projects the word vectors to a new space to be multiplied by the context vector
        self.word_projection = nn.Linear(word_hidden_size * 2, context_vector_size)
        # The nonlinearity to apply to the projections prior to multiplication
        # by the context vector
        self.word_proj_nonlinearity = projection_nonlinearity()
        # Softmax over the timestep dimension to turn attention scores into weights
        self.softmax = nn.Softmax(dim=0)
    def _sort_char_tensor(self, padded_tensor, sequence_lens):
        '''
        "Packing" of the character indices prior to embedding requires that they
        be in descending order of length to work.
        Returns the sorted tensor, the sorted sequence lengths, and the indices
        for inverting the order afterwards.
        '''
        sorted_lens, order = sequence_lens.sort(0, descending=True)
        padded_tensor = padded_tensor[order]
        # invert the permutation so the caller can restore the original order
        _, unsort_order = order.sort(0)
        return padded_tensor, sorted_lens, unsort_order
    def forward(self, padded_char_tensor, sequence_lens):
        char_sorted, sorted_lens, unsort_order = self._sort_char_tensor(padded_char_tensor, sequence_lens)
        # embed the character indices
        char_embed = self.char_embeddings(char_sorted)
        # apply dropout to the embeddings
        char_embed = self.dropout(char_embed)
        # pack the sequences for efficiency
        # @todo seems inefficient to cpu()
        packed = pack_padded_sequence(char_embed, sorted_lens.cpu().numpy(), batch_first=True)
        # run through the bidirectional RNN
        # (a GRU returns (output, h_n) while an LSTM returns (output, (h_n, c_n)),
        #  so only the packed output is kept here)
        output, _ = self.char_to_word(packed)
        # unpack the sequence
        output, _ = pad_packed_sequence(output, batch_first=True)
        # revert to the original ordering
        output = output[unsort_order, :, :]
        # prepare the final word tensor:
        word_tensor = Variable(torch.zeros((output.size(0), output.size(2))))
        if self.use_gpu:
            word_tensor = word_tensor.cuda()
        # calculate and apply attention over the valid (unpadded) timesteps
        for word_ind in range(output.size(0)):
            length = int(sequence_lens[word_ind])
            valid = output[word_ind, :length]
            # create the projection of the word representation
            projection = self.word_projection(valid)
            projection = self.word_proj_nonlinearity(projection)
            # compute "similarity" weighting via the word context vector
            attention = torch.mm(projection, self.word_context)
            attention = self.softmax(attention)
            # multiply the word vectors by their calculated attention weights
            word_tensor[word_ind, :] = valid.transpose(1, 0).mv(attention.view(-1))
        # return the word vector reps:
        return word_tensor
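# ------------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original gist): how
# CharToWord is expected to be driven.  The character indices, vocabulary size
# and word lengths below are made up; the point is the shapes involved.
# ------------------------------------------------------------------------------
def _char_to_word_example():
    """Minimal sketch, assuming a character vocabulary of 40 symbols."""
    module = CharToWord(num_chars=40, char_emb_size=16, word_hidden_size=64,
                        context_vector_size=64, dropout_p=0.0, use_gpu=False)
    # three words of lengths 5, 3 and 2, padded to the longest word
    padded_chars = Variable(torch.LongTensor([[1, 4, 2, 7, 3],
                                              [5, 2, 9, 0, 0],
                                              [8, 6, 0, 0, 0]]))
    lengths = torch.LongTensor([5, 3, 2])
    word_vectors = module(padded_chars, lengths)
    return word_vectors  # one vector per word, shape (3, word_hidden_size * 2)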
# ------------------------------------------------------------------------------
# Attention modules for another HAN implementation
# ------------------------------------------------------------------------------
class AttentionModule(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(AttentionModule, self).__init__()
        # project the inputs (input_dim -> output_dim) before scoring them
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, output_dim),
            nn.Tanh())
        self.context_vector = nn.Parameter(torch.Tensor(output_dim))
        self.context_vector.data.normal_(0, 0.1)
    def forward(self, input):
        output = self.mlp(input)
        # score each timestep against the context vector and normalize
        attn_weight = F.softmax(output.matmul(self.context_vector), dim=0)
        attended_output = attn_weight.matmul(output)
        return attended_output
def pack(sequences, seqlens, use_gpu):
    '''
    Pads a flat (sum(seqlens), feature_dim) tensor of concatenated sequences,
    sorts it by descending length and packs it for an RNN.
    Returns the PackedSequence and the sort indices that were applied.
    '''
    assert(np.count_nonzero(seqlens) == seqlens.shape[0])
    # pad inputs
    padded_inputs = Variable(torch.zeros(seqlens.shape[0], int(seqlens.max()), sequences.size(1)))
    if use_gpu:
        padded_inputs = padded_inputs.cuda()
    begin = 0
    for idx, length in enumerate(seqlens):
        padded_inputs[idx, :length] = sequences[begin:begin + length]
        begin += length
    # sort by descending length, as required by pack_padded_sequence
    indices = np.argsort(-seqlens)
    seqlens = seqlens[indices]
    if use_gpu:
        padded_inputs = padded_inputs[torch.LongTensor(indices).cuda()]
    else:
        padded_inputs = padded_inputs[torch.LongTensor(indices)]
    packed_input = pack_padded_sequence(padded_inputs, seqlens, batch_first=True)
    return packed_input, indices
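# ------------------------------------------------------------------------------
# Illustrative note (not part of the original gist): `pack` expects its input
# flattened along dim 0, i.e. a (sum(seqlens), feature_dim) tensor, and returns
# a PackedSequence plus the descending-length sort indices it applied.
# ------------------------------------------------------------------------------
def _pack_example():
    """Minimal sketch with two made-up sequences of lengths 2 and 3."""
    flat = Variable(torch.randn(5, 4))   # 2 + 3 timesteps, 4 features each
    seqlens = np.array([2, 3])
    packed, sort_indices = pack(flat, seqlens, use_gpu=False)
    return packed, sort_indices          # sort_indices == [1, 0]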
class AttendedSeqEmbedding(nn.Module):
    def __init__(self, input_dim=100, hidden_dim=50, output_dim=100, rnn_type='gru',
                 use_gpu=False, batch_first=True):
        super(AttendedSeqEmbedding, self).__init__()
        self.use_gpu = use_gpu
        self.rnn_type = rnn_type
        if rnn_type == 'lstm':
            self.rnn = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, bidirectional=True, batch_first=batch_first)
        else:
            self.rnn = nn.GRU(input_size=input_dim, hidden_size=hidden_dim, bidirectional=True, batch_first=batch_first)
        self.batch_first = batch_first
        self.input_dim = input_dim
        # the bidirectional RNN doubles the hidden size
        rnn_output_dim = 2 * hidden_dim
        self.mlp = nn.Sequential(
            nn.Linear(rnn_output_dim, output_dim),
            nn.Tanh())
        self.context_vector = nn.Parameter(torch.Tensor(output_dim))
        self.context_vector.data.normal_(0, 0.1)
    def forward(self, sequences, seqlens):
        # pad, sort and pack the flat input sequences
        packed_input, indices = pack(sequences, seqlens, self.use_gpu)
        packed_output, _ = self.rnn(packed_input)
        padded_outputs, sorted_seqlens = pad_packed_sequence(packed_output, batch_first=self.batch_first)
        # project each timestep and score it against the context vector
        mlp_output = self.mlp(padded_outputs)
        attn_weight = mlp_output.matmul(self.context_vector)
        # softmax over the valid timesteps of each sequence, then take the
        # attention-weighted sum of the RNN outputs
        attended_outputs = torch.stack(
            [F.softmax(attn_weight[i, :length], dim=0).matmul(padded_outputs[i, :length])
             for i, length in enumerate(sorted_seqlens)], dim=0)
        # undo the descending-length sort so outputs line up with the inputs
        orig_indices = [0] * indices.shape[0]
        for i in range(indices.shape[0]):
            orig_indices[indices[i]] = i
        if self.use_gpu:
            attended_outputs = attended_outputs[torch.LongTensor(orig_indices).cuda()]
        else:
            attended_outputs = attended_outputs[torch.LongTensor(orig_indices)]
        return attended_outputs
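# ------------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original gist):
# AttendedSeqEmbedding turns a flat run of item vectors plus per-sequence
# lengths into one attended vector per sequence.  The dimensions mirror the
# HAN defaults below; the data is random.
# ------------------------------------------------------------------------------
def _attended_seq_embedding_example():
    """Minimal sketch: two "sentences" of 4 and 2 word vectors."""
    seq_emb = AttendedSeqEmbedding(input_dim=128, hidden_dim=64, output_dim=128,
                                   rnn_type='gru', use_gpu=False)
    word_vectors = Variable(torch.randn(6, 128))  # 4 + 2 word vectors, flattened
    sentlens = np.array([4, 2])
    sentence_vectors = seq_emb(word_vectors, sentlens)
    return sentence_vectors                       # shape (2, 128)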
# ------------------------------------------------------------------------------
# HAN (Hierarchical Attention Net)
# ------------------------------------------------------------------------------
class HAN(nn.Module):
    def __version__(self):
        return '2.1.0'
    def __init__(self,
                 word_emb_dim=128,
                 rnn_hidden_dim=64,
                 emb_dim=128,
                 rnn_type='gru',
                 use_emb=False,
                 use_gpu=False):
        super(HAN, self).__init__()
        self.use_gpu = use_gpu
        self.word_emb_dim = word_emb_dim
        self.use_emb = use_emb
        self.emb_dim = emb_dim
        self.rnn_type = rnn_type
        self.char_encoder = CharDocumentEncoder('data/search/valid_chars', use_gpu=self.use_gpu)
        # character to word embedding
        self.word_embedding = CharToWord(num_chars=self.char_encoder.onhotlength,
                                         char_emb_size=16,
                                         word_hidden_size=64,
                                         context_vector_size=64,
                                         dropout_p=0.2,
                                         projection_nonlinearity=nn.Tanh,
                                         rnn=nn.GRU,
                                         use_gpu=use_gpu)
        # word to sentence embedding
        self.document_sent_emb = AttendedSeqEmbedding(input_dim=word_emb_dim,
                                                      hidden_dim=rnn_hidden_dim,
                                                      output_dim=emb_dim,
                                                      rnn_type=rnn_type,
                                                      use_gpu=use_gpu)
        # sentence to document embedding
        self.document_emb = AttendedSeqEmbedding(input_dim=emb_dim,
                                                 hidden_dim=rnn_hidden_dim,
                                                 output_dim=emb_dim,
                                                 rnn_type=rnn_type,
                                                 use_gpu=use_gpu)
        # final feed-forward layers applied on top of the document embedding
        self.ffw = nn.Sequential(
            nn.Linear(self.emb_dim, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh()
        )
        # learned embedding used in place of documents that contain no sentences
        self.empty_document_emb = nn.Parameter(torch.Tensor(emb_dim))
        self.empty_document_emb.data.normal_(0, 0.1)
    def embed(self, sequences, seqlens, seq_emb, empty_seq_emb):
        # embed only the non-empty sequences, then scatter them back into place,
        # filling the empty slots with the learned "empty" embedding
        nonempty = np.nonzero(seqlens)[0]
        nonempty_seq_embs = seq_emb(sequences, seqlens[nonempty])
        if len(nonempty) < len(seqlens):
            seq_embs = Variable(torch.Tensor(len(seqlens), self.emb_dim))
            if self.use_gpu:
                seq_embs = seq_embs.cuda()
            empty = np.nonzero(seqlens == 0)[0]
            nonempty_idx = torch.LongTensor(nonempty)
            empty_idx = torch.LongTensor(empty)
            if self.use_gpu:
                nonempty_idx, empty_idx = nonempty_idx.cuda(), empty_idx.cuda()
            seq_embs[nonempty_idx] = nonempty_seq_embs
            seq_embs[empty_idx] = empty_seq_emb.expand(len(empty), self.emb_dim)
        else:
            seq_embs = nonempty_seq_embs
        return seq_embs
    def compute_document_embs(self, documents):
        # expects a nested batch[ doc[ sentence[word] ] ] structure
        stacked_encoded_words = []
        sentlens = []  # number of words in each sentence of each document
        for doc in documents:
            for sent in doc:
                # each sentence gets one extra timestep for the end-of-sentence
                # marker, which is added in the encoding step below
                sentlens.append(len(sent) + 1)
                stacked_encoded_words = stacked_encoded_words + self.char_encoder.encode_sentence(sent)
        sentlens = np.array(sentlens)
        # now we have a flattened representation of all words plus end-of-sentence markers
        seq_tensor, sequence_lens = self.char_encoder.pad_char_seq_inds(stacked_encoded_words)
        # first embedding stage: character sequences -> word vectors
        words = self.word_embedding(seq_tensor, sequence_lens)
        if self.use_gpu:
            words = words.cuda()
        document_word_embs = words
        # word vectors -> sentence vectors
        document_sent_embs = self.document_sent_emb(document_word_embs, sentlens)
        # documentlens is the number of sentences in each document
        documentlens = np.array([len(document) for document in documents])
        # sentence vectors -> document vectors
        document_embs = self.embed(document_sent_embs, documentlens, self.document_emb, self.empty_document_emb)
        return document_embs
    def forward(self, documents):
        # forward pass through a single branch only
        document_embs = self.compute_document_embs(documents)
        return self.ffw(document_embs)
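# ------------------------------------------------------------------------------
# Input-format sketch (illustrative only, not part of the original gist).
# HAN.forward expects a nested batch[ doc[ sentence[word] ] ] structure; the
# exact word representation and character handling depend on
# CharDocumentEncoder and the 'data/search/valid_chars' vocabulary file, which
# are not included here, so the example below is left commented out.
# ------------------------------------------------------------------------------
# documents = [
#     [["how", "do", "i", "reset", "my", "password"]],            # doc 1: 1 sentence
#     [["account", "settings"], ["security", "and", "login"]],    # doc 2: 2 sentences
# ]
# han = HAN(word_emb_dim=128, rnn_hidden_dim=64, emb_dim=128, use_gpu=False)
# doc_embs = han(documents)  # expected shape: (2, 64) after the final ffw layers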
# ------------------------------------------------------------------------------
# SIAMESE Network
# ------------------------------------------------------------------------------
class SiameseNetwork(nn.Module):
    def __init__(self, args):
        super(SiameseNetwork, self).__init__()
        # Store config
        self.args = args
        self.use_cuda = args.cuda
        self.input_dim = 128
        self.emb_dim = 128
        # the hierarchical attention network, shared by both branches
        # (questions and documents go through the same weights)
        self.han = HAN(
            word_emb_dim=self.input_dim,
            rnn_hidden_dim=64,
            emb_dim=self.emb_dim,
            rnn_type='gru',
            use_emb=False,
            use_gpu=self.use_cuda)
    def forward(self, questions, documents):
        # distance_input = (documents - questions).pow(2).sum(1)  # squared distances
        question_embed = self.han(questions)
        document_embed = self.han(documents)
        return question_embed, document_embed
    # same as forward, but kept for clarity at prediction time
    def get_embedding(self, questions, documents):
        question_embed = self.han(questions)
        document_embed = self.han(documents)
        return question_embed, document_embed
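# ------------------------------------------------------------------------------
# Training sketch (illustrative only, not part of the original gist).  The file
# does not include the loss; a standard contrastive loss over the two embedding
# branches might look roughly like this, where `label` is 1.0 for matching
# question/document pairs, 0.0 otherwise, and `margin` is a free hyperparameter.
# ------------------------------------------------------------------------------
def contrastive_loss(question_embed, document_embed, label, margin=1.0):
    """Minimal sketch of a contrastive objective for the Siamese branches."""
    # Euclidean distance between the two branches, one value per pair
    distance = F.pairwise_distance(question_embed, document_embed)
    # pull matching pairs together, push non-matching pairs beyond the margin
    positive = label * distance.pow(2)
    negative = (1 - label) * torch.clamp(margin - distance, min=0).pow(2)
    return 0.5 * (positive + negative).mean()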