Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
RobinCould
""" module for dataset stuff """
import pandas as pd
def text_counts(filename, label_col):
    """Count how many texts (rows) carry each label.

    :filename: path of a tab separated, utf-8 encoded file without header row
    :label_col: zero-based index of the column that holds the label
    :returns: a dict mapping every label to the number of rows carrying it;
              rows whose label is missing are not counted
    """
    # header=None states that the file has no header row.  The former
    # header=False is rejected by current pandas releases.
    label_data = pd.read_csv(filename, encoding='utf-8', sep='\t',
                             usecols=[label_col], header=None,
                             names=['labels'])['labels']
    # value_counts tallies all labels in one pass instead of rescanning the
    # whole column once per distinct label.
    return label_data.value_counts().to_dict()
def author_counts(filename, profile_col, label_col):
    """Count how many distinct authors (profiles) carry each label.

    Every profile is counted once, using the label of its first row.

    :filename: path of a tab separated, utf-8 encoded file without header row
    :profile_col: zero-based index of the column identifying the author
    :label_col: zero-based index of the column that holds the label
    :returns: a dict mapping every label to the number of profiles having it;
              profiles whose label is missing are not counted
    """
    # pandas ignores the order of usecols and assigns names in file order,
    # so pair each name with its column index and sort by index; otherwise
    # the two names would be swapped whenever profile_col > label_col.
    ordered = sorted([(profile_col, 'profile'), (label_col, 'labels')])
    # header=None states that the file has no header row.  The former
    # header=False is rejected by current pandas releases.
    label_data = pd.read_csv(filename, encoding='utf-8', sep='\t',
                             usecols=[profile_col, label_col], header=None,
                             names=[name for _, name in ordered])
    unique = label_data.drop_duplicates(subset='profile')['labels']
    return unique.value_counts().to_dict()
""" Module containing feature generators used for learning.
I think I reinvented sklearn pipelines - too late now!
A dictionary of functions is used for feature generation.
If a function has only one argument feature generation is
independent of training or test case.
If it takes two arguments, feature generation depends
on case - for example: bag_of_words
This is supposed to be extensible as you can add or remove
any functions you like from the dictionary
"""
import regex as re
import string
import nltk
import inspect
import enchant
from textblob import TextBlob
from scipy.sparse import vstack, hstack
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
# ------------------------ feature generators --------------------------------#
# ------------------ for heavy weaponry see bottom ---------------------------#
def count_hash(data):
    """Count the hash tags found in a text.

    A hash tag is '#' followed by word characters, located either at the
    very start of the text or right after a whitespace character.

    :data: a single text, or a list of texts
    :returns: the number of hash tags for a single text;
              for a list, the per-text counts stacked with vstack
    """
    # (?<!\S) means "not preceded by a non-space", i.e. preceded by
    # whitespace or by nothing at all.  The former (?<=\s+) missed a tag
    # that opens the text, and needed variable-width lookbehind support.
    pat = re.compile(r'(?<!\S)#\w+', re.UNICODE)
    if isinstance(data, list):
        return vstack([len(pat.findall(each)) for each in data])
    return len(pat.findall(data))
def count_reply(data):
    """Count the reply tags (@name) found in a text.

    A reply tag is '@' followed by word characters, located either at the
    very start of the text or right after a whitespace character.

    :data: a single text, or a list of texts
    :returns: the number of reply tags for a single text;
              for a list, the per-text counts stacked with vstack
    """
    # (?<!\S) means "not preceded by a non-space", i.e. preceded by
    # whitespace or by nothing at all.  The former (?<=\s+) missed a tag
    # that opens the text, and needed variable-width lookbehind support.
    pat = re.compile(r'(?<!\S)@\w+', re.UNICODE)
    if isinstance(data, list):
        return vstack([len(pat.findall(each)) for each in data])
    return len(pat.findall(data))
def count_url_links(data):
    """Count the http, https and ftp links found in a text.

    :data: a single text, or a list of texts
    :returns: the number of links for a single text;
              for a list, the per-text counts stacked with vstack
    """
    link = re.compile(r'((https?|ftp)://[^\s/$.?#].[^\s]*)')

    def tally(text):
        # findall yields one entry per match, so its length is the count
        return len(link.findall(text))

    if not isinstance(data, list):
        return tally(data)
    return vstack([tally(text) for text in data])
def count_money(data):
    """Count the money like strings ('$' followed by digits) in a text.

    :data: a single text, or a list of texts
    :returns: the number of money like strings for a single text;
              for a list, the per-text counts stacked with vstack
    """
    amount = re.compile(r'\$\d+[,.]?\d*')

    def tally(text):
        return len(amount.findall(text))

    if not isinstance(data, list):
        return tally(data)
    return vstack([tally(text) for text in data])
def count_caps(data):
    """Count the capital letters in a text.

    :data: a single text, or a list of texts
    :returns: the number of upper case characters for a single text;
              for a list, the per-text counts stacked with vstack
    """
    def tally(text):
        return sum(char.isupper() for char in text)

    if not isinstance(data, list):
        return tally(data)
    return vstack([tally(text) for text in data])
def count_word_caps(data):
    """Count the tokens of a text that are written in capitals.

    :data: a single text, or a list of texts
    :returns: the number of tokens for which isupper() holds for a single
              text; for a list, the per-text counts stacked with vstack
    """
    def tally(text):
        return sum(token.isupper() for token in nltk.word_tokenize(text))

    if not isinstance(data, list):
        return tally(data)
    return vstack([tally(text) for text in data])
def count_word_lower(data):
    """Count the tokens of a text that are written in lower case.

    :data: a single text, or a list of texts
    :returns: the number of tokens for which islower() holds for a single
              text; for a list, the per-text counts stacked with vstack
    """
    def tally(text):
        return sum(token.islower() for token in nltk.word_tokenize(text))

    if not isinstance(data, list):
        return tally(data)
    return vstack([tally(text) for text in data])
def count_word_title(data):
    """Count the tokens of a text that are written in title case.

    :data: a single text, or a list of texts
    :returns: the number of tokens for which istitle() holds for a single
              text; for a list, the per-text counts stacked with vstack
    """
    def tally(text):
        return sum(token.istitle() for token in nltk.word_tokenize(text))

    if not isinstance(data, list):
        return tally(data)
    return vstack([tally(text) for text in data])
def count_punct(data):
    """Count the punctuation characters in a text.

    Punctuation means the characters listed in string.punctuation.

    :data: a single text, or a list of texts
    :returns: the number of punctuation characters for a single text;
              for a list, the per-text counts stacked with vstack
    """
    def tally(text):
        return sum(char in string.punctuation for char in text)

    if not isinstance(data, list):
        return tally(data)
    return vstack([tally(text) for text in data])
def count_word_length(data):
    """Build the token length distribution of a text.

    :data: a single text, or a list of texts
    :returns: for a single text a list of 19 counts, the i-th entry being
              the number of tokens that are i+1 characters long (1 to 19);
              for a list, one such row per text stacked with vstack
    """
    sizes = range(1, 20)

    def histogram(text):
        tokens = nltk.word_tokenize(text)
        return [sum(len(token) == size for token in tokens)
                for size in sizes]

    if not isinstance(data, list):
        return histogram(data)
    return vstack([hstack(histogram(text)) for text in data])
def get_polarity(data):
    """Get the TextBlob sentiment of a text.

    :data: a single text, or a list of texts
    :returns: for a single text only the sentiment polarity;
              for a list, one row per text holding every field of the
              TextBlob sentiment (the whole tuple is stacked, not just the
              polarity), rows stacked with vstack
    """
    if not isinstance(data, list):
        return TextBlob(data).sentiment.polarity
    rows = []
    for text in data:
        rows.append(hstack(TextBlob(text).sentiment[:]))
    return vstack(rows)
