Out-of-core NER trainer for the GMB corpus.
# http://nlpforhackers.io/training-ner-large-dataset/
# http://gmb.let.rug.nl/data.php
import os
from nltk import conlltags2tree

def to_conll_iob(annotated_sentence):
    """
    `annotated_sentence` = list of triplets [(w1, t1, iob1), ...]

    Transform a pseudo-IOB notation: O, PERSON, PERSON, O, O, LOCATION, O
    to proper IOB notation: O, B-PERSON, I-PERSON, O, O, B-LOCATION, O
    """
    proper_iob_tokens = []
    for idx, annotated_token in enumerate(annotated_sentence):
        word, tag, ner = annotated_token
        if ner != 'O':
            if idx == 0:
                ner = "B-" + ner
            elif annotated_sentence[idx - 1][2] == ner:
                # same entity type as the previous token: continue the span
                ner = "I-" + ner
            else:
                ner = "B-" + ner
        proper_iob_tokens.append((word, tag, ner))
    return proper_iob_tokens
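
# For reference, to_conll_iob turns runs of the same category into B-/I- spans:
#   to_conll_iob([('John', 'NNP', 'per'), ('Smith', 'NNP', 'per'), ('ran', 'VBD', 'O')])
#   -> [('John', 'NNP', 'B-per'), ('Smith', 'NNP', 'I-per'), ('ran', 'VBD', 'O')]
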
def read_gmb_ner(corpus_root):
    for root, dirs, files in os.walk(corpus_root):
        for filename in files:
            if filename.endswith(".tags"):
                with open(os.path.join(root, filename), 'rb') as file_handle:
                    file_content = file_handle.read().decode('utf-8').strip()
                    annotated_sentences = file_content.split('\n\n')
                    for annotated_sentence in annotated_sentences:
                        annotated_tokens = [seq for seq in annotated_sentence.split('\n') if seq]
                        standard_form_tokens = []
                        for annotated_token in annotated_tokens:
                            annotations = annotated_token.split('\t')
                            # tab-separated columns: 0 = word, 1 = POS tag, 3 = NER tag
                            word, tag, ner = annotations[0], annotations[1], annotations[3]
                            if ner != 'O':
                                # keep only the main category, e.g. 'geo-nam' -> 'geo'
                                ner = ner.split('-')[0]
                            standard_form_tokens.append((word, tag, ner))
                        conll_tokens = to_conll_iob(standard_form_tokens)
                        yield conlltags2tree(conll_tokens)
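
# Usage sketch (the corpus path is illustrative): each yielded item is an
# nltk.Tree with named-entity chunks, ready for tree2conlltags:
#   reader = read_gmb_ner('/path/to/gmb-2.2.0')
#   sentence_tree = next(reader)
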
import re

def shape(word):
    word_shape = 'other'
    # group the alternation so the $ anchors both numeric forms
    if re.match(r'([0-9]+(\.[0-9]*)?|[0-9]*\.[0-9]+)$', word):
        word_shape = 'number'
    elif re.match(r'\W+$', word):
        word_shape = 'punct'
    elif re.match(r'[A-Z][a-z]+$', word):
        word_shape = 'capitalized'
    elif re.match(r'[A-Z]+$', word):
        word_shape = 'uppercase'
    elif re.match(r'[a-z]+$', word):
        word_shape = 'lowercase'
    elif re.match(r'[A-Z][a-z]+[A-Z][a-z]+[A-Za-z]*$', word):
        word_shape = 'camelcase'
    elif re.match(r'[A-Za-z]+$', word):
        word_shape = 'mixedcase'
    elif re.match(r'__.+__$', word):
        word_shape = 'wildcard'
    elif re.match(r'[A-Za-z0-9]+\.$', word):
        word_shape = 'ending-dot'
    elif re.match(r'[A-Za-z0-9]+\.[A-Za-z0-9\.]+\.$', word):
        word_shape = 'abbreviation'
    elif re.match(r'[A-Za-z0-9]+-[A-Za-z0-9\-]+.*$', word):
        word_shape = 'contains-hyphen'
    return word_shape
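
# A few sample outputs, derived from the patterns above:
#   shape('Amsterdam') -> 'capitalized'    shape('NLTK')       -> 'uppercase'
#   shape('3.14')      -> 'number'         shape('U.S.')       -> 'abbreviation'
#   shape('McDonald')  -> 'camelcase'      shape('__START1__') -> 'wildcard'
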
from nltk.stem.snowball import SnowballStemmer

# GMB 2.2.0 is an English corpus, so use the English stemmer
stemmer = SnowballStemmer('english')
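# e.g. stemmer.stem('visited') -> 'visit'; the stem serves as a cheap lemma feature below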

def ner_features(tokens, index, history):
    """
    `tokens` = a POS-tagged sentence [(w1, t1), ...]
    `index` = the index of the token we want to extract features for
    `history` = the previous predicted IOB tags
    """
    # Pad the sequence with placeholders
    tokens = [('__START2__', '__START2__'), ('__START1__', '__START1__')] + \
        list(tokens) + [('__END1__', '__END1__'), ('__END2__', '__END2__')]
    history = ['__START2__', '__START1__'] + list(history)

    # shift the index by 2 to accommodate the padding
    index += 2

    word, pos = tokens[index]
    prevword, prevpos = tokens[index - 1]
    prevprevword, prevprevpos = tokens[index - 2]
    nextword, nextpos = tokens[index + 1]
    nextnextword, nextnextpos = tokens[index + 2]
    previob = history[-1]
    prevpreviob = history[-2]

    feat_dict = {
        'word': word,
        'lemma': stemmer.stem(word),
        'pos': pos,
        'shape': shape(word),

        'next-word': nextword,
        'next-pos': nextpos,
        'next-lemma': stemmer.stem(nextword),
        'next-shape': shape(nextword),

        'next-next-word': nextnextword,
        'next-next-pos': nextnextpos,
        'next-next-lemma': stemmer.stem(nextnextword),
        'next-next-shape': shape(nextnextword),

        'prev-word': prevword,
        'prev-pos': prevpos,
        'prev-lemma': stemmer.stem(prevword),
        'prev-iob': previob,
        'prev-shape': shape(prevword),

        'prev-prev-word': prevprevword,
        'prev-prev-pos': prevprevpos,
        'prev-prev-lemma': stemmer.stem(prevprevword),
        'prev-prev-iob': prevpreviob,
        'prev-prev-shape': shape(prevprevword),
    }
    return feat_dict
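
# A minimal sketch of how the detector is called (the sentence is made up):
#   sent = [('Mark', 'NNP'), ('visited', 'VBD'), ('Paris', 'NNP')]
#   ner_features(sent, 0, history=[])                # features for 'Mark', no predictions yet
#   ner_features(sent, 2, history=['B-per', 'O'])    # 'prev-iob' will be 'O'
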
import itertools
from nltk import tree2conlltags
from nltk.chunk import ChunkParserI
from sklearn.linear_model import Perceptron
from sklearn.feature_extraction import DictVectorizer
from sklearn.pipeline import Pipeline


class ScikitLearnChunker(ChunkParserI):

    @classmethod
    def to_dataset(cls, parsed_sentences, feature_detector):
        """
        Transform a list of parsed sentences into a scikit-learn compatible NER dataset
        :param parsed_sentences: iterable of nltk.Tree
        :param feature_detector: feature extraction function, e.g. ner_features
        :return: (list of feature dicts, list of IOB tags)
        """
        X, y = [], []
        for parsed in parsed_sentences:
            iob_tagged = tree2conlltags(parsed)
            words, tags, iob_tags = zip(*iob_tagged)
            for index in range(len(iob_tagged)):
                X.append(feature_detector(list(zip(words, tags)), index, history=iob_tags[:index]))
                y.append(iob_tags[index])
        return X, y

    @classmethod
    def get_minibatch(cls, parsed_sentences, feature_detector, batch_size=500):
        batch = list(itertools.islice(parsed_sentences, batch_size))
        X, y = cls.to_dataset(batch, feature_detector)
        return X, y

    @classmethod
    def train(cls, parsed_sentences, feature_detector, all_classes, **kwargs):
        X, y = cls.get_minibatch(parsed_sentences, feature_detector, kwargs.get('batch_size', 500))
        vectorizer = DictVectorizer(sparse=False)
        # fitted on the first minibatch only; see the design notes after this class
        vectorizer.fit(X)
        # n_iter was renamed max_iter in newer scikit-learn; partial_fit performs
        # a single pass per call either way
        clf = Perceptron(verbose=10, n_jobs=-1, max_iter=kwargs.get('max_iter', 5))
        while len(X):
            clf.partial_fit(vectorizer.transform(X), y, all_classes)
            X, y = cls.get_minibatch(parsed_sentences, feature_detector, kwargs.get('batch_size', 500))
        clf = Pipeline([
            ('vectorizer', vectorizer),
            ('classifier', clf)
        ])
        return cls(clf, feature_detector)

    def __init__(self, classifier, feature_detector):
        self._classifier = classifier
        self._feature_detector = feature_detector

    def parse(self, tokens):
        """
        Chunk a tagged sentence
        :param tokens: list of (word, tag) pairs [(w1, t1), (w2, t2), ...]
        :return: chunked sentence as an nltk.Tree
        """
        history = []
        iob_tagged_tokens = []
        for index, (word, tag) in enumerate(tokens):
            iob_tag = self._classifier.predict([self._feature_detector(tokens, index, history)])[0]
            history.append(iob_tag)
            iob_tagged_tokens.append((word, tag, iob_tag))
        return conlltags2tree(iob_tagged_tokens)

    def score(self, parsed_sentences):
        """
        Compute the accuracy of the tagger for a list of test sentences
        :param parsed_sentences: list of parsed sentences: nltk.Tree
        :return: float 0.0 - 1.0
        """
        X_test, y_test = self.__class__.to_dataset(parsed_sentences, self._feature_detector)
        return self._classifier.score(X_test, y_test)
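
# Design notes (properties of the implementation above, not scikit-learn requirements):
# - DictVectorizer is fitted on the first minibatch only, so feature names that
#   first appear in a later batch are silently dropped at transform time; a larger
#   first batch reduces that loss at the cost of memory.
# - Perceptron.partial_fit needs the full label set (all_classes) up front,
#   because an individual minibatch may not contain every class.
# - Each partial_fit call performs a single pass over its minibatch, which is what
#   keeps memory bounded regardless of corpus size.
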
def train_perceptron():
    reader = read_gmb_ner("/Users/Swa/Desktop/AIML/Datasets/gmb-2.2.0")
    all_classes = ['O', 'B-per', 'I-per', 'B-gpe', 'I-gpe',
                   'B-geo', 'I-geo', 'B-org', 'I-org', 'B-tim', 'I-tim',
                   'B-art', 'I-art', 'B-eve', 'I-eve', 'B-nat', 'I-nat']
    pa_ner = ScikitLearnChunker.train(itertools.islice(reader, 50000),
                                      feature_detector=ner_features,
                                      all_classes=all_classes,
                                      batch_size=500, max_iter=5)
    # the generator resumes where training stopped, so these 5000 sentences are unseen
    accuracy = pa_ner.score(itertools.islice(reader, 5000))

    import pickle
    with open('ner.pkl', 'wb') as f:
        pickle.dump(pa_ner, f)
    print("Accuracy:", accuracy)


if __name__ == '__main__':
    train_perceptron()
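
# Reloading the trained chunker later (a sketch; file name as saved above):
#   import pickle
#   with open('ner.pkl', 'rb') as f:
#       pa_ner = pickle.load(f)
#   pa_ner.parse([('Mark', 'NNP'), ('visited', 'VBD'), ('Amsterdam', 'NNP')])
# Note: unpickling needs ner_features to be importable, since the chunker stores
# a reference to the feature detector function.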