#!/usr/bin/env python3.4
import argparse
import gzip
import json
import operator
import random
import sys
from collections import defaultdict
from itertools import chain

import util


def capitalization(word):
    """ Obtain capitalization information, i.e. 'g' if the word is
    capitalized, 'k' if it is all lower-case and '0' otherwise.
    """
    if word[0].isupper():
        return 'g'
    elif word.islower():
        return 'k'
    else:
        return '0'


def make_suffix(word, k):
    """ Get the suffix of length k of the word, padding with spaces at the
    front if len(word) < k.

    :param word: Word to get suffix from
    :type word: str
    :param k: Length of suffix
    :type k: int
    :returns: Suffix of length k
    :rtype: str
    """
    return (" "*k + word)[-k:]


def load_data(path):
    """ Load dataset from tab-separated file structured into sentences. """
    if path.endswith('.gz'):
        open_fn = gzip.open
    else:
        open_fn = open
    # Open in binary mode in both cases so that `decode` below always works
    with open_fn(path, 'rb') as fp:
        raw = fp.read().decode('utf8')
    return ([tuple(line.split('\t')) for line in sentence.split('\n')]
            for sentence in raw.split('\n\n') if sentence)


def pad_data(data, margin=(2, 1), paddings=("<s>", "</s>")):
    """ Pad given nested dataset. """
    return ([paddings[0]]*margin[0] + s + [paddings[1]]*margin[1]
            for s in data)
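
# Illustrative example, mirroring how `PerceptronTagger.train` calls it below:
#   list(pad_data([[('Das', 'ART'), ('Haus', 'NN')]], margin=(1, 1),
#                 paddings=(('<s>', '<s>'), ('</s>', '</s>'))))
#   -> [[('<s>', '<s>'), ('Das', 'ART'), ('Haus', 'NN'), ('</s>', '</s>')]]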


def random_infinite(it):
    """ Make an infinite generator that yields items from `it` in random
    order, making sure that items only get repeated once a full pass
    through `it` has been done.
    """
    # Materialize iterator if we don't have a list or tuple
    if not isinstance(it, (list, tuple)):
        it = tuple(it)
    while True:
        pool = sorted(it, key=lambda k: random.random())
        for itm in pool:
            yield itm
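
# Illustrative usage: draw six samples from the endless shuffled stream over
# three items; every item is seen once before any item repeats.
#   from itertools import islice
#   list(islice(random_infinite([1, 2, 3]), 6))  # e.g. [2, 1, 3, 3, 2, 1]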


class PerceptronTagger:
    """ Part-of-Speech tagger using the Perceptron algorithm for training
    and the Viterbi algorithm for determining the most likely tag sequence.
    """

    def __init__(self, max_suffix_len=5, viterbi_pick=5):
        """ Initialize the tagger.

        :param max_suffix_len: Maximum suffix length to take into account
                               for training
        :type max_suffix_len: int
        :param viterbi_pick: Number of most likely predecessor tags to
                             take into account for tagging.
        :type viterbi_pick: int
        """
        self._max_suffix_len = max_suffix_len
        self._viterbi_pick = viterbi_pick

    def train(self, train_data, test_data=None, max_iterations=1e5,
              test_every=2.5e4):
        """ Train the tagger.

        :param train_data: Tagged sentences to use for training
        :type train_data: list of lists of (word, tag) tuples
        :param test_data: Tagged sentences to use for testing
        :type test_data: list of lists of (word, tag) tuples
        :param max_iterations: Number of training iterations
        :type max_iterations: int
        :param test_every: Run tests on the partially trained model after
                           the given number of iterations.
        :type test_every: float
        :returns: Overall accuracy and per-tag
                  accuracy/precision/recall/F1
        :rtype: (float, {tag: :py:class:`util.EvaluationResult`})
        """
        train_data = list(
            pad_data(train_data, margin=(1, 1),
                     paddings=(('<s>', '<s>'), ('</s>', '</s>'))))
        if test_data:
            test_data = list(
                pad_data(test_data, margin=(1, 1),
                         paddings=(('<s>', '<s>'), ('</s>', '</s>'))))
        self._all_tags = list(set(chain(*((tag for word, tag in sentence)
                                          for sentence in train_data))))
        self._weights = defaultdict(int)
        sample_iter = util.with_progress(random_infinite(train_data),
                                         max_iterations)
        for num_iter, sample in enumerate(sample_iter):
            if num_iter == max_iterations:
                break
            do_online_test = (test_data and test_every and num_iter and
                              not num_iter % test_every)
            if do_online_test:
                print("\nTesting accuracy on test set...", file=sys.stderr)
                print("Accuracy after {} iterations: {}".format(
                    num_iter, util.color_score(self.score(test_data)[0],
                                               digits=2)),
                      file=sys.stderr)
            words, actual_tags = zip(*sample)
            predicted_tags = self.tag(words, True)
            # `tag` returns a list, the gold tags are a tuple, so compare
            # like with like before deciding whether to skip the update
            if tuple(predicted_tags) == actual_tags:
                continue
            feature_predicted = self._make_feature(
                tuple(zip(words, predicted_tags)))
            feature_actual = self._make_feature(tuple(sample))
            for feat in feature_predicted:
                self._weights[feat] -= 1
            for feat in feature_actual:
                self._weights[feat] += 1
        # Prune unnecessary weights (i.e. those with value 0)
        to_prune = [f for f, w in self._weights.items() if not w]
        for feat in to_prune:
            del self._weights[feat]
        print("\nPruned {} weights, {} remaining."
              .format(len(to_prune), len(self._weights)), file=sys.stderr)

    def tag(self, sentence, keep_padding=False):
        """ Tag a tokenized sentence.

        :param sentence: Tokenized sentence
        :type sentence: list of str
        :param keep_padding: Don't strip start/end-of-sentence tags from
                             the output
        :type keep_padding: bool
        :returns: Tag sequence for the sentence
        :rtype: list of str
        """
        if sentence[0] != '<s>':
            sentence = ['<s>'] + list(sentence)
        if sentence[-1] != '</s>':
            sentence = list(sentence) + ['</s>']
        viterbi_scores = [defaultdict(lambda: -float('inf'))
                          for _ in sentence]
        viterbi_scores[0]['<s>'] = 0
        viterbi_path = {tag: [tag] for tag in self._all_tags}
        for idx, word in enumerate(sentence[1:], 1):
            new_path = {}
            for tag in self._all_tags:
                all_scores = dict()
                best_iter = sorted(viterbi_scores[idx-1].items(),
                                   key=operator.itemgetter(1), reverse=True)
                for cand_tag, prev_score in best_iter[:self._viterbi_pick]:
                    feature_vec = self._make_feature(
                        (word, tag, cand_tag))
                    w_sum = sum(self._weights.get(f, 0) for f in feature_vec)
                    all_scores[cand_tag] = prev_score + w_sum
                best_tag, best_score = max(all_scores.items(),
                                           key=operator.itemgetter(1))
                viterbi_scores[idx][tag] = best_score
                new_path[tag] = viterbi_path[best_tag] + [tag]
            viterbi_path = new_path
        best_state = max(viterbi_scores[-1].items(),
                         key=operator.itemgetter(1))[0]
        if keep_padding:
            return viterbi_path[best_state]
        else:
            return viterbi_path[best_state][1:-1]

    @classmethod
    def from_dict(cls, data):
        """ Construct a new :py:class:`PerceptronTagger` instance from the
        data in the passed dictionary (which was e.g. deserialized from
        JSON).

        :param data: Data to construct new instance from
        :type data: dict as returned by :py:meth:`to_dict`
        :returns: A new instance
        :rtype: :py:class:`PerceptronTagger`
        """
        t = cls(data['max_suffix_len'], data['viterbi_pick'])
        t._all_tags = data['all_tags']
        t._weights = defaultdict(
            int, util.tupledict_from_json(data['weights']))
        return t

    def to_dict(self):
        """ Serialize the tagger to a single dictionary.

        :returns: The tagger state as a dictionary
        :rtype: dict
        """
        return {
            'max_suffix_len': self._max_suffix_len,
            'viterbi_pick': self._viterbi_pick,
            'all_tags': self._all_tags,
            'weights': util.make_dict_json_compatible(self._weights)}

    def score(self, test_data):
        """ Calculate performance on test data.

        :param test_data: Tagged sentences to run the test on
        :type test_data: list of lists of (word, tag) pairs
        :returns: Overall accuracy and per-tag
                  accuracy/precision/recall/F1
        :rtype: (float, {tag: :py:class:`util.EvaluationResult`})
        """
        is_padded = (test_data[0][0][0] == '<s>' and
                     test_data[0][-1][0] == '</s>')
        gold = []
        observed = []
        for sent in util.with_progress(test_data):
            words, tags = zip(*sent)
            gold.append(tags)
            observed.append(self.tag(words, is_padded))
        return util.score_classification(
            list(chain(*gold)), list(chain(*observed)))

    def _make_feature(self, data):
        """ Create a feature vector for the given data.

        Data can be:
          - a list of (word, tag) tuples (= a whole sentence), where the
            first item is the ('<s>', '<s>') padding
          - a single (word, tag, prev_tag) tuple (= a single word)
        """
        if not isinstance(data[0], tuple) and len(data) == 3:
            word, tag, prev_tag = data
            tag_bigrams = [(prev_tag, tag)]
            words = [(tag, word.lower())]
            lexical = [(tag, capitalization(word),
                        make_suffix(word.lower(), n))
                       for n in range(1, self._max_suffix_len)]
        else:
            tag_bigrams = list(zip(*[(tag for word, tag in data[k:])
                                     for k in range(2)]))
            words = [(tag, word.lower()) for word, tag in data[1:]]
            lexical = list(chain(
                *(((tag, capitalization(word), make_suffix(word.lower(), n))
                   for word, tag in data[1:])
                  for n in range(1, self._max_suffix_len))))
        return tuple(chain(tag_bigrams, words, lexical))
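
# Illustrative training/tagging sketch (the miniature corpus below is made
# up for demonstration purposes only):
#   tagger = PerceptronTagger(max_suffix_len=5, viterbi_pick=5)
#   corpus = [[('Das', 'ART'), ('Haus', 'NN'), ('brennt', 'VVFIN')]]
#   tagger.train(corpus, max_iterations=100, test_every=None)
#   tagger.tag(['Das', 'Haus', 'brennt'])  # -> e.g. ['ART', 'NN', 'VVFIN']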


def train_cli(args):
    """ Command-line interface for training. """
    tagger = PerceptronTagger(args.max_suffix_length, args.viterbi_pick)
    all_data = list(load_data(args.corpus))
    if args.shuffle:
        random.shuffle(all_data)
    print("Calculating probabilities on training data from {}..."
          .format(args.corpus), file=sys.stderr)
    if args.no_test:
        train_data = all_data
        test_data = None
    else:
        train_data, test_data = util.split_dataset(
            all_data, (100 - 100*args.test_size, 100*args.test_size))
    tagger.train(train_data, test_data, args.num_iterations,
                 args.test_every or None)
    if args.output:
        output = args.output
        if not output.endswith(".gz"):
            output += ".gz"
        print("Writing gzip-compressed parameters to {}..."
              .format(output), file=sys.stderr)
        with gzip.open(output, 'wt') as fp:
            json.dump(tagger.to_dict(), fp, indent=2)
    else:
        print("Writing parameters to standard output...", file=sys.stderr)
        json.dump(tagger.to_dict(), sys.stdout, indent=2)
    if test_data:
        print("Running final test...", file=sys.stderr)
        accuracy, scores = tagger.score(test_data)
        print("Final accuracy: {:.2f}".format(accuracy), file=sys.stderr)
        if args.verbose:
            util.print_scores(scores)


def tag_cli(args):
    """ Command-line interface for tagging. """
    if args.params.endswith(".gz"):
        open_fn = gzip.open
    else:
        open_fn = open
    with open_fn(args.params, 'rt') as fp:
        tagger = PerceptronTagger.from_dict(json.load(fp))
    if args.sentence:
        sentences = [s.split() for s in args.sentence]
    else:
        lines = sys.stdin.readlines()
        if len(lines[0].split()) == 1:
            # One token per line, sentences separated by blank lines
            sentences = []
            cur_sent = []
            for line in lines:
                word = line.strip()
                if word:
                    cur_sent.append(word)
                else:
                    sentences.append(cur_sent)
                    cur_sent = []
            # Don't lose the last sentence if the input does not end with a
            # blank line
            if cur_sent:
                sentences.append(cur_sent)
        else:
            # All tokens of a sentence on one line
            sentences = [l.split() for l in lines]
    all_tags = map(tagger.tag, sentences)
    for sent, tags in zip(sentences, all_tags):
        for word, tag in zip(sent, tags):
            print("{}\t{}".format(word, tag))
        print("\n", end="")


def get_argument_parser():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    train_parser = subparsers.add_parser("train", help="Train the tagger.")
    train_parser.add_argument(
        'corpus', help="Path to tagged corpus to train with")
    train_parser.add_argument(
        '-n', '--num-iterations', type=int, default=int(1e5),
        help="Number of iterations to train for. Should be at least as high "
             "as the total number of training samples available.")
    train_parser.add_argument(
        '-p', '--viterbi-pick', type=int, default=5, metavar='n',
        help="Take only the n highest-scoring predecessors into account "
             "when searching for the most likely tag sequence.")
    train_parser.add_argument(
        '-l', '--max-suffix-length', type=int, default=5,
        help="Maximum length of word suffixes to take into account")
    train_parser.add_argument(
        '-o', '--output', help='Path to output file, defaults to stdout')
    train_parser.add_argument(
        '-t', '--test-size', type=float, default=0.2,
        help='Fraction of the data to use for testing')
    train_parser.add_argument(
        '-s', '--shuffle', action='store_true',
        help="Shuffle dataset before splitting into training and test "
             "datasets.")
    train_parser.add_argument(
        '-v', '--verbose', action='store_true',
        help="Display detailed scores for each tag")
    train_parser.add_argument(
        '--test-every', type=int,
        help='Determine accuracy on test set after every nth iteration. '
             'Useful for tracking progress during training')
    train_parser.add_argument(
        '--no-test', action='store_true',
        help="Use all of the data for training, don't test performance of "
             "tagger on held-out data.")
    train_parser.set_defaults(func=train_cli)

    tag_parser = subparsers.add_parser("tag", help="Tag sentences")
    tag_parser.add_argument(
        '-p', '--params', type=str, required=True,
        help="Path to trained parameter file")
    tag_parser.add_argument(
        'sentence', nargs='*', type=str,
        help="Sentences to tag. Must be separated by newlines if passed via "
             "stdin.")
    tag_parser.set_defaults(func=tag_cli)
    return parser


if __name__ == '__main__':
    parser = get_argument_parser()
    args = parser.parse_args()
    args.func(args)


# --- util.py: helper functions used by the tagger script above ---
import itertools as it
import math
import sys
import time
from collections import Counter, defaultdict, namedtuple

# Datatype that stores evaluation results
EvaluationResult = namedtuple("EvaluationResult", ["accuracy", "precision",
                                                   "recall", "f1"])


def make_dict_json_compatible(dct):
    """ Make sure that a given dictionary can be properly serialized to JSON.

    Basically this converts dicts with non-scalar keys to nested lists.
    """
    for k, v in dct.items():
        if isinstance(v, dict):
            dct[k] = make_dict_json_compatible(v)
    if any(isinstance(k, (tuple, list)) for k in dct.keys()):
        return list(dct.items())
    else:
        return dct


def tupledict_from_json(dct):
    """ Convert a sequence of (key, value) pairs, where the key is a list,
    to a dictionary with tuple keys.

    This is necessary because Python dict keys must be hashable, i.e. we
    need an immutable data structure instead of a list.

    :param dct: The sequence to convert to a dictionary
    :type dct: Sequence of (list, sometype) pairs
    :returns: The sequence as a dictionary with tuple keys
    :rtype: tuple -> sometype dict
    """
    return {tuple(key): value for key, value in dct}
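
# Illustrative round trip (JSON cannot represent tuple keys, so they become
# [key, value] lists on the way out and tuples again on the way back in):
#   make_dict_json_compatible({('ART', 'das'): 3})  # -> [(('ART', 'das'), 3)]
#   tupledict_from_json([[['ART', 'das'], 3]])      # -> {('ART', 'das'): 3}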


def split_dataset(data, fractions):
    """ Split a dataset into a number of chunks.

    The chunk size is determined by the values in `fractions`, which is a
    tuple of integers that denotes the relative sizes of the chunks.
    E.g. fractions=(8, 2) will result in two chunks where the first one
    has 80% of the data and the second one the remaining 20%.

    :param data: The dataset to be split
    :type data: A sequence
    :param fractions: Relative sizes of the chunks
    :type fractions: tuple of int
    :returns: tuple of chunks
    """
    total_size = len(data)
    part_size = total_size / sum(fractions)
    indices = []
    last_end = 0
    for frac in fractions:
        start = last_end
        last_end = start + math.ceil(frac*part_size)
        indices.append((start, last_end))
    return tuple(data[start:end] for start, end in indices)
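
# Illustrative example: an 80/20 split of ten items.
#   split_dataset(list(range(10)), (8, 2))
#   -> ([0, 1, 2, 3, 4, 5, 6, 7], [8, 9])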


def score_classification(expected, observed):
    """ Calculate scores for the given classification result.

    :param expected: Expected categories/'gold standard'
    :type expected: sequence of discrete values
    :param observed: Observed/predicted categories/classification result
    :type observed: sequence of discrete values
    :returns: Accuracy of the whole dataset and scores for each
              individual category
    :rtype: tuple of (float, category ->
            :py:class:`EvaluationResult` dict)
    """
    category_counts = Counter(expected)
    true_pos = defaultdict(int)
    false_pos = defaultdict(int)
    for exp, obs in zip(expected, observed):
        if exp == obs:
            true_pos[exp] += 1
        else:
            false_pos[obs] += 1
    category_scores = {cat: calculate_scores(fq, true_pos[cat], false_pos[cat])
                       for cat, fq in category_counts.items()}
    accuracy = sum(true_pos.values()) / len(expected)
    return accuracy, category_scores


def calculate_scores(freq, true_pos, false_pos):
    """ Calculate accuracy, precision, recall and F1 for the given evaluation
    result.

    :param freq: Total frequency of the category
    :type freq: int
    :param true_pos: Number of true positive categorizations
    :type true_pos: int
    :param false_pos: Number of false positive categorizations
    :type false_pos: int
    :returns: Accuracy, precision, recall and F1
    :rtype: :py:class:`EvaluationResult`, a named tuple
    """
    false_neg = freq - true_pos
    accuracy = true_pos/freq
    try:
        precision = true_pos/(true_pos + false_pos)
    except ZeroDivisionError:
        precision = 0
    recall = true_pos/(true_pos + false_neg)
    try:
        f1 = 2 * (precision * recall) / (precision + recall)
    except ZeroDivisionError:
        f1 = 0
    return EvaluationResult(accuracy, precision, recall, f1)
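
# Worked example: a tag that occurs 10 times in the gold standard, is
# predicted correctly 8 times and is wrongly assigned to 2 other tokens:
#   calculate_scores(10, 8, 2)  # -> accuracy, precision, recall, F1 all 0.8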


def color_score(score, digits=3, good=0.95, bad=0.75):
    """ Format a score into a colored string.

    :param score: The score to format
    :type score: float
    :param digits: The number of decimal places of the score to print
    :type digits: int
    :param good: Threshold above which a score is considered good
    :type good: float
    :param bad: Threshold below which a score is considered bad
    :type bad: float
    :returns: The formatted string
    :rtype: str
    """
    # Only colorize if we're writing to a TTY
    if sys.stderr.isatty():
        reset_col = '\033[39m'
        if score >= good:
            color = '\033[32m'  # green
        elif score <= bad:
            color = '\033[31m'  # red
        else:
            color = reset_col   # no color
    else:
        color = reset_col = ""
    tmpl = "{}{:.%df}{}" % digits
    return tmpl.format(color, score, reset_col)


def print_scores(category_scores):
    """ Print a table of per-category scores.

    :param category_scores: The scores to print
    :type category_scores: category -> :py:class:`EvaluationResult` dict
    """
    row_tmpl = "{:>9} || {:^10}| {:^10}| {:^10}| {:^10}"
    header_tmpl = row_tmpl
    if sys.stderr.isatty():
        # ANSI color codes are counted during whitespace calculation, but not
        # printed, so we add 10 (= color + reset) to the columns with
        # color-coded values
        row_tmpl = row_tmpl.replace("10", "20")
    print("\nPer-Category Scores:", file=sys.stderr)
    print(header_tmpl.format("Category", "Accuracy", "Precision", "Recall",
                             "F1"), file=sys.stderr)
    print(header_tmpl.format(*it.repeat("=", 14))
          .replace(' ', '=').replace('|', '+'), file=sys.stderr)
    for cat, score in sorted(category_scores.items(), key=lambda x: x[0]):
        print(row_tmpl.format(cat, *map(color_score, score)),
              file=sys.stderr)


def with_progress(it, num_expected=None):
    """ Wrap an iterable so that a progress/ETA line is printed to stderr
    while it is being consumed.
    """
    if num_expected is None:
        num_expected = len(it)
    start = time.time()
    for idx, x in enumerate(it):
        yield x
        it_per_sec = idx/(time.time() - start)
        if not it_per_sec:
            continue
        eta_s = (num_expected - idx)//it_per_sec
        eta_m = eta_s//60
        eta_s = eta_s % 60
        eta_h = eta_m//60
        eta_m = eta_m % 60
        eta = "{:02.0f}s".format(eta_s)
        if eta_m:
            eta = "{:02.0f}m".format(eta_m) + eta
        if eta_h:
            eta = "{:.0f}h".format(eta_h) + eta
        print("\r{}/{} ({:.2f}%, {:.2f}/s) eta: {} ".format(
            idx, int(num_expected), (idx/num_expected)*100, it_per_sec, eta),
            end='', file=sys.stderr)
    print("", file=sys.stderr)