@alvations
Last active July 12, 2016 06:41
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
An implementation of the *FastNet* from
Armand Joulin, Edouard Grave, Piotr Bojanowski and Tomas Mikolov. 2016.
Bag of Tricks for Efficient Text Classification.
https://arxiv.org/pdf/1607.01759v2.pdf
Largely based on RaRe Technologies' `gensim`
https://github.com/RaRe-Technologies/gensim/blob/develop/gensim/models/word2vec.py
(Disclaimer: This is dirty cut-and-paste code, and even my understanding of the
paper might not be correct. I wrote this because I was curious about the speed
and the results. Suggestions and corrections are gladly welcome!)
"""
import heapq
import pickle
from copy import deepcopy
from math import sqrt
from collections import Counter
from itertools import dropwhile, chain
import pandas as pd
import numpy as np
from gensim.utils import tokenize
from gensim.models.word2vec import Vocab
def get_AGTrain(dir_path='ag_news_csv/', min_count=100):
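    """
    Load the AG News training CSV, tokenize Title+Caption into word lists,
    map labels 1-4 to the pseudo-words LABELL1..LABELL4, and replace words
    occurring fewer than `min_count` times with 'UNK'.
    """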
    label_names = {1: 'LABELL1', 2: 'LABELL2', 3: 'LABELL3', 4: 'LABELL4'}
    df_train = pd.read_csv(dir_path + 'train.csv', delimiter=',', header=None,
                           names=('Label', 'Title', 'Caption'))
    X_train = df_train[['Title', 'Caption']].apply(
        lambda x: list(tokenize(' '.join(x))), axis=1)
    y_train = df_train['Label'].apply(lambda x: label_names[x])
    # Drop words rarer than `min_count`; most_common() is sorted by count,
    # so everything after the dropwhile() cut-off is below the threshold.
    vocab_count = Counter(chain(*X_train))
    for key, count in dropwhile(lambda key_count: key_count[1] >= min_count,
                                vocab_count.most_common()):
        del vocab_count[key]
    X_train = X_train.apply(lambda x: [word if word in vocab_count else 'UNK'
                                       for word in x])
    return X_train, y_train
def get_AGTest(dir_path='ag_news_csv/'):
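    """
    Load the AG News test CSV, tokenize it the same way as the training data,
    and map out-of-vocabulary words to 'UNK'. Note: this relies on the global
    `model` already being trained, since it uses `model.vocab` for the lookup.
    """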
    label_names = {1: 'LABELL1', 2: 'LABELL2', 3: 'LABELL3', 4: 'LABELL4'}
    df_test = pd.read_csv(dir_path + 'test.csv', delimiter=',', header=None,
                          names=('Label', 'Title', 'Caption'))
    X_test = df_test[['Title', 'Caption']].apply(
        lambda x: list(tokenize(' '.join(x))), axis=1)
    y_test = df_test['Label'].apply(lambda x: label_names[x])
    X_test = X_test.apply(lambda x: [word if word in model.vocab else 'UNK'
                                     for word in x])
    return X_test, y_test
def inject_label_counts(X_train, y_train):
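    """
    Count the number of word tokens per class in the training data. These
    totals are later used as the `count` of the four label pseudo-words when
    building the Huffman tree; 30,000 (the number of training instances per
    class in AG News) is subtracted from each total.
    """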
    num_tokens_per_label = {'LABELL1': 0, 'LABELL2': 0, 'LABELL3': 0, 'LABELL4': 0}
    for sent, label in zip(X_train, y_train):
        num_tokens_per_label[label] += len(sent)
    for k, v in num_tokens_per_label.items():
        # no. of training instances per class in AG News = 30000
        num_tokens_per_label[k] -= 30000
    return num_tokens_per_label
def cosine(vec1, vec2):
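    """Cosine similarity between two (unnormalised) numpy vectors."""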
    sum1 = sum(vec1 ** 2)
    sum2 = sum(vec2 ** 2)
    denominator = sqrt(sum1) * sqrt(sum2)
    return np.dot(vec1, vec2) / denominator
def predict(X_test, y_test):
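    """
    Classify each test sentence by summing its word embeddings and choosing
    the label whose embedding has the highest cosine similarity to that sum,
    then print the accuracy. Relies on the trained global `model`.
    """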
    label_vectors = {'LABELL1': model['LABELL1'], 'LABELL2': model['LABELL2'],
                     'LABELL3': model['LABELL3'], 'LABELL4': model['LABELL4']}
    correct = 0
    for row, gold in zip(X_test, y_test):
        query = sum([model[word] for word in row])
        results = sorted([(cosine(query, v), k)
                          for k, v in label_vectors.items()], reverse=True)
        print('\t'.join([gold, results[0][1], str(results)]))
        if gold == results[0][1]:
            correct += 1
    print(correct / float(len(X_test)))
class Word2Vec():
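    """
    A stripped-down CBOW word2vec variant (adapted from gensim's pure-Python
    word2vec) in which the sentence label is the "center word" predicted from
    a right-only window of words, using hierarchical softmax over a Huffman
    tree built from the four class labels.
    """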
    def __init__(self, sentences=None, embed_dim=10, hs=1, thr=0, window=1,
                 min_count=1, alpha=0.05, min_alpha=0.0001, seed=1):
        self.vocab = {}  # mapping from a word (string) to a Vocab object
        self.index2word = []  # map from a word's matrix index (int) to the word (string)
        self.embed_dim = embed_dim
        self.hs = hs
        self.thr = thr
        self.window = window
        self.min_count = min_count
        self.alpha = alpha
        self.min_alpha = min_alpha
        self.seed = seed
    def reset_weights(self):
        np.random.seed(self.seed)
        # output weights (hidden -> Huffman tree nodes)
        self.syn1 = (np.random.rand(len(self.vocab), self.embed_dim) - 0.5) / self.embed_dim
        # input embeddings
        self.syn0 = (np.random.rand(len(self.vocab), self.embed_dim) - 0.5) / self.embed_dim
        self.syn0norm = None
    def _create_binary_tree(self):
        """
        Create a binary Huffman tree using stored vocabulary word counts. Frequent words
        will have shorter binary codes. Called internally from `build_vocab()`.
        """
        vocab_size = len(self.vocab)
        # build the Huffman tree
        # different from the original word2vec, we only have the labels here.
        heap = [self.vocab['LABELL1'], self.vocab['LABELL2'],
                self.vocab['LABELL3'], self.vocab['LABELL4']]
        num_labels = 4
        heapq.heapify(heap)
        for i in range(num_labels - 1):
            min1, min2 = heapq.heappop(heap), heapq.heappop(heap)
            # any new node created by joining branches gets an index >= len(self.vocab)
            heapq.heappush(heap, Vocab(count=min1.count + min2.count,
                                       index=i + len(self.vocab),
                                       left=min1, right=min2))
        # recurse over the tree, assigning a binary code to each vocabulary word
        if heap:
            max_depth, stack = 0, [(heap[0], [], [])]
            while stack:
                node, codes, points = stack.pop()
                if node.index < vocab_size:
                    # leaf node => store its path from the root
                    node.code, node.point = codes, points
                    max_depth = max(len(codes), max_depth)
                else:
                    # inner node => continue recursion
                    points = np.array(list(points) + [node.index - vocab_size], dtype=int)
                    stack.append((node.left, np.array(list(codes) + [0], dtype=int), points))
                    stack.append((node.right, np.array(list(codes) + [1], dtype=int), points))
    def build_vocab(self, sentences, hs=1, neg=False, thr=0):
        """
        Build vocabulary from a sequence of sentences (can be a once-only generator stream).
        Each sentence must be a list of strings.
        """
        sentence_no, vocab = -1, {}
        total_words = 0
        for sentence_no, sentence in enumerate(sentences):
            for word in sentence:
                total_words += 1
                try:
                    vocab[word].count += 1
                except KeyError:
                    vocab[word] = Vocab(count=1)
        # add the four label pseudo-words, using the global X_train/y_train
        for k, v in inject_label_counts(X_train, y_train).items():
            vocab[k] = Vocab(count=v)
        # assign a unique index to each word
        self.vocab, self.index2word = {}, []
        for word, v in vocab.items():
            if v.count >= self.min_count:
                v.index = len(self.vocab)
                self.index2word.append(word)
                self.vocab[word] = v
        # add probabilities for sub-sampling (if self.thr > 0)
        if self.thr > 0:
            total_words = float(sum(v.count for v in self.vocab.values()))
            for word in self.vocab:
                # formula from the paper
                # self.vocab[word].prob = max(0., 1. - sqrt(self.thr * total_words / self.vocab[word].count))
                # formula from the word2vec code
                self.vocab[word].prob = (sqrt(self.vocab[word].count /
                                              (self.thr * total_words)) + 1.) * \
                                        (self.thr * total_words) / self.vocab[word].count
        else:
            # if prob is 0, the word won't get discarded
            for word in self.vocab:
                self.vocab[word].prob = 0.
        # add info about each word's Huffman encoding
        self._create_binary_tree()
        # initialize layers
        self.reset_weights()
    def train_sentence_cbow(self, sentence, alpha, sentence_no):
        """
        Update a CBOW model by training on a single sentence
        using hierarchical softmax and/or negative sampling.
        The sentence is a list of Vocab objects (or None, where the corresponding
        word is not in the vocabulary). Called internally from `Word2Vec.train()`.
        """
        for pos, word in enumerate(sentence):
            if not word or (word.prob and word.prob < np.random.rand()):
                continue
            # Now this is the novel part where the label becomes the "center"
            # and the current word + window becomes its context,
            # i.e. a right-only sliding window.
            end = min(len(sentence), pos + self.window)
            word2_indices = [word2.index for word2 in sentence[pos:end] if word2]
            ## word2_indices = [word.index]  # Unigram
            if not word2_indices:
                # in this case the sum would return zeros, the mean nans,
                # but really there is no point in doing anything at all
                continue
            l1 = np.sum(self.syn0[word2_indices], axis=0)  # 1 x layer1_size
            if self.hs:
                # Now our "center word" is the label of the sentence.
                _word = self.vocab[y_train[sentence_no]]
                # work on the entire tree at once --> 2d matrix, codelen x layer1_size
                l2 = deepcopy(self.syn1[_word.point])
                # propagate hidden -> output
                f = 1. / (1. + np.exp(-np.dot(l1, l2.T)))
                # vector of error gradients multiplied by the learning rate
                g = (1. - _word.code - f) * alpha
                # learn hidden -> output
                self.syn1[_word.point] += np.outer(g, l1)
                # learn input -> hidden, here for all words in the window separately
                self.syn0[word2_indices] += np.dot(g, l2)
        return len([word for word in sentence if word])
    def train(self, sentences):
        """
        Train the model on a sequence of sentences (lists of word strings),
        decaying the learning rate linearly from `alpha` to `min_alpha`.
        """
        if not self.vocab:
            raise RuntimeError("you must first build vocabulary before training the model")
        total_words = sum(v.count for v in self.vocab.values())
        word_count = 0
        for sentence_no, sentence in enumerate(sentences):
            # convert input string lists to Vocab objects (or None for OOV words)
            no_oov = [self.vocab.get(word, None) for word in sentence]
            # update the learning rate before every iteration
            alpha = self.min_alpha + (self.alpha - self.min_alpha) * (1. - float(word_count) / total_words)
            # train on the sentence and check how many words we trained on
            # (out-of-vocabulary (unknown) words do not count)
            word_count += self.train_sentence_cbow(no_oov, alpha, sentence_no)
        # for convenience (for later similarity computations, etc.),
        # store all embeddings additionally as unit-length vectors
        self.syn0norm = self.syn0 / np.array([np.linalg.norm(self.syn0, axis=1)]).T
    def __getitem__(self, word):
        return self.syn0[self.vocab[word].index]
# Download data from http://goo.gl/JyCnZq
X_train, y_train = get_AGTrain()
model = Word2Vec()
model.build_vocab(X_train)
# no. of epochs.
num_iter = 5
for _ in range(num_iter):
    model.train(X_train)
with open('fastnet-model.pk', 'wb') as fout:
    pickle.dump(model, fout)
X_test, y_test = get_AGTest()
predict(X_test, y_test)
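# To reload the trained model later (a sketch, assuming 'fastnet-model.pk'
# was written successfully above), something like this should work:
#
#     with open('fastnet-model.pk', 'rb') as fin:
#         model = pickle.load(fin)
#     X_test, y_test = get_AGTest()
#     predict(X_test, y_test)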