Created
December 7, 2018 16:06
-
-
Save halolimat/8cb3787723eed7cd905e9b5d69559421 to your computer and use it in GitHub Desktop.
Supervised, Semi-supervised, and Unsupervised variants of HMM using Pomegranate
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
# Author: Hussein Al-Olimat @halolimat | |
import json
import os
import random
import sys
from itertools import groupby
from urllib.request import urlopen

import dill
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn_crfsuite import metrics

# Kept last, as in the original file, so that the star import's shadowing
# behaviour is unchanged.
from pomegranate import *
# Keeps only one entity class of our choice in every line of the labeled
# dataset: tags of that class become "LOC", all other tags become "O".
# e.g., "RIT B-corporation" -> "RIT O" and "Hussein B-person" -> "Hussein O"
def keep_class(dname, l):
    """Reduce one labeled dataset line to a binary location tag.

    Args:
        dname: Dataset name. "conll2003" selects tags containing "LOC";
            any other name selects tags containing "location".
        l: One line of the dataset, whitespace-separated, with the token
            first and its tag last.

    Returns:
        "" for a blank line, otherwise "<token> LOC" when the tag contains
        the selected class and "<token> O" when it does not.
    """
    wanted = "LOC" if dname == "conll2003" else "location"

    # Blank lines separate sentences; pass them through as empty strings.
    if not l.strip():
        return ""

    fields = l.split()
    token, label = fields[0], fields[-1]
    label = "LOC" if wanted in label else "O"
    return "{} {}".format(token, label)
# Featurizers ################################################################## | |
def word2features(sent, i, gaz):
    """Build the feature dictionary for the token at position ``i``.

    Args:
        sent: Sequence of token strings (one sentence).
        i: Index of the token to featurize.
        gaz: Optional container of lower-cased gazetteer entries; a falsy
            value disables all gazetteer features.

    Returns:
        dict mapping feature names to values. Contains surface features of
        the token, of its left/right neighbours when they exist, and the
        markers 'BOS' / 'EOS' at the sentence boundaries.
    """
    # The original referenced a global USEGAZETTEER that this file never
    # defines (NameError on the first gazetteer hit). Honour it when some
    # caller defines it, otherwise treat a supplied gazetteer as enabled.
    use_gaz = bool(gaz) and globals().get("USEGAZETTEER", True)

    def in_gazetteer(token):
        return use_gaz and token.lower() in gaz

    word = sent[i]
    features = {
        'word.lower()': word.lower(),
        'word[-3:]': word[-3:],
        'word[-2:]': word[-2:],
        'word.isupper()': word.isupper(),
        'word.istitle()': word.istitle(),
        'word.isdigit()': word.isdigit(),
    }
    if in_gazetteer(word):
        features["InGazetteer"] = True

    if i > 0:
        # Tokens are plain strings, so the neighbour is sent[i-1] itself;
        # indexing it with [0] would keep only its first character.
        prev_word = sent[i - 1]
        features.update({
            '-1:word.lower()': prev_word.lower(),
            '-1:word.istitle()': prev_word.istitle(),
            '-1:word.isupper()': prev_word.isupper(),
        })
        if in_gazetteer(prev_word):
            features["PreviousInGazetteer"] = True
    else:
        features['BOS'] = True

    if i < len(sent) - 1:
        next_word = sent[i + 1]
        features.update({
            '+1:word.lower()': next_word.lower(),
            '+1:word.istitle()': next_word.istitle(),
            '+1:word.isupper()': next_word.isupper(),
        })
        if in_gazetteer(next_word):
            features["NextInGazetteer"] = True
    else:
        features['EOS'] = True

    return features
