Supervised, Semi-supervised, and Unsupervised variants of HMM using Pomegranate
# coding: utf-8
# Author: Hussein Al-Olimat @halolimat
from urllib.request import urlopen
from itertools import groupby
import sys, random, dill, json
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn_crfsuite import metrics
from pomegranate import *
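# NOTE: USEGAZETTEER is used by word2features() below but was never defined in
# the original gist; defining it here as a module-level toggle is an assumption.
USEGAZETTEER = True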
# keeps only one class of our choice in all lines of the labeled dataset
# e.g., "RIT B-corporation" -> "RIT O" and "Hussein B-person" -> "Hussein O"
def keep_class(dname, l):
    if dname == "conll2003": c = "LOC"
    else: c = "location"
    if len(l.strip()) == 0:
        return ""
    else:
        word = l.split()[0]
        tag = l.split()[-1]
        if c not in tag:
            tag = "O"
        else:
            tag = "LOC"
        return "{} {}".format(word, tag)
# Featurizers ##################################################################
def word2features(sent, i, gaz):
    word = sent[i]
    #postag = sent[i][1]
    features = {
        #'bias': 1.0,
        'word.lower()': word.lower(),
        'word[-3:]': word[-3:],
        'word[-2:]': word[-2:],
        'word.isupper()': word.isupper(),
        'word.istitle()': word.istitle(),
        'word.isdigit()': word.isdigit(),
        #'postag': postag,
        #'postag[:2]': postag[:2],
    }
    # add gaz features
    if gaz:
        if word.lower() in gaz and USEGAZETTEER: features["InGazetteer"] = True
    if i > 0:
        # sent is a tuple of plain word strings, so index it directly
        # (the original sent[i-1][0] would have grabbed the first character)
        word1 = sent[i-1]
        #postag1 = sent[i-1][1]
        features.update({
            '-1:word.lower()': word1.lower(),
            '-1:word.istitle()': word1.istitle(),
            '-1:word.isupper()': word1.isupper(),
            #'-1:postag': postag1,
            #'-1:postag[:2]': postag1[:2],
        })
        # add gaz features
        if gaz:
            if word1.lower() in gaz and USEGAZETTEER: features["PreviousInGazetteer"] = True
    else:
        features['BOS'] = True
    if i < len(sent)-1:
        word1 = sent[i+1]
        #postag1 = sent[i+1][1]
        features.update({
            '+1:word.lower()': word1.lower(),
            '+1:word.istitle()': word1.istitle(),
            '+1:word.isupper()': word1.isupper(),
            #'+1:postag': postag1,
            #'+1:postag[:2]': postag1[:2],
        })
        if gaz:
            if word1.lower() in gaz and USEGAZETTEER: features["NextInGazetteer"] = True
    else:
        features['EOS'] = True
    return features
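# Illustrative call (hypothetical toy input, gazetteer disabled):
#   word2features(("Visit", "Dayton"), 1, None)
# -> {'word.lower()': 'dayton', 'word[-3:]': 'ton', 'word[-2:]': 'on',
#     'word.isupper()': False, 'word.istitle()': True, 'word.isdigit()': False,
#     '-1:word.lower()': 'visit', '-1:word.istitle()': True,
#     '-1:word.isupper()': False, 'EOS': True}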
def simple_Featurizer(X, gaz):
    features_set = set()
    X_featurized = []
    for c, sent in enumerate(X):
        sentence_features = []
        for i, word in enumerate(sent):
            features = word2features(sent, i, gaz)
            trans = [str(x)+"="+str(features[x]) for x in features]
            sentence_features.append(trans)
            features_set.update(set(trans))
        X_featurized.append(sentence_features)
        sys.stdout.write("\rFeaturizing Data, %d Sentences left." % (len(X)-c-1))
        sys.stdout.flush()
    print()
    return X_featurized, features_set
def identity_featurizer(X, _):
    return X, set([y for x in X for y in x])
####################################################
# split words and tags for pomegranate
def split_to_tokens_and_labels(sentences):
    sentences_words = []
    sentences_tags = []
    for _, sentence in enumerate(sentences):
        sentence_words = []
        sentence_tags = []
        for i, labeled_token in enumerate(sentence):
            word, tag = labeled_token.split()
            sentence_tags.append(tag)
            sentence_words.append(word)
        sentences_tags.append(tuple(sentence_tags))
        sentences_words.append(tuple(sentence_words))
    return sentences_words, sentences_tags
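# Illustrative round trip (toy input): [["RIT LOC", "is O"]] becomes
# sentences_words=[("RIT", "is")] and sentences_tags=[("LOC", "O")]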
def get_train_test_data(dname, featurizer, gaz):
    if dname == "conll2003":
        train_url = "https://raw.githubusercontent.com/halolimat/SpExtor/master/src/test/resources/spExtor/CoNLL_2003_LOC.Train"
        test_url = "https://raw.githubusercontent.com/halolimat/SpExtor/master/src/test/resources/spExtor/CoNLL_2003_LOC.TestA"
    else:
        # Data sources
        dev_url = "https://raw.githubusercontent.com/halolimat/NER-WNUT17/master/data/emerging.dev.conll"
        train_url = "https://raw.githubusercontent.com/halolimat/NER-WNUT17/master/data/emerging.train.conll"
        test_url = "https://raw.githubusercontent.com/halolimat/NER-WNUT17/master/data/emerging.test.conll"
    # Read files into lists after removing '\n' characters from all lines
    train = [x.decode("UTF-8").replace("\n","").replace("\t", " ") for x in urlopen(train_url).readlines()]
    test = [x.decode("UTF-8").replace("\n","").replace("\t", " ") for x in urlopen(test_url).readlines()]
    train = [keep_class(dname, l) for l in train]
    test = [keep_class(dname, l) for l in test]
    # group lines into sentences, using empty lines as separators
    train = [list(group) for k, group in groupby(train, lambda x: x == "") if not k]
    test = [list(group) for k, group in groupby(test, lambda x: x == "") if not k]
    train_X, train_y = split_to_tokens_and_labels(train)
    test_X, test_y = split_to_tokens_and_labels(test)
    train_X, features_set = featurizer(train_X, gaz)
    test_X, _ = featurizer(test_X, gaz)
    return train_X, train_y, test_X, test_y, features_set
# Training Pomegranate HMM #####################################################
def HMM_from_samples(X, y, vectorizer, state_names):
    # vectorize features
    sentences_fvs = []
    for i, sent in enumerate(X):
        sentences_fvs.append(vectorizer.transform(sent).todense())
        sys.stdout.write("\rVectorizing, %d sentences left." % (len(X)-i-1))
        sys.stdout.flush()
    print()
    model = HiddenMarkovModel.from_samples(distribution=BernoulliDistribution,
                                           n_components=2,
                                           X=sentences_fvs,
                                           labels=y,
                                           state_names=state_names,
                                           verbose=True,
                                           max_iterations=200)
    return model
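# Note on the call above: pomegranate's from_samples() trains with labels when
# they are given; per its documentation, a labels list that mixes real tags
# with None entries triggers semi-supervised learning, which is why the same
# call is reused for the supervised and semi-supervised variants in __main__.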
def init_HMM(vectorizer_vocabulary, state_names):
    model = HiddenMarkovModel("HMM")
    # one Bernoulli per vocabulary feature, mirrored between the two states
    d1 = []
    d2 = []
    for _, v in enumerate(vectorizer_vocabulary):
        p = random.uniform(0, 1)
        d1.append(BernoulliDistribution(p))
        d2.append(BernoulliDistribution(1-p))
    s1 = State(IndependentComponentsDistribution(d1), name=state_names[0])
    s2 = State(IndependentComponentsDistribution(d2), name=state_names[1])
    model.add_states([s1, s2])
    # random start probabilities
    p = random.uniform(0, 1)
    model.add_transition(model.start, s1, p)
    model.add_transition(model.start, s2, 1-p)
    # random outgoing transition probabilities that sum to one per state
    p = np.random.dirichlet(np.ones(3), size=1)[0]
    model.add_transition(s1, s1, p[0])
    model.add_transition(s1, s2, p[1])
    model.add_transition(s1, model.end, p[2])
    p = np.random.dirichlet(np.ones(3), size=1)[0]
    model.add_transition(s2, s1, p[0])
    model.add_transition(s2, s2, p[1])
    model.add_transition(s2, model.end, p[2])
    model.bake(verbose=True)
    return model
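# A minimal sketch of using init_HMM on its own (toy 3-feature vocabulary;
# the names below are hypothetical, not from the gist):
#   toy = init_HMM({"f1": 0, "f2": 1, "f3": 2}, ["LOC", "O"])
#   toy.log_probability(np.array([[1, 0, 0], [0, 1, 1]]))  # score a 2-token sentence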
def chunks_2(l, n):
    """Yield successive (start, end) index pairs for n-sized chunks of l."""
    for i in range(0, len(l), n):
        yield (i, i+n)
def HMM_from_summaries(X, y, vectorizer, state_names):
    print("\nInit HMM ...")
    model = init_HMM(vectorizer.vocabulary_, state_names)
    chunk_size = 500
    print("Training on Chunks ...")
    count = -(-len(X)//chunk_size)  # ceiling division
    print("Number of chunks is %d" % count)
    for chunk_idx in chunks_2(X, chunk_size):
        chunk = X[chunk_idx[0]:chunk_idx[1]]
        transformed_chunk = []
        for i, sent in enumerate(chunk):
            transformed_chunk.append(vectorizer.transform(sent).todense())
        # slice the labels for this chunk without overwriting y itself;
        # the original re-sliced y in place, corrupting every chunk after the first
        labels = y[chunk_idx[0]:chunk_idx[1]] if y else None
        model.summarize(transformed_chunk, labels=labels)
        count -= 1
        sys.stdout.write("\rmodel.summarize. %d chunks left." % count)
        sys.stdout.flush()
    print("\nFitting the model on Summaries ...")
    model.from_summaries()
    return model
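# The summarize()/from_summaries() pair above is pomegranate's out-of-core
# training pattern: summarize() accumulates sufficient statistics one chunk at
# a time, and from_summaries() then applies a single parameter update, so the
# full vectorized corpus never has to sit in memory as one dense matrix.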
# Test Fitted Models ###########################################################
# NOTE: this helper is never called, and it references a global `words` that
# is not defined anywhere in the gist; kept as-is from the original.
def replace_unknown(sequence): return [w if w in set(words) else 'nan' for w in sequence]
def simplify_decoding(X, model):
    _, state_path = model.viterbi(X)
    if state_path is None: return ["s0"]*len(X)
    # drop the start state and filter out the end state, which pomegranate
    # names "HMM-end" because the model was named "HMM" in init_HMM
    return [x for x in [state[1].name for state in state_path[1:]] if x != "HMM-end"]
def pred(X, model):
    d = {"LOC": "O", "O": "LOC"}   # label inversion
    d2 = {"s0": "LOC", "s1": "O"}  # map anonymous state names back to tags
    preds = []
    preds_inv = []
    for sent in X:
        pred = [x for x in simplify_decoding(sent, model)]
        if "s0" in pred: pred = [d2[x] for x in pred]
        preds.append(pred)
        # hidden states are unidentifiable without labels, so also keep the
        # inverted labeling and score both
        pred_inv = [d[x] for x in pred]
        preds_inv.append(pred_inv)
    return preds, preds_inv
def getTpFpn(test_tags, pred_tags):
    tag_class = "LOC"
    tp, fp, tn, fn = 0, 0, 0, 0
    for s, sent in enumerate(test_tags):
        for t, tag in enumerate(sent):
            if test_tags[s][t] == pred_tags[s][t]:
                if test_tags[s][t] == tag_class:
                    tp += 1
            elif test_tags[s][t] == tag_class:
                fn += 1
            elif pred_tags[s][t] == tag_class:
                fp += 1
    return [tp, fp, fn]
def getPRF(TPFPN_score):
    TpFpn = np.sum(TPFPN_score, axis=0)
    P = TpFpn[0]/(TpFpn[0] + TpFpn[1])
    R = TpFpn[0]/(TpFpn[0] + TpFpn[2])
    F = 2*P*R/(P+R)
    return P, R, F
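# A quick sanity check of the formulas above (illustrative numbers, not from
# the gist): TPFPN_score = [[8, 2, 4], [2, 2, 1]] sums to tp=10, fp=4, fn=5,
# giving P = 10/14 ≈ 0.714, R = 10/15 ≈ 0.667, F ≈ 0.690.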
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]
def test_model(model, test_X, test_y, vectorizer, state_names):
    chunk_size = 600
    start, end = 0, 0
    TPFPN_score = []
    TPFPN_score_inv = []
    for chunk in chunks(test_X, chunk_size):
        test_sentences_fvs = []
        for i, sent in enumerate(chunk):
            test_sentences_fvs.append(vectorizer.transform(sent).todense())
        end += len(test_sentences_fvs)
        test_tags = test_y[start:end]
        assert len(test_sentences_fvs) == len(test_tags)
        predicted_tags, predicted_tags_inv = pred(test_sentences_fvs, model)
        # report = metrics.flat_classification_report(test_tags,
        #                                             predicted_tags,
        #                                             labels=state_names,
        #                                             digits=3)
        TPFPN_score.append(getTpFpn(test_tags, predicted_tags))
        TPFPN_score_inv.append(getTpFpn(test_tags, predicted_tags_inv))
        start = end
    try:
        print("P, R, F: ", getPRF(TPFPN_score))
    except Exception:
        print("Error in TPFPN_score")
    try:
        print("P, R, F: ", getPRF(TPFPN_score_inv))
    except Exception:
        print("Error in TPFPN_score_inv")
################################################################################
def serialize_model(model, model_path):
    # model.to_json() already returns a JSON string, so json.dump() stores it
    # as a quoted string; from_json(json.load(f)) below undoes the round trip
    with open("models/"+model_path, "w") as f:
        json.dump(model.to_json(), f)

def deserialize_model(model_path):
    with open("models/"+model_path) as f:
        model = HiddenMarkovModel.from_json(json.load(f))
    return model
def serialize_vectorizer(vectorizer, path):
    with open("models/"+path, "wb") as dill_file:
        dill.dump(vectorizer, dill_file)

def deserialize_vectorizer(path):
    with open(path, 'rb') as file:
        return dill.load(file)
def get_partial_tags(train_y):
    train_y_partial_tags = train_y.copy()
    number_to_replace = int(0.9*len(train_y_partial_tags))  # keep only ~10% labeled
    # randrange can hit the same index more than once, so in practice slightly
    # more than 10% of the sentences may keep their labels
    for i in range(number_to_replace):
        train_y_partial_tags[random.randrange(0, len(train_y_partial_tags))] = None
    return train_y_partial_tags
################################################################################
################################################################################
if __name__=="__main__":
FROM_SUMMARIES=True
train_X, train_y, test_X, test_y, features_set = get_train_test_data("conll2003", identity_featurizer, None)
#train_X, train_y, test_X, test_y, features_set = get_train_test_data("conll2003", simple_Featurizer, None)
vectorizer = CountVectorizer(tokenizer=lambda doc: doc, lowercase=False).fit([features_set])
serialize_vectorizer(vectorizer, "conll2003_vectorizer")
states=["LOC", "O"]
print("#"*20)
print("Fitting Supservised HMM")
model = HMM_from_summaries(train_X, train_y, vectorizer, states)
# model = HMM_from_samples(train_X, train_y, vectorizer, states)
serialize_model(model, "conll2003_supervised")
# print("#"*20)
# print("Fitting Semi-supservised HMM")
# model = HMM_from_summaries(train_X, get_partial_tags(train_y), vectorizer, states)
# # model = HMM_from_samples(train_X, get_partial_tags(train_y), vectorizer, states)
# serialize_model(model, "conll2003_semisupervised")
#
# print("#"*20)
# print("Fitting Unsupservised HMM")
# model = HMM_from_summaries(train_X, None, vectorizer, states)
# # model = HMM_from_samples(train_X, None, vectorizer, states)
# serialize_model(model, "conll2003_unsupervised_from_samples")
    ############################################################################
    # Test

    print()
    print("#"*50)
    print()
    print("#"*50)
    print("Testing Supervised HMM")
    print("#"*50)
    test_model( deserialize_model("conll2003_supervised"),
                test_X=test_X,
                test_y=test_y,
                vectorizer=vectorizer,
                state_names=states)
    # P, R, F: (0.043953963010499295, 0.998565965583174, 0.08420161631633044)
    # P, R, F: (0.0008207934336525308, 0.0014340344168260039, 0.001044022968505307)
# print("#"*50)
# print("Testing Semi-supervised HMM")
# print("#"*50)
# test_model( model=deserialize_model("conll2003_semisupervised"),
# test_X=test_X,
# test_y=test_y,
# vectorizer=vectorizer,
# state_names=states)
# P, R, F: (0.040873744675862606, 1.0, 0.07853737282727033)
# print("#"*50)
# print("Testing Unsupervised HMM")
# print("#"*50)
# test_model( model=deserialize_model("conll2003_unsupervised"),
# test_X=test_X,
# test_y=test_y,
# vectorizer=vectorizer,
# state_names=states)
#
# P, R, F: (0.043953963010499295, 0.998565965583174, 0.08420161631633044)
# P, R, F: (0.0008207934336525308, 0.0014340344168260039, 0.001044022968505307)