#!/usr/bin/env python3
""" Experimental implementation of a contrastive siamese network made of HAN """
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.autograd import Variable
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from .data import CharDocumentEncoder
# ------------------------------------------------------------------------------
# Experimental character HAN
# ------------------------------------------------------------------------------
class CharToWord(nn.Module):
"""
The character to word-level module.
"""
def __init__(self, num_chars, char_emb_size, word_hidden_size, context_vector_size, dropout_p=0.0, projection_nonlinearity=nn.Tanh, rnn=nn.GRU, use_gpu=True):
super(CharToWord, self).__init__()
self.use_gpu = use_gpu
# Character embeddings
self.char_embeddings = nn.Embedding(num_chars, char_emb_size)
# Dropout applied to the embeddings
self.dropout = nn.Dropout(p=dropout_p)
# Bidirectional RNN
# Inputs: character embeddings
# Outputs: word vector (word_hidden_size * 2)
self.char_to_word = rnn(char_emb_size, word_hidden_size, bidirectional=True, batch_first=True)
# Learnable context vector
self.word_context = nn.Parameter(torch.Tensor(context_vector_size, 1).uniform_(-0.1, 0.1))
# Projects the word vectors to new space to be multiplied by the context vector
self.word_projection = nn.Linear(word_hidden_size * 2, context_vector_size)
# The nonlinearity to apply to the projections prior to multiplication
# by context vector
self.word_proj_nonlinearity = projection_nonlinearity()
# Softmax layer to convert attention * projection into weights
self.softmax = nn.Softmax()
    def _sort_char_tensor(self, padded_tensor, sequence_lens):
        '''
        "Packing" of the character indices prior to embedding requires that they
        be in descending order of length to work.
        Returns the sorted tensor, the sorted sequence lengths, and the indices
        needed to invert the ordering.
        '''
        sequence_lens, order = sequence_lens.sort(0, descending=True)
        padded_tensor = padded_tensor[order]
        return padded_tensor, sequence_lens, order

    def forward(self, padded_char_tensor, sequence_lens):
        char_sorted, sequence_lens, order = self._sort_char_tensor(padded_char_tensor, sequence_lens)
        # embed
        char_embed = self.char_embeddings(char_sorted)
        # apply dropout to the embeddings
        char_embed = self.dropout(char_embed)
        # pack the sequences for efficiency
        # @todo seems inefficient to cpu()
        packed = pack_padded_sequence(char_embed, sequence_lens.cpu().numpy(), batch_first=True)
        # run through the bidirectional RNN (a GRU returns (output, hidden),
        # an LSTM returns (output, (hidden, cell)); only the output is needed)
        output, _ = self.char_to_word(packed)
        # unpack the sequence
        output, _ = pad_packed_sequence(output, batch_first=True)
        # revert to the original ordering via the inverse permutation
        _, reverse_order = order.sort(0)
        output = output[reverse_order, :, :]
        # prepare the final word tensor:
        word_tensor = Variable(torch.zeros((output.size(0), output.size(2))))
        if self.use_gpu:
            word_tensor = word_tensor.cuda()
        # calculate and apply attention
        for word_ind in range(output.size(0)):
            # create the projection of the word representation
            projection = self.word_projection(output[word_ind])
            projection = self.word_proj_nonlinearity(projection)
            # compute "similarity" weighting via the word context vector
            attention = torch.mm(projection, self.word_context)
            attention = self.softmax(attention)
            # multiply the word vectors by their calculated attention weights
            word_tensor[word_ind, :] = output[word_ind].transpose(1, 0).mv(attention.view(-1))
        # return the word vector representations:
        return word_tensor
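
# The lines below are a hypothetical smoke test of CharToWord, not part of the
# original gist: the vocabulary size (30 characters) and the toy batch of 4 words
# padded to 12 character indices are assumptions for illustration only.
#
#   char_to_word = CharToWord(num_chars=30, char_emb_size=16, word_hidden_size=64,
#                             context_vector_size=64, use_gpu=False)
#   padded_chars = Variable(torch.LongTensor(4, 12).random_(0, 30))
#   char_lens = torch.LongTensor([12, 9, 7, 5])  # characters per word, descending
#   word_vecs = char_to_word(padded_chars, char_lens)
#   assert word_vecs.size() == (4, 128)  # one (word_hidden_size * 2) vector per word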
# ------------------------------------------------------------------------------
# Attention modules for another HAN implementation
# ------------------------------------------------------------------------------
class AttentionModule(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(AttentionModule, self).__init__()
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, output_dim),
            nn.Tanh())
        self.context_vector = nn.Parameter(torch.Tensor(output_dim))
        self.context_vector.data.normal_(0, 0.1)

    def forward(self, input):
        output = self.mlp(input)
        attn_weight = F.softmax(output.matmul(self.context_vector), dim=0)
        attended_output = attn_weight.matmul(output)
        return attended_output
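
# Hypothetical usage sketch, not part of the original gist: AttentionModule pools a
# (seq_len, output_dim) matrix of timestep vectors into a single output_dim vector by
# softmax-weighting each timestep against the learned context vector. The toy sizes
# below are assumptions.
#
#   attn = AttentionModule(input_dim=128, output_dim=128)
#   steps = Variable(torch.randn(10, 128))
#   pooled = attn(steps)
#   assert pooled.size() == (128,)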
def pack(sequences, seqlens, use_gpu):
    assert np.count_nonzero(seqlens) == seqlens.shape[0]
    # pad inputs
    padded_inputs = Variable(torch.zeros(seqlens.shape[0], int(seqlens.max()), sequences.size(1)))
    if use_gpu:
        padded_inputs = padded_inputs.cuda()
    begin = 0
    for idx, length in enumerate(seqlens):
        padded_inputs[idx, :length] = sequences[begin:begin + length]
        begin += length
    # sort by descending length, as pack_padded_sequence requires
    indices = np.argsort(-seqlens)
    seqlens = seqlens[indices]
    if use_gpu:
        padded_inputs = padded_inputs[torch.LongTensor(indices).cuda()]
    else:
        padded_inputs = padded_inputs[torch.LongTensor(indices)]
    packed_input = pack_padded_sequence(padded_inputs, seqlens, batch_first=True)
    return packed_input, indices
class AttendedSeqEmbedding(nn.Module):
    def __init__(self, input_dim=100, hidden_dim=50, output_dim=100, rnn_type='gru',
                 use_gpu=False, batch_first=True):
        super(AttendedSeqEmbedding, self).__init__()
        self.use_gpu = use_gpu
        self.rnn_type = rnn_type
        if rnn_type == 'lstm':
            self.rnn = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, bidirectional=True, batch_first=batch_first)
        else:
            self.rnn = nn.GRU(input_size=input_dim, hidden_size=hidden_dim, bidirectional=True, batch_first=batch_first)
        self.batch_first = batch_first
        self.input_dim = input_dim
        rnn_output_dim = 2 * hidden_dim
        self.mlp = nn.Sequential(
            nn.Linear(rnn_output_dim, output_dim),
            nn.Tanh())
        self.context_vector = nn.Parameter(torch.Tensor(output_dim))
        self.context_vector.data.normal_(0, 0.1)

    def forward(self, sequences, seqlens):
        # pad, sort by descending length, and pack the flat sequence tensor
        packed_input, indices = pack(sequences, seqlens, self.use_gpu)
        # self.rnn.flatten_parameters()
        packed_output, _ = self.rnn(packed_input)
        padded_outputs, sorted_seqlens = pad_packed_sequence(packed_output, batch_first=self.batch_first)
        # apply attention: project each timestep, score it against the context vector,
        # and softmax over the valid timesteps of each sequence
        mlp_output = self.mlp(padded_outputs)
        attn_weight = mlp_output.matmul(self.context_vector)
        attended_outputs = torch.stack(
            [F.softmax(attn_weight[i, :length], dim=0).matmul(padded_outputs[i, :length])
             for i, length in enumerate(sorted_seqlens)], dim=0)
        # undo the length-descending sort so outputs line up with the original input order
        orig_indices = [0] * indices.shape[0]
        for i in range(indices.shape[0]):
            orig_indices[indices[i]] = i
        if self.use_gpu:
            attended_outputs = attended_outputs[torch.LongTensor(orig_indices).cuda()]
        else:
            attended_outputs = attended_outputs[torch.LongTensor(orig_indices)]
        return attended_outputs
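
# Hypothetical usage sketch, not part of the original gist: AttendedSeqEmbedding takes
# a flat (sum(seqlens), input_dim) tensor of timestep vectors plus a numpy array of
# per-sequence lengths, and returns one attended output_dim vector per sequence, in the
# original input order. The toy lengths and dimensions below are assumptions.
#
#   seq_emb = AttendedSeqEmbedding(input_dim=128, hidden_dim=64, output_dim=128, use_gpu=False)
#   seqlens = np.array([5, 3, 7])
#   flat_steps = Variable(torch.randn(int(seqlens.sum()), 128))
#   seq_vecs = seq_emb(flat_steps, seqlens)
#   assert seq_vecs.size() == (3, 128)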
# ------------------------------------------------------------------------------
# HAN (Hierarchical Attention Net)
# ------------------------------------------------------------------------------
class HAN(nn.Module):
    def __version__(self):
        return '2.1.0'

    def __init__(self,
                 word_emb_dim=128,
                 rnn_hidden_dim=64,
                 emb_dim=128,
                 rnn_type='gru',
                 use_emb=False,
                 use_gpu=False):
        super(HAN, self).__init__()
        self.use_gpu = use_gpu
        self.word_emb_dim = word_emb_dim
        self.use_emb = use_emb
        self.emb_dim = emb_dim
        self.rnn_type = rnn_type
        self.char_encoder = CharDocumentEncoder('data/search/valid_chars', use_gpu=self.use_gpu)
        # character to word embedding
        self.word_embedding = CharToWord(num_chars=self.char_encoder.onhotlength,
                                         char_emb_size=16,
                                         word_hidden_size=64,
                                         context_vector_size=64,
                                         dropout_p=0.2,
                                         projection_nonlinearity=nn.Tanh,
                                         rnn=nn.GRU,
                                         use_gpu=use_gpu)
        # word to sentence embedding
        self.document_sent_emb = AttendedSeqEmbedding(input_dim=word_emb_dim,
                                                      hidden_dim=rnn_hidden_dim,
                                                      output_dim=emb_dim,
                                                      rnn_type=rnn_type,
                                                      use_gpu=use_gpu)
        # sentence to document embedding
        self.document_emb = AttendedSeqEmbedding(input_dim=emb_dim,
                                                 hidden_dim=rnn_hidden_dim,
                                                 output_dim=emb_dim,
                                                 rnn_type=rnn_type,
                                                 use_gpu=use_gpu)
        self.ffw = nn.Sequential(
            nn.Linear(self.emb_dim, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh()
        )
        self.empty_document_emb = nn.Parameter(torch.Tensor(emb_dim))
        self.empty_document_emb.data.normal_(0, 0.1)

    def embed(self, sequences, seqlens, seq_emb, empty_seq_emb):
        nonempty = (seqlens != 0).nonzero()
        nonempty_seq_embs = seq_emb(sequences, seqlens[nonempty])
        if np.count_nonzero(seqlens != 0) < len(seqlens):
            # some sequences are empty: fill their slots with the learned empty embedding
            seq_embs = Variable(torch.Tensor(len(seqlens), self.emb_dim))
            if self.use_gpu:
                seq_embs = seq_embs.cuda()
            seq_embs[nonempty] = nonempty_seq_embs
            empty = (seqlens == 0).nonzero()
            seq_embs[empty] = empty_seq_emb.expand(len(empty[0]), self.emb_dim)
        else:
            seq_embs = nonempty_seq_embs
        return seq_embs

    def compute_document_embs(self, documents):
        # expected input structure: batch[ doc[ sentence[word] ] ]
        stacked_encoded_words = []
        sentlens = []  # number of words in each sentence of each document
        for doc in documents:
            for sent in doc:
                # each sentence gets one extra timestep for its end-of-sentence marker,
                # added by the encoding step below
                sentlens.append(len(sent) + 1)
                stacked_encoded_words = stacked_encoded_words + self.char_encoder.encode_sentence(sent)
        sentlens = np.array(sentlens)
        # now we have a flattened representation of all words and end-of-sentence markers
        seq_tensor, sequence_lens = self.char_encoder.pad_char_seq_inds(stacked_encoded_words)
        # use the first embedding stage to embed one-hot characters into word vectors
        words = self.word_embedding(seq_tensor, sequence_lens)
        if self.use_gpu:
            words = words.cuda()
        document_word_embs = words
        document_sent_embs = self.document_sent_emb(document_word_embs, sentlens)
        # documentlens is the number of sentences in each document
        documentlens = np.array([len(document) for document in documents])
        document_embs = self.embed(document_sent_embs, documentlens, self.document_emb, self.empty_document_emb)
        return document_embs

    def forward(self, documents):
        # forward pass over a single branch only
        document_embs = self.compute_document_embs(documents)
        return self.ffw(document_embs)
# ------------------------------------------------------------------------------
# SIAMESE Network
# ------------------------------------------------------------------------------
class SiameseNetwork(nn.Module):
    def __init__(self, args):
        super(SiameseNetwork, self).__init__()
        # Store config
        self.args = args
        self.use_cuda = args.cuda
        self.input_dim = 128
        self.emb_dim = 128
        # the hierarchical attention network, shared by both branches (question & document)
        self.han = HAN(
            word_emb_dim=self.input_dim,
            rnn_hidden_dim=64,
            emb_dim=self.emb_dim,
            rnn_type='gru',
            use_emb=False,
            use_gpu=self.use_cuda)

    def forward(self, questions, documents):
        # distance_input = (documents - questions).pow(2).sum(1)  # squared distances
        question_embed = self.han(questions)
        document_embed = self.han(documents)
        return question_embed, document_embed

    # same as forward, but kept separate for clarity at prediction time
    def get_embedding(self, questions, documents):
        question_embed = self.han(questions)
        document_embed = self.han(documents)
        return question_embed, document_embed
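
# Hypothetical training sketch, not part of the original gist. It assumes that the
# 'data/search/valid_chars' file required by CharDocumentEncoder exists, that `args`
# only needs a `.cuda` flag, and that question_batch / document_batch follow the
# batch[ doc[ sentence[word] ] ] structure expected by HAN. The margin-based
# contrastive loss follows the squared-distance idea in the commented-out line of
# SiameseNetwork.forward; the loss actually used by the author is not shown here.
#
#   from argparse import Namespace
#
#   model = SiameseNetwork(Namespace(cuda=False))
#   optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
#
#   def contrastive_loss(q_emb, d_emb, labels, margin=1.0):
#       # labels: 1 for matching question/document pairs, 0 for mismatched ones
#       dist_sq = (q_emb - d_emb).pow(2).sum(1)
#       dist = dist_sq.sqrt()
#       return (labels * dist_sq + (1 - labels) * torch.clamp(margin - dist, min=0).pow(2)).mean()
#
#   q_emb, d_emb = model(question_batch, document_batch)
#   loss = contrastive_loss(q_emb, d_emb, label_batch)
#   optimizer.zero_grad()
#   loss.backward()
#   optimizer.step()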