def simple_Featurizer(X, gaz):
    """Turn every token of every sentence into a list of "name=value" strings.

    Args:
        X: Sequence of sentences, each a sequence of token strings.
        gaz: Gazetteer forwarded unchanged to ``word2features``.

    Returns:
        (X_featurized, features_set): the per-sentence, per-token lists of
        "name=value" strings, and the set of all distinct such strings.
    """
    total = len(X)
    vocabulary = set()
    featurized = []
    for done, sentence in enumerate(X, start=1):
        rows = []
        for position, _ in enumerate(sentence):
            feats = word2features(sentence, position, gaz)
            encoded = [str(name) + "=" + str(value) for name, value in feats.items()]
            rows.append(encoded)
            vocabulary.update(encoded)
        featurized.append(rows)
        # Single-line progress indicator, overwritten in place via "\r".
        sys.stdout.write("\rFeaturizing Data, %d Sentences left." % (total - done))
        sys.stdout.flush()
    print()
    return featurized, vocabulary
def identity_featurizer(X, _):
    """Return the sentences untouched plus the set of distinct tokens in them.

    The second argument is ignored; it exists so this function has the same
    call signature as the other featurizers.
    """
    vocabulary = {token for sentence in X for token in sentence}
    return X, vocabulary
#################################################### | |
# split words and tags for pomegranate | |
def split_to_tokens_and_labels(sentences):
    """Separate "token tag" strings into parallel token and tag tuples.

    Args:
        sentences: Iterable of sentences; each sentence is an iterable of
            strings made of exactly two whitespace-separated fields.

    Returns:
        (sentences_words, sentences_tags): two lists with one tuple per
        sentence, holding the tokens and the tags respectively.

    Raises:
        ValueError: if an item does not split into exactly two fields.
    """
    all_words, all_tags = [], []
    for sentence in sentences:
        words, tags = [], []
        for item in sentence:
            token, label = item.split()
            words.append(token)
            tags.append(label)
        all_words.append(tuple(words))
        all_tags.append(tuple(tags))
    return all_words, all_tags
def _download_sentences(url, dname):
    """Download one CoNLL-style file and return it as a list of sentences.

    Each sentence is a list of "token tag" strings produced by
    ``keep_class``; blank lines in the file delimit sentences.
    """
    # Close the HTTP response deterministically instead of leaking it.
    with urlopen(url) as response:
        lines = [raw.decode("UTF-8").replace("\n", "").replace("\t", " ")
                 for raw in response.readlines()]
    lines = [keep_class(dname, line) for line in lines]
    # keep_class maps blank lines to "", which marks sentence boundaries.
    return [list(group)
            for is_blank, group in groupby(lines, lambda x: x == "")
            if not is_blank]


def get_train_test_data(dname, featurizer, gaz):
    """Download, binarize and featurize the train and test splits.

    Args:
        dname: "conll2003" selects the CoNLL-2003 LOC files; any other value
            selects the WNUT-17 emerging-entities files.
        featurizer: Callable ``(X, gaz) -> (X_featurized, features_set)``.
        gaz: Gazetteer passed through to ``featurizer``.

    Returns:
        (train_X, train_y, test_X, test_y, features_set) where
        ``features_set`` comes from featurizing the training split only.
    """
    if dname == "conll2003":
        train_url = "https://raw.githubusercontent.com/halolimat/SpExtor/master/src/test/resources/spExtor/CoNLL_2003_LOC.Train"
        test_url = "https://raw.githubusercontent.com/halolimat/SpExtor/master/src/test/resources/spExtor/CoNLL_2003_LOC.TestA"
    else:
        # The WNUT-17 dev split (.../NER-WNUT17/master/data/emerging.dev.conll)
        # exists as well but is not used here.
        train_url = "https://raw.githubusercontent.com/halolimat/NER-WNUT17/master/data/emerging.train.conll"
        test_url = "https://raw.githubusercontent.com/halolimat/NER-WNUT17/master/data/emerging.test.conll"

    train = _download_sentences(train_url, dname)
    test = _download_sentences(test_url, dname)

    train_X, train_y = split_to_tokens_and_labels(train)
    test_X, test_y = split_to_tokens_and_labels(test)

    train_X, features_set = featurizer(train_X, gaz)
    test_X, _ = featurizer(test_X, gaz)
    return train_X, train_y, test_X, test_y, features_set
# Training Pomegranate HMM ##################################################### | |
def HMM_from_samples(X, y, vectorizer, state_names):
    """Fit a two-state HMM in one shot with ``HiddenMarkovModel.from_samples``.

    Args:
        X: Sequence of sentences in the form ``vectorizer.transform`` accepts.
        y: Per-sentence label sequences, passed as ``labels``.
        vectorizer: Fitted vectorizer; each sentence is transformed and
            densified before training.
        state_names: Names given to the model's states.

    Returns:
        The fitted model.
    """
    remaining = len(X)
    sentences_fvs = []
    for sentence in X:
        sentences_fvs.append(vectorizer.transform(sentence).todense())
        remaining -= 1
        # Single-line progress indicator, overwritten in place via "\r".
        sys.stdout.write("\rVecotrizing, %d sentences left." % remaining)
        sys.stdout.flush()
    print()

    return HiddenMarkovModel.from_samples(
        distribution=BernoulliDistribution,
        n_components=2,
        X=sentences_fvs,
        labels=y,
        state_names=state_names,
        verbose=True,
        max_iterations=200,
    )
def init_HMM(vectorizer_vocabulary, state_names):
    """Create and bake a randomly initialised two-state HMM named "HMM".

    Each state emits one independent Bernoulli component per vocabulary
    entry; the second state's component probabilities are the complements
    of the first's. Start and transition probabilities are drawn at random.

    Args:
        vectorizer_vocabulary: Iterable with one item per feature; only
            its length matters.
        state_names: Sequence whose first two items name the two states.

    Returns:
        The baked model.
    """
    model = HiddenMarkovModel("HMM")

    first_emissions, second_emissions = [], []
    for _ in vectorizer_vocabulary:
        prob = random.uniform(0, 1)
        first_emissions.append(BernoulliDistribution(prob))
        second_emissions.append(BernoulliDistribution(1 - prob))

    first = State(IndependentComponentsDistribution(first_emissions), name=state_names[0])
    second = State(IndependentComponentsDistribution(second_emissions), name=state_names[1])
    model.add_states([first, second])

    start_prob = random.uniform(0, 1)
    model.add_transition(model.start, first, start_prob)
    model.add_transition(model.start, second, 1 - start_prob)

    # One Dirichlet draw per source state: (to first, to second, to end).
    for source in (first, second):
        to_first, to_second, to_end = np.random.dirichlet(np.ones(3), size=1)[0]
        model.add_transition(source, first, to_first)
        model.add_transition(source, second, to_second)
        model.add_transition(source, model.end, to_end)

    model.bake(verbose=True)
    return model
def chunks_2(l, n):
    """Yield (start, stop) index pairs covering l in steps of n.

    The last pair's stop may exceed len(l); slicing with it is still safe.
    """
    yield from ((start, start + n) for start in range(0, len(l), n))
def HMM_from_summaries(X, y, vectorizer, state_names):
    """Fit an HMM out-of-core: summarize fixed-size chunks, then update once.

    Args:
        X: Sequence of sentences in the form ``vectorizer.transform`` accepts.
        y: Per-sentence label sequences aligned with ``X``, or None to pass
            no labels.
        vectorizer: Fitted vectorizer exposing ``vocabulary_`` and
            ``transform``.
        state_names: Names for the two states created by ``init_HMM``.

    Returns:
        The model after ``from_summaries()`` has been applied.
    """
    print("\nInit HMM ...")
    model = init_HMM(vectorizer.vocabulary_, state_names)

    chunk_size = 500
    print("Training on Chunks ...")
    # Ceiling division: a trailing partial chunk counts as a chunk too.
    remaining = -(-len(X) // chunk_size)
    print("Number of chunks is %d" % remaining)

    for start, stop in chunks_2(X, chunk_size):
        transformed_chunk = [vectorizer.transform(sent).todense()
                             for sent in X[start:stop]]
        # Slice into a separate name. The original overwrote `y` with its
        # first slice, so every later chunk was summarized with empty labels.
        chunk_labels = y[start:stop] if y is not None else None
        model.summarize(transformed_chunk, labels=chunk_labels)
        remaining -= 1
        sys.stdout.write("\rmodel.summarize. %d chunks left." % remaining)
        sys.stdout.flush()

    print("\nFitting the model on Summaries ...")
    model.from_summaries()
    return model
# Test Fitted Models ########################################################### | |
def replace_unknown(sequence, vocabulary=None):
    """Replace every token not in the vocabulary with the string 'nan'.

    Args:
        sequence: Iterable of tokens.
        vocabulary: Optional iterable of known tokens. When omitted, the
            module-level name ``words`` is used, as in the original code.

    Returns:
        A list the same length as ``sequence``.

    Raises:
        NameError: if ``vocabulary`` is omitted, ``sequence`` is non-empty
            and no module-level ``words`` exists.
    """
    tokens = list(sequence)
    if not tokens:
        # Nothing to look up; never touches the global, as before.
        return []
    if vocabulary is None:
        vocabulary = words
    # Build the lookup set once instead of once per token.
    known = set(vocabulary)
    return [token if token in known else 'nan' for token in tokens]
def simplify_decoding(X, model):
    """Return the state names on the Viterbi path of ``X``.

    The first path entry and any state named "HMM-end" are dropped. When
    ``model.viterbi`` yields no path, a list of "s0", one per observation,
    is returned instead.
    """
    _score, path = model.viterbi(X)
    if path is None:
        return ["s0"] * len(X)

    names = []
    for step in path[1:]:
        label = step[1].name
        if label != "HMM-end":
            names.append(label)
    return names
def pred(X, model):
    """Decode every sentence and also return the label-swapped predictions.

    Args:
        X: Iterable of observation sequences accepted by ``model.viterbi``.
        model: HMM used by ``simplify_decoding``.

    Returns:
        (preds, preds_inv): per-sentence label lists, and the same lists
        with "LOC" and "O" exchanged.
    """
    swap = {"LOC": "O", "O": "LOC"}
    # Used when decoding produced the "s0"/"s1" names instead of real labels.
    placeholder_to_label = {"s0": "LOC", "s1": "O"}

    preds, preds_inv = [], []
    for sent in X:
        labels = list(simplify_decoding(sent, model))
        if "s0" in labels:
            labels = [placeholder_to_label[name] for name in labels]
        preds.append(labels)
        preds_inv.append([swap[name] for name in labels])
    return preds, preds_inv
def getTpFpn(test_tags, pred_tags):
    """Count token-level true positives, false positives and false negatives.

    Only the "LOC" class is scored.

    Args:
        test_tags: Gold tags, one sequence per sentence.
        pred_tags: Predicted tags, indexable with the same positions.

    Returns:
        [tp, fp, fn]
    """
    target = "LOC"
    hits = false_alarms = misses = 0
    for s, gold_sentence in enumerate(test_tags):
        for t, gold in enumerate(gold_sentence):
            guess = pred_tags[s][t]
            if gold == guess:
                if gold == target:
                    hits += 1
                continue
            # Tags differ from here on.
            if gold == target:
                misses += 1
            elif guess == target:
                false_alarms += 1
    return [hits, false_alarms, misses]
def getPRF(TPFPN_score):
    """Compute precision, recall and F1 from accumulated [tp, fp, fn] rows.

    Args:
        TPFPN_score: Sequence of ``[tp, fp, fn]`` triples (e.g. one per
            evaluation chunk); may be empty.

    Returns:
        (P, R, F) as floats. A ratio whose denominator is zero is reported
        as 0.0 instead of nan.
    """
    # reshape(-1, 3) makes an empty input sum to [0, 0, 0] rather than a
    # scalar that cannot be indexed.
    totals = np.asarray(TPFPN_score, dtype=float).reshape(-1, 3).sum(axis=0)
    tp, fp, fn = (float(value) for value in totals)

    P = tp / (tp + fp) if tp + fp else 0.0
    R = tp / (tp + fn) if tp + fn else 0.0
    F = 2 * P * R / (P + R) if P + R else 0.0
    return P, R, F
def chunks(l, n):
    """Yield consecutive slices of l, each at most n items long."""
    yield from (l[start:start + n] for start in range(0, len(l), n))
def test_model(model, test_X, test_y, vectorizer, state_names):
    """Evaluate ``model`` on the test split and print precision/recall/F1.

    The test sentences are vectorized and decoded in chunks of 600. Scores
    are printed twice: once for the predictions as decoded and once for the
    predictions with "LOC" and "O" swapped (as returned by ``pred``).

    Args:
        model: HMM passed to ``pred``.
        test_X: Test sentences in the form ``vectorizer.transform`` accepts.
        test_y: Gold tag sequences aligned with ``test_X``.
        vectorizer: Fitted vectorizer.
        state_names: Unused; kept for interface compatibility.
    """
    chunk_size = 600
    start = 0
    TPFPN_score = []
    TPFPN_score_inv = []

    for chunk in chunks(test_X, chunk_size):
        test_sentences_fvs = [vectorizer.transform(sent).todense() for sent in chunk]
        end = start + len(test_sentences_fvs)
        test_tags = test_y[start:end]
        assert len(test_sentences_fvs) == len(test_tags), \
            "test_X and test_y are not aligned"

        predicted_tags, predicted_tags_inv = pred(test_sentences_fvs, model)
        # report = metrics.flat_classification_report(test_tags,
        #                                             predicted_tags,
        #                                             labels=state_names,
        #                                             digits=3)
        TPFPN_score.append(getTpFpn(test_tags, predicted_tags))
        TPFPN_score_inv.append(getTpFpn(test_tags, predicted_tags_inv))
        start = end

    # Scoring stays best-effort, but only ordinary errors are caught (not
    # KeyboardInterrupt/SystemExit) and the cause is reported.
    try:
        print("P, R, F: ", getPRF(TPFPN_score))
    except Exception as err:
        print("Error in TPFPN_score:", err)
    try:
        print("P, R, F: ", getPRF(TPFPN_score_inv))
    except Exception as err:
        print("Error in TPFPN_score_inv:", err)
################################################################################ | |
def serialize_model(model, model_path):
    """Write ``model.to_json()`` to ``models/<model_path>``.

    The JSON text returned by ``to_json()`` is itself JSON-encoded on disk,
    which is the format ``deserialize_model`` reads back.

    Args:
        model: Object exposing ``to_json()``.
        model_path: File name relative to the ``models/`` directory.
    """
    target = "models/" + model_path
    # Create the output directory on first use instead of failing.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "w") as f:
        json.dump(model.to_json(), f)
def deserialize_model(model_path):
    """Load a model previously written to ``models/<model_path>``.

    Args:
        model_path: File name relative to the ``models/`` directory.

    Returns:
        The model rebuilt by ``HiddenMarkovModel.from_json``.
    """
    source = "models/" + model_path
    with open(source) as handle:
        payload = json.load(handle)
    return HiddenMarkovModel.from_json(payload)
def serialize_vectorizer(vectorizer, path):
    """Pickle ``vectorizer`` with dill to ``models/<path>``.

    Args:
        vectorizer: Object to serialize.
        path: File name relative to the ``models/`` directory.
    """
    target = "models/" + path
    # Create the output directory on first use instead of failing.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "wb") as dill_file:
        dill.dump(vectorizer, dill_file)
def deserialize_vectorizer(path):
    """Load a vectorizer written by ``serialize_vectorizer``.

    Args:
        path: Either the name that was given to ``serialize_vectorizer``
            (resolved under ``models/``) or a path to the file itself.

    Returns:
        The unpickled object.
    """
    # serialize_vectorizer writes under "models/"; look there first and
    # fall back to the path exactly as given, which is what the original did.
    candidate = "models/" + path
    source = candidate if os.path.exists(candidate) else path
    # SECURITY: dill.load executes arbitrary code from the file; only load
    # files this project produced itself.
    with open(source, 'rb') as file:
        return dill.load(file)
def get_partial_tags(train_y):
    """Return a copy of ``train_y`` with 90% of the entries replaced by None.

    Args:
        train_y: Sequence of per-sentence label sequences.

    Returns:
        A new list of the same length in which ``int(0.9 * len(train_y))``
        distinct, randomly chosen positions are None; the input is not
        modified.
    """
    train_y_partial_tags = list(train_y)
    number_to_replace = int(0.9 * len(train_y_partial_tags))  # 10% labeled only
    # Sample WITHOUT replacement. Independent random indices repeat, which
    # left far more than 10% of the labels in place.
    for index in random.sample(range(len(train_y_partial_tags)), number_to_replace):
        train_y_partial_tags[index] = None
    return train_y_partial_tags
################################################################################ | |
################################################################################ | |
if __name__=="__main__":
    # Selects the training routine: chunked summarize/from_summaries when
    # True, one-shot from_samples when False. The original set this flag
    # but never consulted it.
    FROM_SUMMARIES=True
    fit_hmm = HMM_from_summaries if FROM_SUMMARIES else HMM_from_samples

    train_X, train_y, test_X, test_y, features_set = get_train_test_data("conll2003", identity_featurizer, None)
    #train_X, train_y, test_X, test_y, features_set = get_train_test_data("conll2003", simple_Featurizer, None)

    # Sentences are already tokenized, so the tokenizer is the identity and
    # the whole feature set is fitted as one "document".
    vectorizer = CountVectorizer(tokenizer=lambda doc: doc, lowercase=False).fit([features_set])
    serialize_vectorizer(vectorizer, "conll2003_vectorizer")

    states=["LOC", "O"]

    print("#"*20)
    print("Fitting Supservised HMM")
    model = fit_hmm(train_X, train_y, vectorizer, states)
    serialize_model(model, "conll2003_supervised")

    # print("#"*20)
    # print("Fitting Semi-supservised HMM")
    # model = HMM_from_summaries(train_X, get_partial_tags(train_y), vectorizer, states)
    # # model = HMM_from_samples(train_X, get_partial_tags(train_y), vectorizer, states)
    # serialize_model(model, "conll2003_semisupervised")
    #
    # print("#"*20)
    # print("Fitting Unsupservised HMM")
    # model = HMM_from_summaries(train_X, None, vectorizer, states)
    # # model = HMM_from_samples(train_X, None, vectorizer, states)
    # serialize_model(model, "conll2003_unsupervised_from_samples")
    # NOTE(review): the unsupervised model is saved above as
    # "conll2003_unsupervised_from_samples" but loaded below as
    # "conll2003_unsupervised"; align the names before enabling both.

    ############################################################################
    # Test
    print()
    print("#"*50)
    print()
    print("#"*50)
    print("Testing Supervised HMM")
    print("#"*50)
    # Reload from disk so the serialized model is what gets evaluated.
    test_model( deserialize_model("conll2003_supervised"),
                test_X=test_X,
                test_y=test_y,
                vectorizer=vectorizer,
                state_names=states)
    # Results recorded by the original author:
    # P, R, F: (0.043953963010499295, 0.998565965583174, 0.08420161631633044)
    # P, R, F: (0.0008207934336525308, 0.0014340344168260039, 0.001044022968505307)

    # print("#"*50)
    # print("Testing Semi-supervised HMM")
    # print("#"*50)
    # test_model( model=deserialize_model("conll2003_semisupervised"),
    #             test_X=test_X,
    #             test_y=test_y,
    #             vectorizer=vectorizer,
    #             state_names=states)
    # P, R, F: (0.040873744675862606, 1.0, 0.07853737282727033)

    # print("#"*50)
    # print("Testing Unsupervised HMM")
    # print("#"*50)
    # test_model( model=deserialize_model("conll2003_unsupervised"),
    #             test_X=test_X,
    #             test_y=test_y,
    #             vectorizer=vectorizer,
    #             state_names=states)
    #
    # P, R, F: (0.043953963010499295, 0.998565965583174, 0.08420161631633044)
    # P, R, F: (0.0008207934336525308, 0.0014340344168260039, 0.001044022968505307)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment