Created
March 9, 2020 07:56
-
-
Save ErikTromp/718ff1d05ff29d09006733a19482bb16 to your computer and use it in GitHub Desktop.
Text categorization with spacy-transformers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import plac | |
import re | |
import random | |
import json | |
import pandas as pd | |
from pathlib import Path | |
from collections import Counter | |
import thinc.extra.datasets | |
import spacy | |
import torch | |
from spacy.util import minibatch | |
import tqdm | |
import unicodedata | |
import wasabi | |
from spacy_transformers.util import cyclic_triangular_rate | |
from sklearn.model_selection import train_test_split | |
import re | |
# Compiled once at import time; the original rebuilt this pattern with
# re.compile on every call, which is pure overhead in a per-row .apply().
_EMOJI_RE = re.compile(
    "["
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
    u"\U00002500-\U00002BEF"  # chinese char
    u"\U00002702-\U000027B0"  # dingbats (original listed this range twice; duplicates in a class are no-ops)
    u"\U000024C2-\U0001F251"
    u"\U0001f926-\U0001f937"
    u"\U00010000-\U0010ffff"  # NOTE(review): strips *every* astral-plane char, not just emoji — confirm intended
    u"\u2640-\u2642"
    u"\u2600-\u2B55"
    u"\u200d"                 # zero-width joiner
    u"\u23cf"
    u"\u23e9"
    u"\u231a"
    u"\ufe0f"                 # variation selector
    u"\u3030"
    "]+",
    re.UNICODE,
)


def remove_emojis(data):
    """Return *data* with emoji and other pictographic codepoints removed."""
    return _EMOJI_RE.sub('', data)
def StratifiedSampling(df, class_col):
    """Down-sample *df* so every class in *class_col* has equally many rows.

    Each class is sampled without replacement (fixed seed for
    reproducibility) down to the size of the rarest class, so the
    returned frame is perfectly balanced.
    """
    # Size of the rarest class — replaces the original manual min-search loop,
    # which re-filtered the frame twice per class.
    min_count = df[class_col].value_counts().min()
    sampled = [
        df.loc[df[class_col] == c].sample(min_count, random_state=1234)
        for c in df[class_col].unique()
    ]
    return pd.concat(sampled)
@plac.annotations(
    model=("Model name", "positional", None, str),
    input_dir=("Optional input directory", "option", "i", Path),
    output_dir=("Optional output directory", "option", "o", Path),
    use_test=("Whether to use the actual test set", "flag", "E"),
    batch_size=("Number of docs per batch", "option", "bs", int),
    learn_rate=("Learning rate", "option", "lr", float),
    max_wpb=("Max words per sub-batch", "option", "wpb", int),
    n_texts=("Number of texts to train from", "option", "t", int),
    n_iter=("Number of training epochs", "option", "n", int),
    pos_label=("Positive label for evaluation", "option", "pl", str),
)
def main(
    model="D:/bert-base-dutch-cased/bertje-base",
    input_dir=None,
    output_dir=None,
    n_iter=5,
    n_texts=100,
    batch_size=8,
    learn_rate=2e-5,
    max_wpb=1000,
    use_test=False,
    pos_label=None,
):
    """Fine-tune a spacy-transformers text categorizer on an emotion CSV.

    Loads a transformer pipeline, adds a ``trf_textcat`` pipe with a fixed
    emotion label set, trains with a cyclic-triangular learning rate and
    early stopping on held-out F-score, then optionally saves the pipeline
    to *output_dir* and smoke-tests the reloaded model.
    """
    spacy.util.fix_random_seed(0)
    is_using_gpu = spacy.prefer_gpu()
    if is_using_gpu:
        torch.set_default_tensor_type("torch.cuda.FloatTensor")
    if output_dir is not None:
        output_dir = Path(output_dir)
        if not output_dir.exists():
            output_dir.mkdir()
    nlp = spacy.load(model)
    print(nlp.pipe_names)
    print(f"Loaded model '{model}'")
    textcat = nlp.create_pipe(
        "trf_textcat",
        config={"architecture": "softmax_last_hidden", "words_per_batch": max_wpb, "exclusive_classes": True},
    )
    labels = ['joy', 'sadness', 'trust', 'disgust', 'fear', 'anger', 'surprise', 'anticipation', 'none']
    for label in labels:
        textcat.add_label(label)
    # BUGFIX: the original unconditionally overwrote the CLI-supplied
    # pos_label here; only fall back to "anger" when none was given.
    if pos_label is None:
        pos_label = "anger"
    df = pd.read_csv('D:/emotions_class.csv', encoding='utf-8', sep=';')
    df['text'] = df['text'].apply(lambda x: remove_emojis(x))
    df = df.loc[df['text'].str.len() > 50]  # drop very short texts
    df = StratifiedSampling(df, 'labels_text')
    #df = df.sample(frac=0.2, random_state=1234)
    train_df, test_df = train_test_split(df, test_size=0.3, random_state=123456, stratify=df['labels_text'])
    # BUGFIX: train only on the training split — the original read from the
    # full `df` here, leaking every evaluation row into training.
    (train_texts, train_cats) = (train_df['text'].tolist(), train_df['labels_text'].tolist())
    eval_texts = test_df['text'].tolist()
    eval_cats = [{cat: cat == cats for cat in labels} for cats in test_df['labels_text'].tolist()]
    print("Labels:", textcat.labels)
    print("Positive label for evaluation:", pos_label)
    nlp.add_pipe(textcat, last=True)
    print(f"Using {len(train_texts)} training docs, {len(eval_texts)} evaluation")
    split_training_by_sentence = True
    if split_training_by_sentence:
        # If we're using a model that averages over sentence predictions (we are),
        # there are some advantages to just labelling each sentence as an example.
        # It means we can mix the sentences into different batches, so we can make
        # more frequent updates. It also changes the loss somewhat, in a way that's
        # not obviously better -- but it does seem to work well.
        # NOTE(review): this requires a "sentencizer" pipe in the loaded model — confirm.
        train_texts, train_cats = make_sentence_examples(nlp, train_texts, train_cats)
        print(f"Extracted {len(train_texts)} training sents")
    # BUGFIX: build the gold annotations *after* the optional sentence split.
    # The original built train_data before the split and never rebuilt it, so
    # the sentence-level examples were computed but never trained on.
    train_data = list(zip(train_texts, [{cat: cat == cats for cat in labels} for cats in train_cats]))
    # Initialize the TextCategorizer, and create an optimizer.
    optimizer = nlp.resume_training()
    optimizer.alpha = 0.001
    optimizer.trf_weight_decay = 0.005
    optimizer.L2 = 0.0
    # Cycle the transformer learning rate between lr/3 and lr*3.
    learn_rates = cyclic_triangular_rate(
        learn_rate / 3, learn_rate * 3, 2 * len(train_data) // batch_size
    )
    print("Training the model...")
    print("{:^5}\t{:^5}\t{:^5}\t{:^5}\t{:^5}".format("LOSS", "P", "R", "F", "A"))
    pbar = tqdm.tqdm(total=100, leave=False)
    results = []
    epoch = 0
    step = 0
    eval_every = 100  # evaluate on the held-out split every N batches
    patience = 3      # stop after this many checkpoints without improvement
    while True:
        # Train and evaluate
        losses = Counter()
        random.shuffle(train_data)
        batches = minibatch(train_data, size=batch_size)
        for batch in batches:
            optimizer.trf_lr = next(learn_rates)
            texts, annotations = zip(*batch)
            nlp.update(texts, annotations, sgd=optimizer, drop=0.1, losses=losses)
            pbar.update(1)
            if step and (step % eval_every) == 0:
                pbar.close()
                # Evaluate with the averaged (EMA) parameters for stability.
                with nlp.use_params(optimizer.averages):
                    scores = evaluate(nlp, eval_texts, eval_cats, pos_label)
                results.append((scores["textcat_f"], step, epoch))
                print(
                    "{0:.3f}\t{1:.3f}\t{2:.3f}\t{3:.3f}\t{4:.3f}".format(
                        losses["trf_textcat"],
                        scores["textcat_p"],
                        scores["textcat_r"],
                        scores["textcat_f"],
                        scores["textcat_a"],
                    )
                )
                pbar = tqdm.tqdm(total=eval_every, leave=False)
            step += 1
        epoch += 1
        # Stop if no improvement in HP.patience checkpoints
        if results:
            best_score, best_step, best_epoch = max(results)
            if ((step - best_step) // eval_every) >= patience:
                break
    msg = wasabi.Printer()
    table_widths = [2, 4, 6]
    msg.info(f"Best scoring checkpoints")
    msg.row(["Epoch", "Step", "Score"], widths=table_widths)
    msg.row(["-" * width for width in table_widths])
    for score, step, epoch in sorted(results, reverse=True)[:10]:
        msg.row([epoch, step, "%.2f" % (score * 100)], widths=table_widths)
    # Test the trained model
    test_text = eval_texts[0]
    doc = nlp(test_text)
    print(test_text, doc.cats)
    if output_dir is not None:
        nlp.to_disk(output_dir)
        print("Saved model to", output_dir)
        # test the saved model
        print("Loading from", output_dir)
        nlp2 = spacy.load(output_dir)
        doc2 = nlp2(test_text)
        print(test_text, doc2.cats)
def read_inputs(input_path):
    """Read ``[text, gold]`` JSON lines from *input_path*.

    Returns parallel lists ``(texts, cats)`` where each text has been run
    through ``preprocess_text`` and each cats entry is ``gold["cats"]``.
    """
    texts, cats = [], []
    with input_path.open(mode="r") as file_:
        for line in file_:
            raw_text, gold = json.loads(line)
            texts.append(preprocess_text(raw_text))
            cats.append(gold["cats"])
    return texts, cats
def make_sentence_examples(nlp, texts, labels):
    """Treat each sentence of the document as an instance, using the doc labels."""
    sents, sent_cats = [], []
    for text, cats in zip(texts, labels):
        # Tokenize only, then run the sentencizer to get sentence boundaries.
        doc = nlp.get_pipe("sentencizer")(nlp.make_doc(text))
        for sent in doc.sents:
            sents.append(sent.text)
            sent_cats.append(cats)  # every sentence inherits the doc's labels
    return sents, sent_cats
# Matches any run of two or more whitespace characters.
white_re = re.compile(r"\s\s+")


def preprocess_text(text):
    """Escape raw <s>/</s> tags, collapse whitespace runs, strip combining marks."""
    escaped = text.replace("<s>", "<open-s-tag>").replace("</s>", "<close-s-tag>")
    collapsed = white_re.sub(" ", escaped).strip()
    # Decompose (NFD), then drop combining marks (category "Mn") — e.g. accents.
    decomposed = unicodedata.normalize("NFD", collapsed)
    return "".join(c for c in decomposed if unicodedata.category(c) != "Mn")
def load_data(*, limit=0, dev_size=2000):
    """Load data from the IMDB dataset, splitting off a held-out set."""
    assert dev_size != 0
    if limit != 0:
        limit += dev_size  # fetch enough extra examples to cover the dev split
    train_data, _ = thinc.extra.datasets.imdb(limit=limit)
    assert len(train_data) > dev_size
    random.shuffle(train_data)
    # First dev_size shuffled examples become the dev set; the rest train.
    dev_data, train_data = train_data[:dev_size], train_data[dev_size:]
    train_texts, train_labels = _prepare_partition(train_data, preprocess=False)
    dev_texts, dev_labels = _prepare_partition(dev_data, preprocess=False)
    return (train_texts, train_labels), (dev_texts, dev_labels)
def load_data_for_final_test(*, limit=0):
    """Load the IMDB train/test split; ``limit=0`` keeps the full training set."""
    print(
        "Warning: Using test data. You should use development data for most experiments."
    )
    train_data, test_data = thinc.extra.datasets.imdb()
    random.shuffle(train_data)
    train_data = train_data[-limit:]  # slicing with -0 keeps everything
    return (
        _prepare_partition(train_data),
        _prepare_partition(test_data),
    )
def _prepare_partition(text_label_tuples, *, preprocess=False): | |
texts, labels = zip(*text_label_tuples) | |
if preprocess: | |
# Preprocessing can mask errors in our handling of noisy text, so | |
# we don't want to do it by default | |
texts = [preprocess_text(text) for text in texts] | |
cats = [{"POSITIVE": bool(y), "NEGATIVE": not bool(y)} for y in labels] | |
return texts, cats | |
def evaluate(nlp, texts, cats, pos_label):
    """One-vs-rest evaluation of *pos_label* over *texts*.

    Parameters
    ----------
    nlp : pipeline whose docs expose per-label scores in ``doc.cats``
    texts : list of str to classify
    cats : list of gold dicts mapping label -> truth value (bool or 0/1)
    pos_label : the single label treated as "positive"

    Returns a dict with keys ``textcat_p`` / ``textcat_r`` / ``textcat_f`` /
    ``textcat_a``. Only the positive label's score is thresholded at 0.5;
    all other predicted categories are ignored.
    """
    tp = 0.0  # True positives
    fp = 0.0  # False positives
    fn = 0.0  # False negatives
    tn = 0.0  # True negatives
    total_words = sum(len(text.split()) for text in texts)
    with tqdm.tqdm(total=total_words, leave=False) as pbar:
        for doc, gold in zip(nlp.pipe(texts, batch_size=8), cats):
            # Docs where either side lacks pos_label do not enter the
            # confusion matrix (matches the original's `continue` filters).
            if pos_label in doc.cats and pos_label in gold:
                predicted_pos = doc.cats[pos_label] >= 0.5
                gold_pos = gold[pos_label] >= 0.5
                # BUGFIX: the original wrapped these comparisons in a bare
                # `except:` that printed and called exit(); a malformed gold
                # value now raises a normal traceback instead of killing the
                # process and swallowing the error.
                if predicted_pos and gold_pos:
                    tp += 1.0
                elif predicted_pos and not gold_pos:
                    fp += 1.0
                elif not predicted_pos and not gold_pos:
                    tn += 1.0
                else:
                    fn += 1.0
            pbar.update(len(doc.text.split()))
    # Epsilon keeps the divisions defined when a denominator is zero.
    precision = tp / (tp + fp + 1e-8)
    recall = tp / (tp + fn + 1e-8)
    accuracy = (tp + tn) / (tp + tn + fp + fn + 1e-8)
    if (precision + recall) == 0:
        f_score = 0.0
    else:
        f_score = 2 * (precision * recall) / (precision + recall)
    return {"textcat_p": precision, "textcat_r": recall, "textcat_f": f_score, "textcat_a": accuracy}
# Script entry point: plac maps the CLI options declared on main() onto its
# keyword arguments.
if __name__ == "__main__":
    plac.call(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment