Sentiment Classification on Movie Reviews

# net.py : graph definition (module name inferred from "from net import SentimentNetwork" below)
import tensorflow as tf
import numpy as np

DropoutWrapper = tf.nn.rnn_cell.DropoutWrapper

class SentimentNetwork(object):

    def __init__(self, hdim=25, wdim=25, pdim=25, vocab_size=2000, pos_vocab_size=30,
                 num_labels=5, dropout_value=0.5, lr=0.001):
        tf.reset_default_graph()
        # placeholders
        sentences = tf.placeholder(tf.int32, [None, None], name='sentence')
        pos = tf.placeholder(tf.int32, [None, None], name='pos')
        labels = tf.placeholder(tf.int32, [None, ], name='label')
        mode = tf.placeholder(tf.int32, (), name='mode')
        self.placeholders = {
            'sentence' : sentences,
            'pos' : pos,
            'label' : labels,
            'mode' : mode
        }
        # dropout : active during training (mode == 0), disabled during evaluation
        dropout = tf.cond(
            tf.equal(mode, 0),       # If
            lambda : dropout_value,  # True
            lambda : 0.              # False
        )
        # word embedding
        wemb = tf.get_variable(shape=[vocab_size-2, wdim],
                               dtype=tf.float32,
                               initializer=tf.random_uniform_initializer(-0.01, 0.01),
                               name='word_embedding')
        # add PAD (index 0) and UNK (index 1) as zero rows
        wemb = tf.concat([ tf.zeros([2, wdim]), wemb ], axis=0)
        # PoS-tag embedding
        pemb = tf.get_variable(shape=[pos_vocab_size, pdim],
                               dtype=tf.float32,
                               initializer=tf.random_uniform_initializer(-0.01, 0.01),
                               name='pos_embedding')
        # concatenate word and PoS embeddings along the feature axis
        emb_sentence = tf.concat(
            [ tf.nn.embedding_lookup(wemb, sentences),
              tf.nn.embedding_lookup(pemb, pos) ],
            axis=-1)
"""
# define forward and backward cells for RNN
with tf.variable_scope('forward'):
cell_fw = DropoutWrapper(tf.nn.rnn_cell.LSTMCell(hdim),
output_keep_prob=1. - dropout)
state_fw = cell_fw.zero_state(batch_size_, tf.float32)
with tf.variable_scope('backward'):
cell_bw = DropoutWrapper(tf.nn.rnn_cell.LSTMCell(hdim),
output_keep_prob=1. - dropout)
state_bw = cell_bw.zero_state(batch_size_, tf.float32)
with tf.variable_scope('encoder') as scope:
# encode drug sequence
encoded_sequence, (__fsf, __fsb) = tf.nn.bidirectional_dynamic_rnn(
cell_fw, cell_bw, # forward and backward cells
inputs= tf.nn.embedding_lookup(wemb, self.sequence),
sequence_length=seqlens,
dtype=tf.float32)
"""
        with tf.variable_scope('rnn_cell') as scope:
            cell = DropoutWrapper(
                tf.nn.rnn_cell.LSTMCell(hdim),
                output_keep_prob=1. - dropout
            )
        with tf.variable_scope('encoder') as scope:
            outputs, final_state = tf.nn.dynamic_rnn(
                cell=cell,
                inputs=emb_sentence,
                # PAD is index 0, so the count of non-zero entries is the true length
                sequence_length=tf.count_nonzero(sentences, axis=-1),
                dtype=tf.float32
            )
        # project the final cell state to the label space (linear layer, no activation)
        logits = tf.contrib.layers.fully_connected(final_state.c, num_labels,
                                                   activation_fn=None)
        self.out = {
            'prob' : tf.nn.softmax(logits),
            'pred' : tf.argmax(tf.nn.softmax(logits), axis=-1),
            'loss' : tf.reduce_mean(
                tf.nn.sparse_softmax_cross_entropy_with_logits(
                    logits=logits,
                    labels=labels
                ))
        }
        # per-example accuracy; averaged outside the graph
        self.out['accuracy'] = tf.cast(tf.equal(
            tf.cast(self.out['pred'], tf.int32),
            labels), tf.float32)
        self.train_op = tf.train.AdamOptimizer(lr).minimize(self.out['loss'])


def rand_execution(netw):
    # run the graph once on a random batch to sanity-check the shapes
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        return sess.run(netw.out, feed_dict={
            netw.placeholders['sentence'] : np.random.randint(0, 100, [8, 10]),
            netw.placeholders['pos'] : np.random.randint(0, 30, [8, 10]),
            netw.placeholders['label'] : np.random.randint(0, 4, [8, ]),
            netw.placeholders['mode'] : 0
        })


if __name__ == '__main__':
    netw = SentimentNetwork()
    print(rand_execution(netw))
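
# For the random batch above (8 sentences of length 10, default num_labels=5),
# the printed dict should roughly look like this (shapes inferred from the
# graph definition, not verified output):
#   'prob'     : float array of shape (8, 5)
#   'pred'     : integer array of shape (8,)
#   'loss'     : a scalar
#   'accuracy' : per-example 0./1. array of shape (8,)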

# reader.py : corpus loading, vocabulary building and indexing
# (module name inferred from "from reader import create_samples" in the training script below)
from random import shuffle

from nltk import word_tokenize
from nltk import FreqDist
from nltk import pos_tag
import spacy
from tqdm import tqdm

DATA = 'data/senticorpus.tsv'
PAD = 0

nlp = spacy.load('en')

def spacy_PoS(sentence):
    return [ w.pos_ for w in nlp(sentence) ]


def read_all(filename):
    samples = []
    with open(filename) as f:
        for line in f.readlines()[1:]:
            _, sent_id, sentence, sentiment = line.strip().split('\t')
            samples.append((sentence, sentiment))
    return samples


def read_sentences(filename):
    # keep only the longest phrase observed for each sentence id
    sent_dict = {}
    with open(filename) as f:
        for line in f.readlines()[1:]:
            _, sent_id, sentence, sentiment = line.strip().split('\t')
            if sent_id not in sent_dict:
                sent_dict[sent_id] = (sentence, sentiment)
            elif len(sent_dict[sent_id][0]) < len(sentence):
                sent_dict[sent_id] = (sentence, sentiment)
    return [ tuple(v) for k, v in sent_dict.items() ]


def build_vocabulary(samples, max_vocab_size):
    words = word_tokenize(' '.join([ text for text, senti in samples ]))
    # print('Total number of unique tokens : ', len(set(words)))
    fd = FreqDist(words)
    # index 0 is reserved for PAD, index 1 for UNK
    return ['PAD', 'UNK'] + [ w for w, f in fd.most_common(max_vocab_size) ]


def build_pos_vocabulary(samples):
    pos_vocab = []
    for sample in tqdm(samples):
        pos_vocab.extend(spacy_PoS(sample[0]))
    return sorted(set(pos_vocab))


def index_samples(samples, vocab, pos_vocab):
    w2i = { w: i for i, w in enumerate(vocab) }
    w2i_ = lambda w: w2i[w] if w in w2i else 1  # out-of-vocabulary words map to UNK (1)
    p2i = { p: i for i, p in enumerate(pos_vocab) }
    indexed_samples = []
    for sentence, sentiment in tqdm(samples):
        tokenized = word_tokenize(sentence)
        # word indices, PoS-tag indices and the integer sentiment label
        indexed_samples.append(
            ([ w2i_(w) for w in tokenized ],
             [ p2i[p] for p in spacy_PoS(sentence) ],
             int(sentiment))
        )
    #return sorted(indexed_samples,
    #              key = lambda x : len(x[0]),
    #              reverse=True
    #              )
    shuffle(indexed_samples)
    return indexed_samples


def create_samples(max_vocab_size, consider_phrases=True):
    samples = read_all(DATA) if consider_phrases else read_sentences(DATA)
    vocab = build_vocabulary(samples, max_vocab_size)
    return index_samples(samples, vocab, build_pos_vocabulary(samples))
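
# Each element returned by create_samples() is a 3-tuple of the form
#   ([word indices], [PoS-tag indices], sentiment label)
# e.g. (hypothetical indices) ([27, 4, 812, 1], [5, 17, 9, 3], 3),
# where index 1 marks an out-of-vocabulary (UNK) word.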

# train.py (assumed name) : training and evaluation script
import tensorflow as tf
import numpy as np

from reader import create_samples
from net import SentimentNetwork
from tqdm import tqdm

BATCH_SIZE = 64
PAD = 0
UNK = 1

def seq_maxlen(seqs):
    """
    Length of the longest sequence
    in a batch of sequences

    Args:
        seqs : list of sequences
    Returns:
        length of the longest sequence
    """
    return max([len(seq) for seq in seqs])


def pad_seq(seqs, maxlen=0, PAD=PAD, truncate=False):
    # pad sequences with PAD
    # if seqs is a list of lists
    if type(seqs[0]) == type([]):
        # get maximum length of sequence
        maxlen = maxlen if maxlen else seq_maxlen(seqs)

        def pad_seq_(seq):
            if truncate and len(seq) > maxlen:
                # truncate sequence
                return seq[:maxlen]
            # return padded
            return seq + [PAD] * (maxlen - len(seq))

        seqs = [ pad_seq_(seq) for seq in seqs ]
    return seqs
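
# Quick sanity check for pad_seq (PAD == 0):
#   pad_seq([[3, 7, 2], [5]])                          -> [[3, 7, 2], [5, 0, 0]]
#   pad_seq([[3, 7, 2], [5]], maxlen=2, truncate=True) -> [[3, 7], [5, 0]]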


def vectorize(samples):
    # pad word-index and PoS-index sequences to a common length so that the
    # two embedding lookups can be concatenated feature-wise inside the graph
    sentence = np.array(pad_seq([ s[0] for s in samples ]))
    maxlen = sentence.shape[-1]
    pos = np.array(pad_seq([ s[1] for s in samples ], maxlen=maxlen, truncate=True))
    label = np.array([ s[2] for s in samples ])
    return {
        'sentence' : sentence,
        'pos' : pos,
        'label' : label
    }
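
# For a batch of B samples, vectorize() should return int arrays of shape
# [B, maxlen] for 'sentence' and 'pos', and [B] for 'label', matching the
# placeholders of SentimentNetwork (shapes inferred, not verified output).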


def train_run(netw, samples):
    sess = tf.get_default_session()
    samples = vectorize(samples)
    return sess.run([ netw.train_op, netw.out ],
                    feed_dict={
                        netw.placeholders['sentence'] : samples['sentence'],
                        netw.placeholders['label'] : samples['label'],
                        netw.placeholders['pos'] : samples['pos'],
                        netw.placeholders['mode'] : 0
                    }
                    )[1]


def evaluate(netw, testset, eval_batch_size=30):
    sess = tf.get_default_session()
    exec_g = lambda sample: sess.run(netw.out,
                                     feed_dict={
                                         netw.placeholders['sentence'] : sample['sentence'],
                                         netw.placeholders['label'] : sample['label'],
                                         netw.placeholders['pos'] : sample['pos'],
                                         netw.placeholders['mode'] : 1
                                     })
    iterations = len(testset) // eval_batch_size
    # 'accuracy' is per-example, so the mean over all evaluated batches
    # is the overall accuracy
    return np.mean(np.concatenate(
        [ exec_g(vectorize(testset[i * eval_batch_size : (i+1) * eval_batch_size]))['accuracy']
          for i in tqdm(range(iterations)) ]
    ))


def train(netw, trainset, testset, epochs=100):
    iterations = len(trainset) // BATCH_SIZE
    for i in range(epochs):
        epoch_loss = []
        for j in tqdm(range(iterations)):
            out = train_run(netw, trainset[j * BATCH_SIZE : (j+1) * BATCH_SIZE])
            epoch_loss.append(out['loss'])
        # end of epoch
        print(i, 'loss', np.mean(np.array(epoch_loss)))
        print(i, 'accuracy', evaluate(netw, testset))


if __name__ == '__main__':
    vocab_size = 5000
    dataset = create_samples(max_vocab_size=vocab_size, consider_phrases=True)
    split_ = int(0.85 * len(dataset))
    # sort by sentence length so that each batch contains sequences of similar
    # length and needs little padding
    trainset = sorted(dataset[:split_], key=lambda x: len(x[0]))
    testset = sorted(dataset[split_:], key=lambda x: len(x[0]))
    print(vectorize(testset[:10]))
    """
    # create model
    netw = SentimentNetwork(vocab_size=vocab_size, hdim=50, wdim=50,
                            dropout_value=0.5, lr=0.005)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        train(netw, trainset, testset)
    """