Created January 19, 2015 07:31
RobinCould
""" module for dataset stuff """ | |
import pandas as pd | |
def text_counts(filename, label_col): | |
"""Get counts for how many texts there are for each type of author | |
:labels: The different classes the data belongs to | |
:returns: list of numbers of counts | |
""" | |
label_data = pd.read_csv(filename, encoding='utf-8', | |
sep='\t', usecols=[label_col], header=False, | |
names=['labels'])['labels'] | |
labels = set(label_data) | |
return {label:sum([d == label for d in label_data]) for label in labels} | |
def author_counts(filename, profile_col, label_col): | |
"""Get counts for how many instances of each type of author there are | |
:labels: The different classes the data belongs to | |
:returns: list of numbers of counts | |
""" | |
label_data = pd.read_csv(filename, encoding='utf-8', | |
sep='\t', usecols=[profile_col, label_col], header=False, | |
names=['profile', 'labels']) | |
unique = label_data.drop_duplicates(subset='profile')['labels'] | |
labels = set(unique) | |
return {label:sum([d == label for d in unique]) for label in labels} |
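A minimal usage sketch for the two helpers above, assuming this module is saved as dataset.py (its actual file name is not shown in the gist) and a data file with the same tab-separated layout the main script reads, i.e. the author profile in column 1 and the numeric label in column 5:

# Hypothetical usage; the module name, paths and column indices are assumptions
# taken from the read_csv calls elsewhere in this gist.
from dataset import text_counts, author_counts

texts_per_class = text_counts('training/tweets.data', label_col=5)
authors_per_class = author_counts('training/tweets.data', profile_col=1, label_col=5)
print texts_per_class      # e.g. {0: 1200, 1: 340, 2: 410} - counts are made up
print authors_per_class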
features.py
""" Module containing feature generators used for learning. | |
I think I reinvented sklearn pipelines - too late now! | |
A dictionary of functions is used for feature generation. | |
If a function has only one argument feature generation is | |
independent of training or test case. | |
If it takes two arguments, feature generation depends | |
on case - for example: bag_of_words | |
This is supposed to be extensible as you can add or remove | |
any functions you like from the dictionary | |
""" | |
import regex as re | |
import string | |
import nltk | |
import inspect | |
import enchant | |
from textblob import TextBlob | |
from scipy.sparse import vstack, hstack | |
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer | |
# ------------------------ feature generators --------------------------------# | |
# ------------------ for heavy weaponry see bottom ---------------------------# | |
def count_hash(data):
    """Counts number of hashtag occurrences
    :data: the list of texts or a single text to count from
    :returns: the number of hashtags
    : If called with a list, expect a column of counts (one row per text)
    """
    pat = re.compile(r'(?<=\s+)#\w+', re.UNICODE)
    if isinstance(data, list):
        return vstack([len(pat.findall(each)) for each in data])
    else:
        return len(pat.findall(data))


def count_reply(data):
    """Counts number of reply tag occurrences
    :data: the list of texts or a single text to count from
    :returns: the number of reply tags
    : If called with a list, expect a column of counts (one row per text)
    """
    pat = re.compile(r'(?<=\s+)@\w+', re.UNICODE)
    if isinstance(data, list):
        return vstack([len(pat.findall(each)) for each in data])
    else:
        return len(pat.findall(data))


def count_url_links(data):
    """Counts number of links in text
    :data: the list of texts or a single text to count from
    :returns: the number of url links in the text
    : If called with a list, expect a column of counts (one row per text)
    """
    pat = re.compile(r'((https?|ftp)://[^\s/$.?#].[^\s]*)')
    if isinstance(data, list):
        return vstack([len(pat.findall(each)) for each in data])
    else:
        return len(pat.findall(data))


def count_money(data):
    """Counts number of money like strings
    :data: the list of texts or a single text to count from
    :returns: the number of money like strings
    : If called with a list, expect a column of counts (one row per text)
    """
    pat = re.compile(r'\$\d+[,.]?\d*')
    if isinstance(data, list):
        return vstack([len(pat.findall(each)) for each in data])
    else:
        return len(pat.findall(data))
def count_caps(data):
    """Counts capital letters in text
    :data: the list of texts or a single text to count from
    :returns: number of capital letters
    : If called with a list, expect a column of counts (one row per text)
    """
    if isinstance(data, list):
        return vstack([sum(c.isupper() for c in each) for each in data])
    else:
        return sum(c.isupper() for c in data)


def count_word_caps(data):
    """Counts all-caps words in text
    :data: the list of texts or a single text to count from
    :returns: number of all-caps words
    : If called with a list, expect a column of counts (one row per text)
    """
    if isinstance(data, list):
        return vstack([sum(w.isupper() for w in nltk.word_tokenize(each))
                       for each in data])
    else:
        return sum(w.isupper() for w in nltk.word_tokenize(data))


def count_word_lower(data):
    """Counts lowercase words in text
    :data: the list of texts or a single text to count from
    :returns: number of lowercase words
    : If called with a list, expect a column of counts (one row per text)
    """
    if isinstance(data, list):
        return vstack([sum(w.islower() for w in nltk.word_tokenize(each))
                       for each in data])
    else:
        return sum(w.islower() for w in nltk.word_tokenize(data))


def count_word_title(data):
    """Counts titlecase words in text
    :data: the list of texts or a single text to count from
    :returns: number of titlecase words
    : If called with a list, expect a column of counts (one row per text)
    """
    if isinstance(data, list):
        return vstack([sum(w.istitle() for w in nltk.word_tokenize(each))
                       for each in data])
    else:
        return sum(w.istitle() for w in nltk.word_tokenize(data))


def count_punct(data):
    """Counts punctuation in text
    :data: the list of texts or a single text to count from
    :returns: number of punctuation characters
    : If called with a list, expect a column of counts (one row per text)
    """
    if isinstance(data, list):
        return vstack([sum(c in string.punctuation for c in each)
                       for each in data])
    else:
        return sum(c in string.punctuation for c in data)


def count_word_length(data):
    """Counts the word length distribution in a text
    :data: the list of texts or a single text to count from
    :returns: a list of frequencies of words with 1-19 letters
    : If called with a list, expect one row of frequencies per text
    """
    if isinstance(data, list):
        return vstack([hstack([sum(len(w) == l
                                   for w in nltk.word_tokenize(each))
                               for l in range(1, 20)])
                       for each in data])
    else:
        return [sum(len(w) == l for w in nltk.word_tokenize(data))
                for l in range(1, 20)]
def get_polarity(data):
    """ Returns the sentiment polarity of a text
    :data: the list of texts or a single text to count from
    :returns: the sentiment polarity as a number between -1 and 1
    : If called with a list, expect one row per text (polarity and subjectivity)
    """
    if isinstance(data, list):
        return vstack([hstack(TextBlob(each).sentiment[:]) for each in data])
    else:
        return TextBlob(data).sentiment.polarity


def count_valid_words(data):
    """ Returns a count of words found when looked up in a dictionary
    :data: the list of texts or a single text to count from
    :returns: how many valid words were found
    : If called with a list, expect a column of counts (one row per text)
    """
    dic = enchant.Dict('en_US')
    if isinstance(data, list):
        return vstack([sum(dic.check(word.lower())
                           for word in nltk.word_tokenize(each))
                       for each in data])
    else:
        return sum(dic.check(word.lower()) for word in nltk.word_tokenize(data))


def count_named_entities(data):
    """ Counts named entities present in text
    :data: the list of texts or a single text to count from
    :returns: number of named entities
    """
    if isinstance(data, list):
        return vstack([sum(node.label() == 'NE'
                           for sent in nltk.sent_tokenize(each)
                           for node in nltk.ne_chunk(nltk.pos_tag(nltk.word_tokenize(sent)),
                                                     binary=True).subtrees())
                       for each in data])
    else:
        # this is ugly due to the tree structure - it just counts noun entities
        return sum(node.label() == 'NE' for sent in nltk.sent_tokenize(data)
                   for node in nltk.ne_chunk(nltk.pos_tag(nltk.word_tokenize(sent)),
                                             binary=True).subtrees())
def bag_of_words(train, test):
    """ Creates a set of words found in your texts and stores counts
        of each and every one of them
    :train: A list of training texts
    :test: A list of test texts
    :returns: An array of counts of the words in the set for each text
    """
    if isinstance(train, list) and isinstance(test, list):
        # note: max_df=1 (an integer threshold) keeps only terms that occur
        # in a single training document
        vec = CountVectorizer(max_df=1, stop_words='english',
                              strip_accents='unicode')
        train_feat = vec.fit_transform(train)
        test_feat = vec.transform(test)
        return train_feat, test_feat
    else:
        raise TypeError('data must be a list of texts for bag_of_words')


def bag_of_punct(train, test):
    """ Creates a set of characters found in your texts and stores counts
        of each and every one of them
    :train: A list of training texts
    :test: A list of test texts
    :returns: An array of counts of the characters in the set for each text
    """
    if isinstance(train, list) and isinstance(test, list):
        vec = CountVectorizer(analyzer='char')
        train_feat = vec.fit_transform(train)
        test_feat = vec.transform(test)
        return train_feat, test_feat
    else:
        raise TypeError('data must be a list of texts for bag_of_punct')


def tf_idf(train, test):
    """ Get tf-idf features for train and test data
    :train: A list of training texts
    :test: A list of test texts
    :returns: An array of frequencies of the words in the set for each text
    """
    if isinstance(train, list) and isinstance(test, list):
        vec = TfidfVectorizer(min_df=1, analyzer='word',
                              strip_accents='unicode')
        train_feat = vec.fit_transform(train)
        test_feat = vec.transform(test)
        return train_feat, test_feat
    else:
        raise TypeError('data must be a list of texts for tf_idf')


def tf_idf_punct(train, test):
    """ Get tf-idf features over punctuation characters for train and test data
    :train: A list of training texts
    :test: A list of test texts
    :returns: An array of frequencies of the characters in the set for each text
    """
    if isinstance(train, list) and isinstance(test, list):
        vec = TfidfVectorizer(vocabulary=[c for c in string.punctuation])
        train_feat = vec.fit_transform(train)
        test_feat = vec.transform(test)
        return train_feat, test_feat
    else:
        raise TypeError('data must be a list of texts for tf_idf_punct')


def tf_idf_smileys(train, test):
    """ Get tf-idf features over smileys for train and test data
    :train: A list of training texts
    :test: A list of test texts
    :returns: An array of frequencies of the smileys in the set for each text
    """
    smileys = """:) :( :S :D =D :-) :-("""
    if isinstance(train, list) and isinstance(test, list):
        # note: the default word token_pattern may never emit these symbols as
        # tokens, in which case this feature can come out all zeros
        vec = TfidfVectorizer(vocabulary=smileys.split())
        train_feat = vec.fit_transform(train)
        test_feat = vec.transform(test)
        return train_feat, test_feat
    else:
        raise TypeError('data must be a list of texts for tf_idf_smileys')


def tf_idf_hash(train, test):
    """ Get tf-idf features over hashtags for train and test data
    :train: A list of training texts
    :test: A list of test texts
    :returns: An array of frequencies of the hashtags in the set for each text
    """
    if isinstance(train, list) and isinstance(test, list):
        vec = TfidfVectorizer(min_df=1, analyzer='word', token_pattern=r'#\w+')
        train_feat = vec.fit_transform(train)
        test_feat = vec.transform(test)
        return train_feat, test_feat
    else:
        raise TypeError('data must be a list of texts for tf_idf_hash')
# The features you want to use, feel free to add methods
features = {
    'hash': count_hash,
    'replies': count_reply,
    'links': count_url_links,
    # 'money': count_money,
    # 'caps': count_caps,
    # 'valid_words': count_valid_words,
    # 'word_caps': count_word_caps,
    # 'word_lower': count_word_lower,
    # 'word_title': count_word_title,
    # 'punct': count_punct,
    # 'get_polarity': get_polarity,
    # 'ner': count_named_entities,
    # 'count_word_length': count_word_length,
    # 'bag_of_punct': bag_of_punct,
    # 'tf_idf_punct': tf_idf_punct,
    'tf_idf_smileys': tf_idf_smileys,
    'tf_idf_hash': tf_idf_hash,
    'tf_idf': tf_idf,
    # 'bag_of_words': bag_of_words
}

# Use this feature set for SVC_base_model
# features = {
#     'bag_of_words': bag_of_words
# }
def __num_args(func):
    """ Get the number of arguments of a function
    :func: The function whose arguments to inspect
    :returns: The number of arguments func has
    """
    return len(inspect.getargspec(func)[0])


def featurize(train, test):
    """Creates rows of features for the given train and test data
    :train: the training texts (or a single training text)
    :test: the test texts (or a single test text)
    :returns: a feature matrix for train and one for test
    """
    assert type(train) == type(test)
    if isinstance(train, list) and isinstance(test, list):
        # one-argument generators are applied to each split independently;
        # two-argument generators (e.g. tf_idf) are fit on train, applied to test
        feats = zip(*[(func(train), func(test)) if __num_args(func) == 1
                      else func(train, test)
                      for func in features.values()])
        return hstack(feats[0]), hstack(feats[1])
    else:
        return [(func(train), func(test)) if __num_args(func) == 1
                else func(train, test)
                for func in features.values()]
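For illustration, a small sketch of calling featurize directly on two toy lists of texts (the strings are made up); with the default features dictionary both return values are sparse matrices with one row per input text:

# Toy example, not from the original gist.
from features import featurize

train_texts = [u'check this out http://example.com #cool @friend',
               u'another tweet, nothing special :)']
test_texts = [u'a reply to @someone with a #hashtag']

X_train, X_test = featurize(train_texts, test_texts)
print X_train.shape, X_test.shape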
#!/usr/bin/python
""" The main method - the backbone """
import pandas as pd
from sklearn import svm
# from sklearn import linear_model
# from sklearn.ensemble import RandomForestClassifier
# from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support as prfs
from features import featurize, features
from metrics import maac
from visualize import plot_accuracies, plot_confusion, plot_prfs

# read training data
train_cols = pd.read_csv('training/tweets.data', encoding='utf-8', sep='\t',
                         usecols=[1, 3, 4, 5], header=None,
                         names=['profile', 'texts', 'label_names', 'labels'])
# concatenate the texts together for each user profile in training
train_cols = train_cols.groupby(['profile', 'labels', 'label_names'],
                                as_index=False).sum()
# read test data
test_cols = pd.read_csv('test/tweets.data', encoding='utf-8', sep='\t',
                        usecols=[1, 3, 4, 5], header=None,
                        names=['profile', 'texts', 'label_names', 'labels'])
# concatenate the texts together for each user profile in test
test_cols = test_cols.groupby(['profile', 'labels', 'label_names'],
                              as_index=False).sum()
# create labels
Y = train_cols['labels'].tolist()
Y_test = test_cols['labels'].tolist()
# keep all labels together, sorted, in a list of tuples: number, class_name
all_labels = sorted(list(set(zip(Y + Y_test,
                                 train_cols['label_names'].tolist()
                                 + test_cols['label_names'].tolist()))),
                    key=lambda x: x[0])
# labels - class values, names - class names
labels, names = zip(*all_labels)
# create features
X, X_test = featurize(train_cols['texts'].tolist(),
                      test_cols['texts'].tolist())
print 'using features :\n'
print ' , '.join(features.keys())
print 'Training set size {} , features number {}'.format(*X.shape)
print 'Test set size {} , features number {}\n\n'.format(*X_test.shape)
model = svm.LinearSVC(dual=False, tol=0.0001, class_weight='auto')
# model = RandomForestClassifier(n_estimators=10, max_depth=None)
# model = SGDClassifier(loss="hinge", penalty="l2", class_weight='auto')
model.fit(X, Y)
predict = model.predict(X_test)
# naive baseline params (most frequent label in training is 0 - undecidable)
# acc = accuracy_score(Y_test, [0]*len(Y_test))
# conf = confusion_matrix(Y_test, [0]*len(Y_test), labels=labels)
# prfs_data = prfs(Y_test, [0]*len(Y_test), average=None, labels=labels)
# normal params
acc = accuracy_score(Y_test, predict)
conf = confusion_matrix(Y_test, predict, labels=labels)
prfs_data = prfs(Y_test, predict, average=None, labels=labels)
macro = maac(conf)
# plot out results of run
print 'Accuracy : {0:.4f}'.format(acc)
print 'Macro Averaged Accuracy : {0:.4f}'.format(macro)
plot_accuracies(acc, macro, 'average metrics for classes', 'metric values %')
plot_prfs(prfs_data, names, 'metrics for classes', 'metric values %')
plot_confusion(conf, names, '', '')
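For reference, the two read_csv calls above imply that training/tweets.data and test/tweets.data are tab-separated files with no header row, where column 1 holds the author profile id, column 3 the tweet text, column 4 the class name and column 5 the numeric class label (columns 0 and 2 are present in the file but unused here). A hypothetical row, with tabs written as <TAB> and all field values invented, would look like:

<col0> <TAB> some_profile_id <TAB> <col2> <TAB> just another tweet #example <TAB> some_class_name <TAB> 2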
metrics.py
""" module with additional metrics for classification results """ | |
def maac(conf_mat): | |
""" Derive the Macro Average Accuracy metric | |
:confusion_matrix: The confusion matrix of the classification | |
:returns: The Macro Average Accuracy | |
""" | |
x, y = conf_mat.shape | |
assert x == y | |
accuracies = [] | |
for each in range(x): | |
total_correct = sum(conf_mat[each, :]) | |
tp = conf_mat[each, each] | |
if total_correct > 0: | |
acc = float(tp)/total_correct | |
else: | |
acc = 0 | |
accuracies.append(acc) | |
return sum(accuracies)/x |
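A quick worked example of the metric on a made-up 2x2 confusion matrix, where 8 of 10 instances of class 0 and 6 of 10 instances of class 1 are classified correctly:

import numpy as np
from metrics import maac

conf = np.array([[8, 2],
                 [4, 6]])
# per-class accuracies are 8/10 and 6/10, so their mean is 0.7
print maac(conf)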
visualize.py
""" Module to visualize the data """ | |
import numpy as np | |
import matplotlib.pyplot as plt | |
def plot_class_dist(train_dist, test_dist, xlabel, ylabel): | |
""" Plot the class distribution | |
:train_dist: A list - the counts of classes in test | |
:test_dist: A list - the counts of classes in test | |
:xlabel: The label you want printed under the x axis | |
:ylabel: The label you want printed next to the y axis | |
:returns: Nothing!!! it just plots. | |
""" | |
total_classes = list(set(train_dist.keys() + test_dist.keys())) | |
train_values = [train_dist[label] if label in train_dist.keys() else 0 | |
for label in total_classes] | |
test_values = [test_dist[label] if label in test_dist.keys() else 0 | |
for label in total_classes] | |
N = len(total_classes) | |
ind = np.arange(N) | |
width = 0.3 | |
fig, ax = plt.subplots() | |
train_rects = ax.bar(ind, train_values, width, color='#1E79D4') | |
test_rects = ax.bar(ind+width, test_values, width, color='#D4791E') | |
ax.set_ylabel(ylabel) | |
ax.set_xlabel(xlabel) | |
ax.set_xticks(ind+width) | |
ax.set_xticklabels(total_classes) | |
ax.legend((train_rects[0], test_rects[0]), ('train', 'test')) | |
ax.set_autoscaley_on(True) | |
plt.xlim([min(ind) - width, max(ind+1) + width]) | |
def autolabel(rects): | |
""" attach some text labels """ | |
for rect in rects: | |
height = rect.get_height() | |
ax.text(rect.get_x()+rect.get_width()/2., height + 0.1, | |
'%d'%int(height), ha='center', va='bottom') | |
autolabel(train_rects) | |
autolabel(test_rects) | |
plt.show() | |
def plot_prfs(prfs_data, labels, xlabel, ylabel):
    """ Plot a prfs tuple
    :prfs_data: A tuple of four lists with measurements
    :labels: The labels you want for the classes
    :xlabel: The label you want printed under the x axis
    :ylabel: The label you want printed next to the y axis
    :returns: Nothing!!! it just plots.
    """
    total_classes = labels
    precision, recall, fscore, support = prfs_data
    N = len(total_classes)
    ind = np.arange(N)
    width = 0.22
    fig, ax = plt.subplots()
    prec_rects = ax.bar(ind, (precision*100), width, color='#1E79D4')
    recall_rects = ax.bar(ind+width, (recall*100), width, color='#D4791E')
    fscore_rects = ax.bar(ind+2*width, (fscore*100), width, color='#D41E79')
    supp_rects = ax.bar(ind+3*width, (support*100.0/support.sum()), width,
                        color='#1ED479')
    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    ax.set_xticks(ind+width)
    ax.set_xticklabels(total_classes)
    ax.legend((prec_rects[0], recall_rects[0], fscore_rects[0], supp_rects[0]),
              ('precision', 'recall', 'fscore', 'support'))
    ax.set_autoscaley_on(True)
    plt.xlim([min(ind) - width, max(ind+1) + width])

    def autolabel(rects):
        """ attach some text labels """
        for rect in rects:
            height = rect.get_height()
            ax.text(rect.get_x()+rect.get_width()/2., height + 0.1,
                    '{0:.1f}'.format(float(height)), ha='center', va='bottom')
    autolabel(prec_rects)
    autolabel(recall_rects)
    autolabel(fscore_rects)
    autolabel(supp_rects)
    plt.show()
def plot_accuracies(accuracy, maac, xlabel, ylabel):
    """ Plot the accuracies
    :accuracy: The accuracy
    :maac: The macro average accuracy
    :xlabel: The label you want printed under the x axis
    :ylabel: The label you want printed next to the y axis
    :returns: Nothing!!! it just plots.
    """
    width = 0.4
    fig, ax = plt.subplots()
    acc_rect = ax.bar(1 - width, [accuracy*100], width, color='#1E79D4')
    maac_rect = ax.bar(1, [maac*100], width, color='#D4791E')
    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    ax.legend((acc_rect[0], maac_rect[0]),
              ('accuracy', 'macro averaged accuracy'))
    ax.set_autoscaley_on(True)
    plt.xlim([0, 2])
    plt.ylim([0, 100])
    plt.xticks([])

    def autolabel(rects):
        """ attach some text labels """
        for rect in rects:
            height = rect.get_height()
            ax.text(rect.get_x()+rect.get_width()/2., height + 0.1,
                    '{0:.1f}'.format(float(height)), ha='center', va='bottom')
    autolabel(acc_rect)
    autolabel(maac_rect)
    plt.show()
def plot_confusion(confusion_matrix, labels, xlabel, ylabel):
    """ Plot the confusion matrix
    :confusion_matrix: The confusion matrix as an np array
    :labels: The class names used for the axis ticks
    :xlabel: The label you want printed under the x axis
    :ylabel: The label you want printed next to the y axis
    :returns: Nothing!!! it just plots.
    """
    plt.matshow(confusion_matrix)
    plt.colorbar()
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.xticks(np.arange(len(labels)), tuple(labels), rotation='vertical')
    plt.yticks(np.arange(len(labels)), tuple(labels))
    plt.show()
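The class-distribution plot is not called from the main script; a minimal sketch of how it could be wired up with the counting helpers from the dataset module (the module name, paths and axis labels are assumptions):

from dataset import text_counts
from visualize import plot_class_dist

train_dist = text_counts('training/tweets.data', label_col=5)
test_dist = text_counts('test/tweets.data', label_col=5)
plot_class_dist(train_dist, test_dist, 'classes', 'number of texts')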