#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
An implementation of the *fastText* model (nicknamed "FastNet" here) from
Armand Joulin, Edouard Grave, Piotr Bojanowski and Tomas Mikolov. 2016.
Bag of Tricks for Efficient Text Classification.
https://arxiv.org/pdf/1607.01759v2.pdf

Largely based on RaRe Technologies' `gensim`:
https://github.com/RaRe-Technologies/gensim/blob/develop/gensim/models/word2vec.py

(Disclaimer: It's dirty cut-and-paste code, and even my understanding of the
paper might not be correct. I wrote this because I was curious about the speed
and results. Suggestions and corrections are gladly welcomed!)
"""
import heapq
import pickle
from copy import deepcopy
from math import sqrt
from collections import Counter
from itertools import dropwhile, chain

import pandas as pd
import numpy as np

from gensim.utils import tokenize
from gensim.models.word2vec import Vocab


def get_AGTrain(dir_path='ag_news_csv/', min_count=100):
    label_names = {1: 'LABELL1', 2: 'LABELL2', 3: 'LABELL3', 4: 'LABELL4'}
    df_train = pd.read_csv(dir_path + 'train.csv', delimiter=',', header=None,
                           names=('Label', 'Title', 'Caption'))
    X_train = df_train[['Title', 'Caption']].apply(lambda x:
                                                   list(tokenize(' '.join(x))), axis=1)
    y_train = df_train['Label'].apply(lambda x: label_names[x])
    vocab_count = Counter(chain(*X_train))
    for key, count in dropwhile(lambda key_count: key_count[1] >= min_count,
                                vocab_count.most_common()):
        del vocab_count[key]
    X_train = X_train.apply(lambda x: [word if word in vocab_count else 'UNK' for word in x])
    return X_train, y_train
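
# The loader above assumes the AG News CSV from http://goo.gl/JyCnZq, where each
# row is roughly (class index, title, description), e.g. a hypothetical row:
#   "3","Wall St. Bears Claw Back Into the Black","Short-sellers see red again ..."
# Class indices 1-4 become the pseudo-tokens LABELL1..LABELL4, every token seen
# fewer than `min_count` times is collapsed into 'UNK', and each row becomes a
# plain list of tokens.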


def get_AGTest(dir_path='ag_news_csv/'):
    label_names = {1: 'LABELL1', 2: 'LABELL2', 3: 'LABELL3', 4: 'LABELL4'}
    df_test = pd.read_csv(dir_path + 'test.csv', delimiter=',', header=None,
                          names=('Label', 'Title', 'Caption'))
    X_test = df_test[['Title', 'Caption']].apply(lambda x:
                                                 list(tokenize(' '.join(x))), axis=1)
    y_test = df_test['Label'].apply(lambda x: label_names[x])
    # note: relies on the module-level `model` built further down,
    # so this must only be called after training
    X_test = X_test.apply(lambda x: [word if word in model.vocab else 'UNK'
                                     for word in x])
    return X_test, y_test


def inject_label_counts(X_train, y_train):
    num_tokens_per_label = {'LABELL1': 0, 'LABELL2': 0, 'LABELL3': 0, 'LABELL4': 0}
    for sent, label in zip(X_train, y_train):
        num_tokens_per_label[label] += len(sent)
    for k, v in num_tokens_per_label.items():
        # no. of training instances per class = 30,000
        num_tokens_per_label[k] -= 30000
    return num_tokens_per_label


def cosine(vec1, vec2):
    sum1 = np.sum(vec1 ** 2)
    sum2 = np.sum(vec2 ** 2)
    denominator = sqrt(sum1) * sqrt(sum2)
    return np.dot(vec1, vec2) / denominator
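
# A quick sanity check of `cosine` (illustrative values, not part of the original
# script): the angle between [1, 0] and [1, 1] is 45 degrees, so the similarity
# should be about 0.7071:
#   >>> cosine(np.array([1., 0.]), np.array([1., 1.]))
#   0.7071067811865475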


def predict(X_test, y_test):
    label_vectors = {'LABELL1': model['LABELL1'], 'LABELL2': model['LABELL2'],
                     'LABELL3': model['LABELL3'], 'LABELL4': model['LABELL4']}
    correct = 0
    for row, gold in zip(X_test, y_test):
        query = sum(model[word] for word in row)
        results = sorted([(cosine(query, v), k)
                          for k, v in label_vectors.items()], reverse=True)
        print('\t'.join([gold, results[0][1], str(results)]))
        if gold == results[0][1]:
            correct += 1
    print(correct / float(len(X_test)))
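
# Like `get_AGTest`, `predict` reads the trained `model` from module scope: each
# test document is represented as the sum of its word vectors and is assigned the
# label whose vector is closest by cosine similarity. A hypothetical call, after
# training, would look like:
#   >>> predict(*get_AGTest())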


class Word2Vec():
    def __init__(self, sentences=None, embed_dim=10, hs=1, thr=0, window=1,
                 min_count=1, alpha=0.05, min_alpha=0.0001, seed=1):
        self.vocab = {}  # mapping from a word (string) to a Vocab object
        self.index2word = []  # map from a word's matrix index (int) to the word (string)
        self.embed_dim = embed_dim
        self.hs = hs
        self.thr = thr
        self.window = window
        self.min_count = min_count
        self.alpha = alpha
        self.min_alpha = min_alpha
        self.seed = seed

    def reset_weights(self):
        np.random.seed(self.seed)
        # weights
        self.syn1 = (np.random.rand(len(self.vocab), self.embed_dim) - 0.5) / self.embed_dim
        # embedding
        self.syn0 = (np.random.rand(len(self.vocab), self.embed_dim) - 0.5) / self.embed_dim
        self.syn0norm = None

    def _create_binary_tree(self):
        """
        Create a binary Huffman tree using stored vocabulary word counts. Frequent words
        will have shorter binary codes. Called internally from `build_vocab()`.
        """
        vocab_size = len(self.vocab)
        # build the huffman tree
        # different from the original word2vec, we only have the labels here
        heap = [self.vocab['LABELL1'], self.vocab['LABELL2'],
                self.vocab['LABELL3'], self.vocab['LABELL4']]
        num_labels = 4
        heapq.heapify(heap)
        for i in range(num_labels - 1):
            min1, min2 = heapq.heappop(heap), heapq.heappop(heap)
            # any new vocab from the joint branches has index >= len(self.vocab)
            heapq.heappush(heap, Vocab(count=min1.count + min2.count,
                                       index=i + len(self.vocab),
                                       left=min1, right=min2))
        # recurse over the tree, assigning a binary code to each vocabulary word
        if heap:
            max_depth, stack = 0, [(heap[0], [], [])]
            while stack:
                node, codes, points = stack.pop()
                if node.index < vocab_size:
                    # leaf node => store its path from the root
                    node.code, node.point = codes, points
                    max_depth = max(len(codes), max_depth)
                else:
                    # inner node => continue recursion
                    points = np.array(list(points) + [node.index - vocab_size], dtype=int)
                    stack.append((node.left, np.array(list(codes) + [0], dtype=int), points))
                    stack.append((node.right, np.array(list(codes) + [1], dtype=int), points))
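
    # With only the four label nodes in the heap and roughly balanced class counts,
    # the resulting Huffman tree is (typically) a complete binary tree, so each
    # label gets a 2-bit code and a path through 2 of the 3 inner nodes.
    # A hypothetical way to inspect this after `build_vocab()`:
    #   >>> for name in ('LABELL1', 'LABELL2', 'LABELL3', 'LABELL4'):
    #   ...     print(name, model.vocab[name].code, model.vocab[name].point)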

    def build_vocab(self, sentences, hs=1, neg=False, thr=0):
        """
        Build vocabulary from a sequence of sentences (can be a once-only generator stream).
        Each sentence must be a list of strings.
        """
        sentence_no, vocab = -1, {}
        total_words = 0
        for sentence_no, sentence in enumerate(sentences):
            for word in sentence:
                total_words += 1
                try:
                    vocab[word].count += 1
                except KeyError:
                    vocab[word] = Vocab(count=1)
        # the label pseudo-tokens never occur in the sentences themselves,
        # so their counts are injected from the module-level X_train / y_train
        for k, v in inject_label_counts(X_train, y_train).items():
            vocab[k] = Vocab(count=v)
        # assign a unique index to each word
        self.vocab, self.index2word = {}, []
        for word, v in vocab.items():
            if v.count >= self.min_count:
                v.index = len(self.vocab)
                self.index2word.append(word)
                self.vocab[word] = v
        # add probabilities for sub-sampling (if self.thr > 0)
        if self.thr > 0:
            total_words = float(sum(v.count for v in self.vocab.values()))
            for word in self.vocab:
                # formula from paper
                # self.vocab[word].prob = max(0., 1. - sqrt(self.thr * total_words / self.vocab[word].count))
                # formula from code
                self.vocab[word].prob = (sqrt(self.vocab[word].count /
                                              (self.thr * total_words)) + 1.) * \
                                        (self.thr * total_words) / self.vocab[word].count
        else:
            # if prob is 0, the word won't get discarded
            for word in self.vocab:
                self.vocab[word].prob = 0.
        # add info about each word's Huffman encoding
        self._create_binary_tree()
        # initialize layers
        self.reset_weights()
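
    # Worked example of the sub-sampling keep-probability above (illustrative
    # numbers, not from the data): with thr = 1e-4, a word seen 10,000 times in a
    # corpus of 1,000,000 token counts gives
    #   (sqrt(10000 / (1e-4 * 1e6)) + 1) * (1e-4 * 1e6) / 10000
    #   = (sqrt(100) + 1) * 100 / 10000 = 0.11
    # i.e. such a frequent word is kept only ~11% of the time during training.
    # With the default thr = 0, every word gets prob = 0 and is never discarded.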

    def train_sentence_cbow(self, sentence, alpha, sentence_no):
        """
        Update a cbow model by training on a single sentence
        using hierarchical softmax (negative sampling is not implemented here).

        The sentence is a list of Vocab objects (or None, where the corresponding
        word is not in the vocabulary). Called internally from `Word2Vec.train()`.
        """
        for pos, word in enumerate(sentence):
            if not word or (word.prob and word.prob < np.random.rand()):
                continue
            # Now this is the novel part where the label becomes the "center"
            # and the current word + window becomes its context,
            # i.e. a right-only sliding window.
            end = min(len(sentence), pos + self.window)
            word2_indices = [word2.index for word2 in sentence[pos:end]]
            ## word2_indices = [word.index]  # Unigram
            if not word2_indices:
                # in this case the sum would return zeros, the mean nans,
                # but really there is no point in doing anything at all
                continue
            l1 = np.sum(self.syn0[word2_indices], axis=0)  # 1 x layer1_size
            if self.hs:
                # now our "center word" is the label of the sentence
                _word = self.vocab[y_train[sentence_no]]
                # work on the entire tree at once --> 2d matrix, codelen x layer1_size
                l2 = deepcopy(self.syn1[_word.point])
                # propagate hidden -> output
                f = 1. / (1. + np.exp(-np.dot(l1, l2.T)))
                # vector of error gradients multiplied by the learning rate
                g = (1. - _word.code - f) * alpha
                # learn hidden -> output
                self.syn1[_word.point] += np.outer(g, l1)
                # learn input -> hidden, here for all words in the window separately
                self.syn0[word2_indices] += np.dot(g, l2)
        return len([word for word in sentence if word])
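
    # Toy illustration of the hierarchical-softmax update above (made-up numbers,
    # assuming a single tree node with code bit 0): with
    #   l1 = [0.1, -0.2], l2 = [0.3, 0.4], alpha = 0.05
    # f = sigmoid(0.1*0.3 + (-0.2)*0.4) = sigmoid(-0.05) ~ 0.4875, so
    # g = (1 - 0 - 0.4875) * 0.05 ~ 0.0256 > 0, which nudges syn1[point] towards
    # l1 and syn0[context] towards l2, pushing f closer to 1 (= 1 - code bit).
    # For a code bit of 1 the sign flips and f is pushed towards 0 instead.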

    def train(self, sentences):
        if not self.vocab:
            raise RuntimeError("you must first build vocabulary before training the model")
        total_words = sum(v.count for v in self.vocab.values())
        word_count = 0
        for sentence_no, sentence in enumerate(sentences):
            # convert input string lists to Vocab objects (or None for OOV words)
            no_oov = [self.vocab.get(word, None) for word in sentence]
            # update the learning rate before every iteration
            alpha = self.min_alpha + (self.alpha - self.min_alpha) * (1. - word_count / float(total_words))
            # train on the sentence and check how many words we trained on
            # (out-of-vocabulary (unknown) words do not count)
            word_count += self.train_sentence_cbow(no_oov, alpha, sentence_no)
        # for convenience (for later similarity computations, etc.),
        # store all embeddings additionally as unit length vectors
        self.syn0norm = self.syn0 / np.array([np.linalg.norm(self.syn0, axis=1)]).T

    def __getitem__(self, word):
        return self.syn0[self.vocab[word].index]


# Download data from http://goo.gl/JyCnZq
X_train, y_train = get_AGTrain()

model = Word2Vec()
model.build_vocab(X_train)

# no. of epochs
num_iter = 5
for _ in range(num_iter):
    model.train(X_train)

with open('fastnet-model.pk', 'wb') as fout:
    pickle.dump(model, fout)

X_test, y_test = get_AGTest()
predict(X_test, y_test)
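
# A minimal sketch of reusing the pickled model later (assuming the same imports
# and helper functions are available; 'fastnet-model.pk' is the file written above,
# and unseen words are mapped to 'UNK' as during training):
#
#   with open('fastnet-model.pk', 'rb') as fin:
#       reloaded = pickle.load(fin)
#   doc = list(tokenize('Stocks rally as oil prices fall'))
#   query = sum(reloaded[w] if w in reloaded.vocab else reloaded['UNK'] for w in doc)
#   scores = sorted((cosine(query, reloaded[label]), label)
#                   for label in ('LABELL1', 'LABELL2', 'LABELL3', 'LABELL4'))
#   print(scores[-1][1])  # predicted label = highest cosine similarity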