Skip to content

Instantly share code, notes, and snippets.

@g-leech
Last active May 20, 2020
Embed
What would you like to do?
NLP helpers
#%tensorflow_version 2.x
import pandas as pd
import numpy as np
import re
from nltk import word_tokenize
from nltk.stem import WordNetLemmatizer
from scipy.sparse import hstack
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import SGDRegressor, LinearRegression, BayesianRidge
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import mean_squared_error, make_scorer
from scipy.stats import uniform
from sklearn.model_selection import cross_val_score
def check_gpu():
    """Verify a Colab GPU runtime is attached; raise SystemError otherwise."""
    import tensorflow as tf

    gpu = tf.test.gpu_device_name()
    if gpu != '/device:GPU:0':
        raise SystemError('GPU device not found')
    print('Found GPU at: {}'.format(gpu))
# Hossein's dumb baseline
def score_task_1(truth_loc, prediction_loc):
    """RMSE between ground-truth mean grades and predictions, joined on id.

    Both arguments are CSV paths; truth must have columns (id, meanGrade)
    and predictions (id, pred). Raises AssertionError on an id mismatch.
    """
    truth = pd.read_csv(truth_loc, usecols=['id', 'meanGrade'])
    pred = pd.read_csv(prediction_loc, usecols=['id', 'pred'])
    assert(sorted(truth.id) == sorted(pred.id)),"ID mismatch between ground truth and prediction!"
    merged = pd.merge(truth, pred)
    errors = merged['meanGrade'] - merged['pred']
    return np.sqrt(np.mean(errors ** 2))
def rmse(labels, preds):
    """Root-mean-squared error between two aligned arrays."""
    squared_errors = (labels - preds) ** 2
    return np.sqrt(np.mean(squared_errors))
def get_ngram(n):
    """Ngram range (unigrams through n-grams) for the sklearn vectorisers."""
    return 1, n
def swop_tags(X):
    """Replace the <.../> tagged span of each original headline with its edit word.

    X is a DataFrame with 'original' and 'edit' columns; returns a Series.
    """
    def substitute(row):
        return re.sub(r"<.+/>", row["edit"], row["original"])

    return X.apply(substitute, axis=1)
def clean_tags(X):
    """Strip the <.../> markers from a Series of headlines, keeping the word.

    E.g. "Trump <visits/> Moscow" -> "Trump visits Moscow".
    """
    # regex=True is required: since pandas 2.0 str.replace treats the
    # pattern as a literal by default, which would leave tags untouched.
    # Raw strings avoid the invalid "\g" escape warning as well.
    return X.str.replace(r"<(.+)/>", r"\g<1>", regex=True)
def stack_both_headlines(X, n, stop_words, vocab=None, edit_vocab=None):
    """Bag-of-words features for the original AND edited headline columns.

    Returns the horizontally stacked sparse matrix; when no vocabularies are
    supplied, also returns the two fitted vocabularies so they can be reused
    on a held-out set.
    """
    # Bug fix: the original rebound X to the first sparse matrix before the
    # second call, so the "edited" column was vectorised from the wrong
    # object. Keep the DataFrame intact and vectorise both columns from it.
    # Also forward vocab/edit_vocab, which were accepted but never used.
    X_orig, vocabulary = bag_o_words(X, n=n, stops=stop_words, vocab=vocab)
    X_edit, edit_vocabulary = bag_o_words(X, n=n, stops=stop_words,
                                          colName="edited", vocab=edit_vocab)
    if not vocab and not edit_vocab:
        return hstack((X_orig, X_edit)), vocabulary, edit_vocabulary
    return hstack((X_orig, X_edit))
def bag_o_words(X, n=2, stops=None, colName="original", vocab=None):
    """Fit a CountVectorizer on X[colName].

    Returns (document-term sparse matrix, feature-name list). A fixed
    vocabulary can be supplied via vocab to reuse a training vocabulary.
    """
    args = {
        'strip_accents': "ascii",
        'lowercase': True,
        'stop_words': stops,
        'ngram_range': get_ngram(n),
        'min_df': 2,
    }
    if vocab:
        args['vocabulary'] = vocab
    vect = CountVectorizer(**args)
    # Removed the stray debug print(vect) from the original.
    matrix = vect.fit_transform(X[colName])
    # get_feature_names() was removed in scikit-learn 1.2; prefer the
    # replacement when it exists, fall back for old versions.
    if hasattr(vect, "get_feature_names_out"):
        names = list(vect.get_feature_names_out())
    else:
        names = vect.get_feature_names()
    return matrix, names
def tf_idf(X, n=2, stops=None, colName="original", vocab=None):
    """Fit a TfidfVectorizer (with a lemmatising tokenizer) on X[colName].

    Returns (tf-idf sparse matrix, feature-name list).
    """
    tfer = TfidfVectorizer(
        tokenizer=LemmaTokenizer(),
        strip_accents="ascii",
        lowercase=True,
        stop_words=stops,
        ngram_range=get_ngram(n),
        min_df=2,
        vocabulary=vocab,
    )
    matrix = tfer.fit_transform(X[colName])
    # get_feature_names() was removed in scikit-learn 1.2; prefer the
    # replacement when it exists, fall back for old versions.
    if hasattr(tfer, "get_feature_names_out"):
        names = list(tfer.get_feature_names_out())
    else:
        names = tfer.get_feature_names()
    return matrix, names
# Copied from
# https://scikit-learn.org/stable/modules/feature_extraction.html
class LemmaTokenizer:
    """Callable tokenizer: word-tokenize a document, WordNet-lemmatize each token."""

    def __init__(self):
        self.wnl = WordNetLemmatizer()

    def __call__(self, doc):
        lemmatize = self.wnl.lemmatize
        return [lemmatize(token) for token in word_tokenize(doc)]
def test_some_model(X_train, y_train, X_test, y_test):
    """Fit an out-of-the-box SGDRegressor and return its test-set RMSE."""
    model = SGDRegressor()
    model.fit(X_train, y_train)
    predictions = np.array(model.predict(X_test))
    return rmse(y_test, predictions)
def fit_all_models(models, X, y):
    """Fit every estimator on (X, y); return the list of fitted estimators."""
    fitted = []
    for estimator in models:
        fitted.append(estimator.fit(X, y))
    return fitted
def get_best(results):
    """Given {model: rmse}, return the (model, rmse) pair with the lowest rmse.

    Ties go to the earliest-inserted model, matching dict ordering.
    """
    best_model = min(results, key=results.get)
    return best_model, results[best_model]
# Define the hyperparameter ranges for each model
def lookup_model_hypers(model):
    """Return the hyperparameter search space for a given estimator instance.

    Raises KeyError for unsupported estimator types.
    """
    # Shared space for the two tree ensembles (copied so the search dicts
    # stay independent objects).
    tree_space = dict(n_estimators=np.arange(20, 300, 40),
                      max_depth=np.arange(1, 7, 1),
                      min_samples_leaf=np.arange(2, 20, 4))
    d = {
        # Key on the class objects directly; the original instantiated each
        # estimator just to read __class__, which is wasteful.
        SGDRegressor: dict(alpha=np.arange(0, 0.5, 0.01),
                           penalty=['l2', 'l1']),
        LinearRegression: dict(),
        BayesianRidge: dict(alpha_1=np.arange(0, 10, 0.1),
                            alpha_2=np.arange(0, 10, 0.1),
                            lambda_1=np.arange(0, 10, 0.1),
                            lambda_2=np.arange(0, 10, 0.1)),
        RandomForestRegressor: tree_space,
        GradientBoostingRegressor: dict(tree_space),
        # SVR's C must be strictly positive; the original range started at 0,
        # which makes SVR raise whenever that value is sampled.
        SVR: dict(kernel=['rbf', 'sigmoid'],
                  gamma=['scale', 'auto'],
                  C=np.arange(0.1, 2, 0.1)),
    }
    return d[model.__class__]
# Randomised beats gridsearch
def hyperparam_search(model, X, y):
    """Run a randomised hyperparameter search; return the best parameter dict."""
    space = lookup_model_hypers(model)
    searcher = RandomizedSearchCV(model,
                                  param_distributions=space,
                                  n_jobs=-1)
    fitted_search = searcher.fit(X, y)
    return fitted_search.best_params_
def apply_best_params(model, param_dict):
    """Return a fresh, unfitted instance of model's class built from param_dict."""
    return type(model)(**param_dict)
def find_best_params(models, X, y):
    """Tune each model on (X, y); return fresh instances with the best settings."""
    tuned = []
    for model in models:
        best = hyperparam_search(model, X, y)
        tuned.append(apply_best_params(model, best))
    return tuned
def test_models(models, X, y):
    """Evaluate each fitted model on (X, y); return {model: rmse}."""
    return {model: rmse(y, model.predict(X)) for model in models}
def add_edit_index(X, editDelimiter="<"):
    """Add token-position features for the edited word in each headline.

    Adds these columns to X in place and returns X:
      list           tokenised 'original' headline
      EditIndex      index of the first token containing editDelimiter
      Length         token count of the headline
      EditProportion EditIndex / Length

    Raises IndexError if a headline has no token containing the delimiter.
    """
    X["list"] = X.original.str.split()

    # Bug fix: the original ignored editDelimiter and hard-coded '<' here.
    def first_tagged_index(tokens):
        return [i for i, tok in enumerate(tokens) if editDelimiter in tok][0]

    X['EditIndex'] = X["list"].apply(first_tagged_index)
    # Reuse the tokenised column and keep the frame's own index; the original
    # built a bare pd.Series(list), which misaligns on any non-default index.
    X['Length'] = X["list"].apply(len)
    X["EditProportion"] = X['EditIndex'] / X['Length']
    return X
def pair_up_vectors(X, model, stops):
    """Concatenate each original-headline embedding with its edited counterpart."""
    original_vecs = vectorise(X, model, stops, "original")
    edited_vecs = vectorise(X, model, stops, "edited")
    paired = []
    for orig_vec, edit_vec in zip(original_vecs, edited_vecs):
        paired.append(np.concatenate((orig_vec, edit_vec), axis=None))
    return paired
# Silliest method of embedding headline: sentence as mean of word embedding
def sentence_embedding(model, sentence):
    """Embed a tokenised sentence as the mean of its in-vocabulary word vectors.

    NOTE(review): model.vocab is the gensim < 4.0 API (gensim 4 renamed it
    to key_to_index) — confirm against the installed version.
    """
    known_words = [word for word in sentence if word in model.vocab]
    return np.mean(model[known_words], axis=0)
def tokenise(headline, stops):
    """Lowercase and word-tokenize a headline, dropping stopwords."""
    tokens = word_tokenize(headline.lower())
    return [token for token in tokens if token not in stops]
def get_corpus(X, stops, col="original"):
    """Tokenise every headline in X[col]; returns a Series of token lists."""
    return X[col].apply(lambda headline: tokenise(headline, stops))
def vectorise(X, model, stops, col="original"):
    """Embed every headline in X[col] via sentence_embedding.

    Returns an array with one embedding row per headline.
    """
    # Bug fix: the original passed the literal "original" here, ignoring col,
    # so callers (e.g. pair_up_vectors) got identical vectors for the
    # original and edited headlines.
    corpus = get_corpus(X, stops, col)
    return np.array([sentence_embedding(model, doc) for doc in corpus])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment