@studiawan
Last active August 1, 2018 02:34
NER with scikit-learn
# https://nlpforhackers.io/training-ner-large-dataset/
import itertools
import pickle
import re

from nltk import conlltags2tree, tree2conlltags
from nltk.chunk import ChunkParserI
from nltk.stem.snowball import SnowballStemmer
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import Perceptron, SGDClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
def shape(word):
    """Map a token to a coarse orthographic shape, used as a feature below."""
    word_shape = 'other'
    if re.match(r'[0-9]+(\.[0-9]*)?$|[0-9]*\.[0-9]+$', word):
        word_shape = 'number'
    elif re.match(r'\W+$', word):
        word_shape = 'punct'
    elif re.match(r'[A-Z][a-z]+$', word):
        word_shape = 'capitalized'
    elif re.match(r'[A-Z]+$', word):
        word_shape = 'uppercase'
    elif re.match(r'[a-z]+$', word):
        word_shape = 'lowercase'
    elif re.match(r'[A-Z][a-z]+[A-Z][a-z]+[A-Za-z]*$', word):
        word_shape = 'camelcase'
    elif re.match(r'[A-Za-z]+$', word):
        word_shape = 'mixedcase'
    elif re.match(r'__.+__$', word):
        word_shape = 'wildcard'
    elif re.match(r'[A-Za-z0-9]+\.$', word):
        word_shape = 'ending-dot'
    elif re.match(r'[A-Za-z0-9]+\.[A-Za-z0-9.]+\.$', word):
        word_shape = 'abbreviation'
    elif re.match(r'[A-Za-z0-9]+-[A-Za-z0-9-]+.*$', word):
        word_shape = 'contains-hyphen'
    return word_shape
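
# A few illustrative shape buckets; these asserts just sanity-check the
# regexes above and can be deleted:
assert shape('2018') == 'number'
assert shape('Nov') == 'capitalized'
assert shape('ERROR') == 'uppercase'
assert shape('__START1__') == 'wildcard'   # matches the padding tokens used below
assert shape('eth0-1') == 'contains-hyphen'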
def ner_features(tokens, index, history):
    """
    `tokens`  = a POS-tagged sentence [(w1, t1), ...]
    `index`   = the index of the token we want to extract features for
    `history` = the previously predicted IOB tags
    """
    # Pad the sequence with placeholders so the window features below
    # are defined at the sentence boundaries
    tokens = [('__START2__', '__START2__'), ('__START1__', '__START1__')] + list(tokens) + [('__END1__', '__END1__'), ('__END2__', '__END2__')]
    history = ['__START2__', '__START1__'] + list(history)

    # Shift the index by 2 to accommodate the padding
    index += 2

    word, pos = tokens[index]
    prevword, prevpos = tokens[index - 1]
    prevprevword, prevprevpos = tokens[index - 2]
    nextword, nextpos = tokens[index + 1]
    nextnextword, nextnextpos = tokens[index + 2]
    previob = history[-1]
    prevpreviob = history[-2]

    feat_dict = {
        'word': word,
        'lemma': stemmer.stem(word),
        'pos': pos,
        'shape': shape(word),

        'next-word': nextword,
        'next-pos': nextpos,
        'next-lemma': stemmer.stem(nextword),
        'next-shape': shape(nextword),
        'next-next-word': nextnextword,
        'next-next-pos': nextnextpos,
        'next-next-lemma': stemmer.stem(nextnextword),
        'next-next-shape': shape(nextnextword),

        'prev-word': prevword,
        'prev-pos': prevpos,
        'prev-lemma': stemmer.stem(prevword),
        'prev-iob': previob,
        'prev-shape': shape(prevword),
        'prev-prev-word': prevprevword,
        'prev-prev-pos': prevprevpos,
        'prev-prev-lemma': stemmer.stem(prevprevword),
        'prev-prev-iob': prevpreviob,
        'prev-prev-shape': shape(prevprevword),
    }
    return feat_dict
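
# Illustrative call (note that `stemmer` is defined at module level further
# down, so this only runs once the whole file has been loaded):
#
#   ner_features([('Nov', 'NNP'), ('9', 'CD')], 0, history=[])
#   -> {'word': 'Nov', 'pos': 'NNP', 'shape': 'capitalized',
#       'prev-word': '__START1__', 'prev-iob': '__START1__', ...}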
def read_conll_pos(filename):
    """Yield one nltk.Tree per sentence from a CoNLL-style file."""
    sentence = []
    with open(filename, 'r') as f:
        for line in f:
            line_split = line.split()
            if line_split:
                word, tag, ner = line_split[0], line_split[1], line_split[2]
                sentence.append((word, tag, ner))
            elif sentence:
                # Blank line: end of sentence
                yield conlltags2tree(sentence)
                sentence = []
        # Don't drop the last sentence if the file has no trailing blank line
        if sentence:
            yield conlltags2tree(sentence)
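
# The reader expects one token per line in "WORD POS-TAG NER-TAG" order, with
# sentences separated by blank lines, e.g. (labels taken from the tag set below):
#
#   Nov NNP B-TIM
#   9 CD I-TIM
#   restarted VBD O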
class ScikitLearnChunker(ChunkParserI):

    @classmethod
    def to_dataset(cls, parsed_sentences, feature_detector):
        """
        Transform a list of tagged sentences into a scikit-learn compatible NER dataset
        :param parsed_sentences: list of nltk.Tree
        :param feature_detector: function(tokens, index, history) -> dict
        :return: (list of feature dicts, list of IOB tags)
        """
        X, y = [], []
        for parsed in parsed_sentences:
            iob_tagged = tree2conlltags(parsed)
            words, tags, iob_tags = zip(*iob_tagged)
            # Materialize the zip: in Python 3 it is a one-shot iterator, but
            # the feature detector reads it once per token
            tagged = list(zip(words, tags))
            for index in range(len(iob_tagged)):
                X.append(feature_detector(tagged, index, history=iob_tags[:index]))
                y.append(iob_tags[index])
        return X, y
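
    # For a single 3-token sentence, to_dataset returns 3 feature dicts in X and
    # the parallel IOB labels in y, e.g. y == ['B-TIM', 'I-TIM', 'O'];
    # DictVectorizer then one-hot encodes the dicts for the linear classifier.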
    @classmethod
    def get_minibatch(cls, parsed_sentences, feature_detector, batch_size=500):
        batch = list(itertools.islice(parsed_sentences, batch_size))
        X, y = cls.to_dataset(batch, feature_detector)
        return X, y

    @classmethod
    def train(cls, parsed_sentences, feature_detector, all_classes, **kwargs):
        X, y = cls.get_minibatch(parsed_sentences, feature_detector, kwargs.get('batch_size', 500))
        vectorizer = DictVectorizer(sparse=False)
        vectorizer.fit(X)

        # Note: n_iter was renamed max_iter in later scikit-learn releases;
        # adjust the keyword if you run this on a newer version
        clf = Perceptron(verbose=10, n_jobs=-1, n_iter=kwargs.get('n_iter', 5))
        # clf = SGDClassifier(verbose=10, n_jobs=-1, n_iter=kwargs.get('n_iter', 5))
        # clf = MultinomialNB()

        # Out-of-core training: consume the sentence stream one minibatch at a time
        while len(X):
            X = vectorizer.transform(X)
            clf.partial_fit(X, y, all_classes)
            X, y = cls.get_minibatch(parsed_sentences, feature_detector, kwargs.get('batch_size', 500))

        clf = Pipeline([
            ('vectorizer', vectorizer),
            ('classifier', clf)
        ])
        return cls(clf, feature_detector)
    def __init__(self, classifier, feature_detector):
        self._classifier = classifier
        self._feature_detector = feature_detector

    def parse(self, tokens):
        """
        Chunk a tagged sentence
        :param tokens: list of (word, pos) pairs [(w1, t1), (w2, t2), ...]
        :return: chunked sentence as an nltk.Tree
        """
        history = []
        iob_tagged_tokens = []
        for index, (word, tag) in enumerate(tokens):
            iob_tag = self._classifier.predict([self._feature_detector(tokens, index, history)])[0]
            history.append(iob_tag)
            iob_tagged_tokens.append((word, tag, iob_tag))
        return conlltags2tree(iob_tagged_tokens)
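
    # Illustrative use after training (the tags here are made up for the example):
    #   chunker.parse([('Nov', 'NNP'), ('9', 'CD'), ('restarted', 'VBD')])
    #   -> Tree('S', [Tree('TIM', [('Nov', 'NNP'), ('9', 'CD')]), ('restarted', 'VBD')])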
    def score(self, parsed_sentences):
        """
        Compute the per-token accuracy of the tagger on a list of test sentences
        :param parsed_sentences: list of parsed sentences: nltk.Tree
        :return: float 0.0 - 1.0
        """
        X_test, y_test = self.__class__.to_dataset(parsed_sentences, self._feature_detector)
        return self._classifier.score(X_test, y_test)
stemmer = SnowballStemmer('english')

# The train and dev splits together become the training data
reader = read_conll_pos('/home/hudan/Git/prlogparser/data/conll-pos/conll.pos.train.dev.txt')
reader_test = read_conll_pos('/home/hudan/Git/prlogparser/data/conll-pos/conll.pos.test.txt')

all_classes = ['I-TIM', 'B-TIM', 'I-SEQ', 'B-SEQ', 'I-LEV', 'B-LEV', 'I-HOS', 'B-HOS',
               'I-SER', 'B-SER', 'B-SUB', 'I-SUB', 'B-UTIM', 'I-UTIM', 'O', 'B-SOC',
               'I-SOC', 'B-NUM', 'I-NUM', 'I-COR', 'B-COR', 'B-SOU', 'I-SOU', 'B-ARC',
               'I-ARC', 'I-DOM', 'B-DOM', 'I-STA', 'B-STA', 'B-IPA', 'I-IPA', 'I-DAS',
               'B-DAS', 'B-AUT', 'I-AUT', 'B-COM', 'I-COM', 'B-STC', 'I-STC', 'B-BYT',
               'I-BYT', 'I-REF', 'B-REF', 'I-CLI', 'B-CLI', 'I-JOB', 'B-JOB']

pa_ner = ScikitLearnChunker.train(itertools.islice(reader, 50000),
                                  feature_detector=ner_features,
                                  all_classes=all_classes,
                                  batch_size=500, n_iter=5)

# `reader` is a generator, so these slices continue past the 50,000 sentences
# consumed by training rather than re-reading them. evaluate() is inherited from
# nltk's ChunkParserI and reports chunk-level precision/recall/F-measure.
print('training performance', pa_ner.evaluate(itertools.islice(reader, 5000)))
print('testing performance', pa_ner.evaluate(itertools.islice(reader_test, 5000)))
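
# `pickle` is imported above but never used; a minimal sketch of persisting the
# trained chunker (the file name is illustrative):
with open('pa-ner.pickle', 'wb') as f:
    pickle.dump(pa_ner, f)
# ...and later: pa_ner = pickle.load(open('pa-ner.pickle', 'rb'))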