def count_valid_words(data):
    """Count the tokens of a text that are found in the en_US dictionary.

    Tokens are lower cased before they are looked up.

    :data: a single text, or a list of texts
    :returns: the number of valid words for a single text;
              for a list, the per-text counts stacked with vstack
    """
    dic = enchant.Dict('en_US')

    def valid_words(text):
        return sum(dic.check(word.lower())
                   for word in nltk.word_tokenize(text))

    if isinstance(data, list):
        return vstack([valid_words(each) for each in data])
    # The single text branch used to tokenize the undefined name `each`
    # and therefore always raised a NameError.
    return valid_words(data)
def count_named_entities(data):
    """Count the named entities present in a text.

    The text is split into sentences, each sentence is tokenized, POS
    tagged and chunked with nltk.ne_chunk(binary=True); every subtree
    labelled 'NE' counts as one entity.

    :data: a single text, or a list of texts
    :returns: the number of named entities for a single text;
              for a list, the per-text counts stacked with vstack
    """
    def entities_in(text):
        total = 0
        for sentence in nltk.sent_tokenize(text):
            tagged = nltk.pos_tag(nltk.word_tokenize(sentence))
            tree = nltk.ne_chunk(tagged, binary=True)
            # walk the chunk tree and keep only the 'NE' nodes
            total += sum(node.label() == 'NE' for node in tree.subtrees())
        return total

    if not isinstance(data, list):
        return entities_in(data)
    return vstack([entities_in(text) for text in data])
def bag_of_words(train, test):
    """Create the set of words found in the training texts and count how
    often each of them occurs in every text.

    English stop words are dropped and accents are stripped. The vocabulary
    is learned from the training texts only and reused for the test texts.

    :train: A list of training texts
    :test: A list of test texts
    :returns: a tuple (train counts, test counts), one row per text
    :raises TypeError: if train or test is not a list
    """
    if not (isinstance(train, list) and isinstance(test, list)):
        raise TypeError('data must be a list of texts for the bag_of_words')
    # max_df must be the float 1.0 ("appears in at most 100% of the
    # documents").  The integer 1 is an absolute count and threw away every
    # word that occurs in more than one document.
    vec = CountVectorizer(max_df=1.0, stop_words='english',
                          strip_accents='unicode')
    train_feat = vec.fit_transform(train)
    test_feat = vec.transform(test)
    return train_feat, test_feat
def bag_of_punct(train, test):
    """Create the set of characters found in the training texts and count
    how often each of them occurs in every text.

    The vocabulary is learned from the training texts only and reused for
    the test texts.

    :train: A list of training texts
    :test: A list of test texts
    :returns: a tuple (train counts, test counts), one row per text
    :raises TypeError: if train or test is not a list
    """
    if not (isinstance(train, list) and isinstance(test, list)):
        # the message used to name bag_of_words (copy and paste slip)
        raise TypeError('data must be a list of texts for the bag_of_punct')
    vec = CountVectorizer(analyzer='char')
    train_feat = vec.fit_transform(train)
    test_feat = vec.transform(test)
    return train_feat, test_feat
def tf_idf(train, test):
    """Get word level tf-idf features for train and test data.

    Accents are stripped. The vectorizer is fitted on the training texts
    only and reused for the test texts.

    :train: A list of training texts
    :test: A list of test texts
    :returns: a tuple (train features, test features), one row per text
    :raises TypeError: if train or test is not a list
    """
    if not (isinstance(train, list) and isinstance(test, list)):
        # the message used to name bag_of_words (copy and paste slip)
        raise TypeError('data must be a list of texts for the tf_idf')
    vec = TfidfVectorizer(min_df=1, analyzer='word',
                          strip_accents='unicode')
    train_feat = vec.fit_transform(train)
    test_feat = vec.transform(test)
    return train_feat, test_feat
def tf_idf_punct(train, test):
    """Get tf-idf features of the punctuation characters for train and
    test data.

    The vocabulary is fixed to the characters of string.punctuation.

    :train: A list of training texts
    :test: A list of test texts
    :returns: a tuple (train features, test features), one row per text and
              one column per punctuation character
    :raises TypeError: if train or test is not a list
    """
    if not (isinstance(train, list) and isinstance(test, list)):
        # the message used to name bag_of_words (copy and paste slip)
        raise TypeError('data must be a list of texts for the tf_idf_punct')
    # analyzer='char' is required: the default word analyzer only emits
    # tokens of two or more word characters, so no punctuation character
    # was ever matched and every feature stayed zero.
    vec = TfidfVectorizer(analyzer='char',
                          vocabulary=list(string.punctuation))
    train_feat = vec.fit_transform(train)
    test_feat = vec.transform(test)
    return train_feat, test_feat
def tf_idf_smileys(train, test):
    """Get tf-idf features of a fixed set of smileys for train and test
    data.

    Only smileys that stand alone (delimited by whitespace) are matched.

    :train: A list of training texts
    :test: A list of test texts
    :returns: a tuple (train features, test features), one row per text and
              one column per smiley
    :raises TypeError: if train or test is not a list
    """
    smileys = """:) :( :S :D =D :-) :-("""
    if not (isinstance(train, list) and isinstance(test, list)):
        # the message used to name bag_of_words (copy and paste slip)
        raise TypeError('data must be a list of texts for the tf_idf_smileys')
    # The default tokenizer only emits runs of word characters and lower
    # cases the text, so no smiley (':D', ':S', ...) was ever matched and
    # every feature stayed zero.  Tokenize on whitespace and keep the case.
    vec = TfidfVectorizer(vocabulary=smileys.split(), lowercase=False,
                          token_pattern=r'(?u)\S+')
    train_feat = vec.fit_transform(train)
    test_feat = vec.transform(test)
    return train_feat, test_feat
def tf_idf_hash(train, test):
    """Get tf-idf features of the hash tags for train and test data.

    A token is '#' followed by word characters. The vectorizer is fitted
    on the training texts only and reused for the test texts.

    :train: A list of training texts
    :test: A list of test texts
    :returns: a tuple (train features, test features), one row per text
    :raises TypeError: if train or test is not a list
    """
    if not (isinstance(train, list) and isinstance(test, list)):
        # the message used to name bag_of_words (copy and paste slip)
        raise TypeError('data must be a list of texts for the tf_idf_hash')
    vec = TfidfVectorizer(min_df=1, analyzer='word', token_pattern=r'#\w+')
    train_feat = vec.fit_transform(train)
    test_feat = vec.transform(test)
    return train_feat, test_feat
# Registry of the feature generators in use, keyed by a readable name.
# Add an entry or (un)comment a line to change the generated features.
features = {
    'hash': count_hash,
    'replies': count_reply,
    'links': count_url_links,
    # 'money': count_money,
    # 'caps': count_caps,
    # 'valid_words': count_valid_words,
    # 'word_caps': count_word_caps,
    # 'word_lower': count_word_lower,
    # 'word_title': count_word_title,
    # 'punct': count_punct,
    # 'get_polarity': get_polarity,
    # 'ner': count_named_entities,
    # 'count_word_length': count_word_length,
    # 'bag_of_punct': bag_of_punct,
    # 'tf_idf_punct': tf_idf_punct,
    'tf_idf_smileys': tf_idf_smileys,
    'tf_idf_hash': tf_idf_hash,
    'tf_idf': tf_idf,
    # 'bag_of_words': bag_of_words,
}
# Use this feature set for SVC_base_model
# features = {
#     'bag_of_words': bag_of_words,
# }
def __num_args(func):
    """Get the number of positional arguments of a function.

    :func: The function to inspect the arguments
    :returns: The number of named positional arguments func declares
              (*args and **kwargs are not counted)
    """
    # inspect.getargspec was removed in Python 3.11; getfullargspec is its
    # replacement.  Fall back to getargspec only where the replacement does
    # not exist (Python 2).  In both results item 0 is the argument names.
    describe = getattr(inspect, 'getfullargspec', None)
    if describe is None:
        describe = inspect.getargspec
    return len(describe(func)[0])
def featurize(train, test):
    """Create the features of the train and test data with every generator
    registered in the `features` dictionary.

    Generators taking one argument are applied to train and test
    separately; generators taking two arguments receive both at once.

    :train: the training texts (a list), or a single text
    :test: the test texts (a list), or a single text; same type as train
    :returns: for lists a tuple (train features, test features) where the
              outputs of all generators are stacked side by side with
              hstack; otherwise a list with one (train value, test value)
              pair per generator
    """
    assert type(train) == type(test), 'train and test must have the same type'
    # Two argument generators already return a (train, test) pair; one
    # argument generators are called once per data set to build the pair.
    pairs = [(func(train), func(test)) if __num_args(func) == 1
             else func(train, test)
             for func in features.values()]
    if isinstance(train, list) and isinstance(test, list):
        # Unpack instead of indexing: zip() returns an iterator on
        # Python 3, so zip(...)[0] raises a TypeError there.
        train_feats, test_feats = zip(*pairs)
        return hstack(train_feats), hstack(test_feats)
    return pairs
#!/usr/bin/python
""" The main method - the backbone """
import pandas as pd
from sklearn import svm
#from sklearn import linear_model
#from sklearn.ensemble import RandomForestClassifier
#from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support as prfs
from features import featurize, features
from metrics import maac
from visualize import plot_accuracies, plot_confusion, plot_prfs
def _load_profiles(path):
    """Read a tweets file and merge the tweets of every profile.

    :path: tab separated, utf-8 encoded file without header row; columns
           1, 3, 4 and 5 are read as profile, texts, label_names, labels
    :returns: a DataFrame with one row per (profile, labels, label_names)
              whose 'texts' column holds all tweets of that profile
    """
    # header=None states that the file has no header row.  The former
    # header=False is rejected by current pandas releases.
    cols = pd.read_csv(path, encoding='utf-8', sep='\t',
                       usecols=[1, 3, 4, 5], header=None,
                       names=['profile', 'texts', 'label_names', 'labels'])
    # Join with a space: plain concatenation fused the last word of a
    # tweet with the first word of the following one.
    return cols.groupby(['profile', 'labels', 'label_names'],
                        as_index=False).agg({'texts': ' '.join})


# read training data, texts concatenated for each user profile
train_cols = _load_profiles('training/tweets.data')
# read test data, texts concatenated for each user profile
test_cols = _load_profiles('test/tweets.data')

# create labels
Y = train_cols['labels'].tolist()
Y_test = test_cols['labels'].tolist()

# keep all labels together sorted in a list of tuple : number , class_name
all_labels = sorted(set(zip(Y + Y_test,
                            train_cols['label_names'].tolist()
                            + test_cols['label_names'].tolist())),
                    key=lambda pair: pair[0])
# get labels - class value, names - class names
labels, names = zip(*all_labels)

# create features
X, X_test = featurize(train_cols['texts'].tolist(),
                      test_cols['texts'].tolist())

# print(...) with a single argument behaves the same on Python 2 and 3
print('using features :\n')
print(' , '.join(features.keys()))
print('Training set size {} , features number {}'.format(*X.shape))
print('Test set size {} , features number {}\n\n'.format(*X_test.shape))

# class_weight='balanced' replaces 'auto', which scikit-learn removed
model = svm.LinearSVC(dual=False, tol=0.0001, class_weight='balanced')
#model = RandomForestClassifier(n_estimators=10, max_depth=None)
#model = SGDClassifier(loss="hinge", penalty="l2", class_weight='balanced')
model.fit(X, Y)
predict = model.predict(X_test)

# naive baseline params (most frequent in training is 0 - undecidable)
#acc = accuracy_score(Y_test, [0]*len(Y_test))
#conf = confusion_matrix(Y_test, [0]*len(Y_test), labels=labels)
#prfs_data = prfs(Y_test, [0]*len(Y_test), average=None, labels=labels)

# norm params
acc = accuracy_score(Y_test, predict)
conf = confusion_matrix(Y_test, predict, labels=labels)
prfs_data = prfs(Y_test, predict, average=None, labels=labels)
macro = maac(conf)

# print and plot the results of the run
print('Accuracy : {0:.4f}'.format(acc))
print('Macro Averaged Accuracy : {0:.4f}'.format(macro))
plot_accuracies(acc, macro, 'average metrics for classes', 'metric values %')
plot_prfs(prfs_data, names, 'metrics for classes', 'metric values %')
plot_confusion(conf, names, '', '')
""" module with additional metrics for classification results """
def maac(conf_mat):
    """Derive the Macro Average Accuracy metric.

    For every class (row) the accuracy is the diagonal entry divided by
    the row total; a row that sums to zero contributes an accuracy of 0.
    The metric is the mean of these per class accuracies.

    :conf_mat: The square confusion matrix of the classification, as a
               two dimensional numpy array
    :returns: The Macro Average Accuracy as a float; 0.0 for an empty
              (0 x 0) matrix
    """
    x, y = conf_mat.shape
    assert x == y, 'the confusion matrix must be square'
    if x == 0:
        # no classes at all: avoid the division by zero below
        return 0.0
    accuracies = []
    for each in range(x):
        row_total = conf_mat[each, :].sum()
        if row_total > 0:
            accuracies.append(float(conf_mat[each, each]) / row_total)
        else:
            accuracies.append(0.0)
    return sum(accuracies) / x
""" Module to visualize the data """
import numpy as np
import matplotlib.pyplot as plt
def plot_class_dist(train_dist, test_dist, xlabel, ylabel):
    """ Plot the class distribution of train and test side by side

    :train_dist: A dict - class label mapped to its count in train
    :test_dist: A dict - class label mapped to its count in test
    :xlabel: The label you want printed under the x axis
    :ylabel: The label you want printed next to the y axis
    :returns: Nothing!!! it just plots.
    """
    # Union of the key sets: `keys() + keys()` only works on Python 2,
    # where keys() returns lists.
    total_classes = list(set(train_dist) | set(test_dist))
    # a class missing from one of the two sets is drawn with a count of 0
    train_values = [train_dist.get(label, 0) for label in total_classes]
    test_values = [test_dist.get(label, 0) for label in total_classes]
    N = len(total_classes)
    ind = np.arange(N)
    width = 0.3
    fig, ax = plt.subplots()
    train_rects = ax.bar(ind, train_values, width, color='#1E79D4')
    test_rects = ax.bar(ind+width, test_values, width, color='#D4791E')
    ax.set_ylabel(ylabel)
    ax.set_xlabel(xlabel)
    ax.set_xticks(ind+width)
    ax.set_xticklabels(total_classes)
    ax.legend((train_rects[0], test_rects[0]), ('train', 'test'))
    ax.set_autoscaley_on(True)
    plt.xlim([min(ind) - width, max(ind+1) + width])

    def autolabel(rects):
        """ attach the bar height as a text label above every bar """
        for rect in rects:
            height = rect.get_height()
            ax.text(rect.get_x()+rect.get_width()/2., height + 0.1,
                    '%d'%int(height), ha='center', va='bottom')

    autolabel(train_rects)
    autolabel(test_rects)
    plt.show()
def plot_prfs(prfs_data, labels, xlabel, ylabel):
    """ Plot precision, recall, fscore and support as grouped bars

    Precision, recall and fscore are drawn scaled by 100; support is drawn
    as the percentage of the total support.

    :prfs_data: A tuple of four arrays - precision, recall, fscore, support
    :labels: The labels you want for the classes
    :xlabel: The label you want printed under the x axis
    :ylabel: The label you want printed next to the y axis
    :returns: Nothing!!! it just plots.
    """
    precision, recall, fscore, support = prfs_data
    positions = np.arange(len(labels))
    bar_width = 0.22
    figure, axes = plt.subplots()

    # one bar group per metric, each shifted one bar width to the right
    series = [
        ('precision', precision*100, '#1E79D4'),
        ('recall', recall*100, '#D4791E'),
        ('fscore', fscore*100, '#D41E79'),
        ('support', support*100.0/support.sum(), '#1ED479'),
    ]
    groups = []
    for slot, (_, values, colour) in enumerate(series):
        groups.append(axes.bar(positions + slot*bar_width, values,
                               bar_width, color=colour))

    axes.set_ylabel(ylabel)
    axes.set_xlabel(xlabel)
    axes.set_xticks(positions + bar_width)
    axes.set_xticklabels(labels)
    axes.legend(tuple(group[0] for group in groups),
                tuple(title for title, _, _ in series))
    axes.set_autoscaley_on(True)
    plt.xlim([min(positions) - bar_width, max(positions+1) + bar_width])

    def autolabel(bars):
        """ write the bar height above every bar """
        for bar in bars:
            top = bar.get_height()
            axes.text(bar.get_x() + bar.get_width()/2., top + 0.1,
                      '{0:.1f}'.format(float(top)),
                      ha='center', va='bottom')

    for group in groups:
        autolabel(group)
    plt.show()
def plot_accuracies(accuracy, maac, xlabel, ylabel):
    """ Plot the accuracy and the macro average accuracy as two bars

    Both values are drawn scaled by 100 on a y axis from 0 to 100.

    :accuracy: The accuracy
    :maac: The macro average accuracy
    :xlabel: The label you want printed under the x axis
    :ylabel: The label you want printed next to the y axis
    :returns: Nothing!!! it just plots.
    """
    bar_width = 0.4
    figure, axes = plt.subplots()
    accuracy_bar = axes.bar(1 - bar_width, [accuracy*100], bar_width,
                            color='#1E79D4')
    macro_bar = axes.bar(1, [maac*100], bar_width, color='#D4791E')

    axes.set_ylabel(ylabel)
    axes.set_xlabel(xlabel)
    axes.legend((accuracy_bar[0], macro_bar[0]),
                ('accuracy', 'macro averaged accuracy'))
    axes.set_autoscaley_on(True)
    plt.xlim([0, 2])
    plt.ylim([0, 100])
    # the two bars are named by the legend, so no x ticks are shown
    plt.xticks([])

    def autolabel(bars):
        """ write the bar height above every bar """
        for bar in bars:
            top = bar.get_height()
            axes.text(bar.get_x() + bar.get_width()/2., top + 0.1,
                      '{0:.1f}'.format(float(top)),
                      ha='center', va='bottom')

    for bars in (accuracy_bar, macro_bar):
        autolabel(bars)
    plt.show()
def plot_confusion(confusion_matrix, labels, xlabel, ylabel):
    """ Plot the confusion matrix as a coloured grid with a colour bar

    :confusion_matrix: The confusion matrix as an np array
    :labels: The class names used as tick labels on both axes
    :xlabel: The label you want printed under the x axis
    :ylabel: The label you want printed next to the y axis
    :returns: Nothing!!! it just plots.
    """
    tick_positions = np.arange(len(labels))
    tick_names = tuple(labels)

    plt.matshow(confusion_matrix)
    plt.colorbar()
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    # vertical x tick labels keep long class names from overlapping
    plt.xticks(tick_positions, tick_names, rotation='vertical')
    plt.yticks(tick_positions, tick_names)
    plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment