@jbaiter
Created January 11, 2016 21:29
#!/usr/bin/env python3.4
import argparse
import math
import sys
from collections import defaultdict, namedtuple
from itertools import chain
from pcfg import read_pcfg
# Data type for constituents
# Implemented as a named tuple (=immutable!) so we can use it as a key
# in dictionaries
Constituent = namedtuple("Constituent", ["start", "end", "rule", "complete"])
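# Illustrative example (word/tag made up): Constituent(start=0, end=1,
# rule=("NN", "dog"), complete=1) is a fully recognized lexical constituent
# covering sentence[0:1]; `complete` counts how many symbols of the rule's
# tail have been recognized so far.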


def parse(lexicon, grammar, sentence):
    """ Parse the tokenized sentence into a syntax tree.

    :param lexicon:  PCFG lexicon with word -> {tag -> probability} mappings
    :type lexicon:   dict
    :param grammar:  PCFG grammar with head -> {tail -> probability} mappings
    :type grammar:   dict
    :param sentence: The tokenized sentence (sequence of str)
    :return:         The syntax tree for the sentence
    :rtype:          S-Expressions via nested lists
    """
    # NOTE: All the state is kept in the lexical scope of this outer function,
    # while the actual left-corner algorithm is implemented in the inner
    # closures that have access to this state.
    chart = []
    completed_endpos = defaultdict(list)
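    # `chart` will hold one dict per sentence position 0..len(sentence);
    # chart[i] maps each Constituent ending at position i to its
    # log-probability. `completed_endpos` remembers, for every (partially)
    # completed constituent, the end positions of its already recognized
    # children, so parse_subtree() can rebuild the tree.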

    unknown_toks = [t for t in sentence if t not in lexicon]
    if unknown_toks:
        raise ValueError("Can't parse sentence because the following words "
                         "are unknown: {}".format(unknown_toks))

    def prune_chart():
        """ Remove non-complete constituents from the chart and the mapping
        of completion end positions. """
        nonlocal chart
        nonlocal completed_endpos
        pruned_chart = []
        for ch in chart:
            # Only keep completed constituents
            pruned = {const: prob for const, prob in ch.items()
                      if const.complete == len(const.rule[1:])}
            pruned_chart.append(pruned)
        chart = pruned_chart
        all_consts = set(chain(*chart))
        completed_endpos = {
            c: endpos for c, endpos in completed_endpos.items()
            if c in all_consts}

    def parse_subtree(start, end, sym=None):
        """ Get the best parse for a given range of the sentence.

        To obtain the best parse for the whole sentence, set start=0,
        end=len(sentence) and sym=None.
        This will determine the best starting symbol automatically.
        """
        candidates = ((c, prob) for c, prob in chart[end].items()
                      if ((sym is None or c.rule[0] == sym) and
                          c.start == start and
                          c.end == end))
        const = max(candidates, key=lambda x: x[1])[0]
        if const.rule[1] in lexicon:
            return const.rule
        else:
            end_positions = completed_endpos[const]
            parse = [const.rule[0]]
            cur_start = start
            for child, end_pos in zip(const.rule[1:], end_positions):
                child_parse = parse_subtree(cur_start, end_pos, child)
                parse.append(child_parse)
                cur_start = end_pos
            return parse

    def scan(word, position):
        """ Look up a word in the lexicon, add matching nodes to chart. """
        for tag, prob in lexicon[word].items():
            const = Constituent(position, position+1, (tag, word), 1)
            add(const, math.log(prob))

    def predict(const):
        """ Called when `const` is fully parsed; adds all rules from the
        grammar to the chart whose first ('left-corner') constituent is
        identical to `const`'s category.
        """
        for head, child_probs in grammar.items():
            for tail, prob in child_probs.items():
                if tail[0] == const.rule[0]:
                    child_rule = (head,) + tail
                    child_const = Constituent(const.start, const.start,
                                              child_rule, 0)
                    add(child_const, math.log(prob))
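    # Illustrative example (symbols hypothetical): if const is a completed NP
    # spanning positions 2..4 and the grammar contains S -> NP VP with
    # probability p, predict() adds Constituent(2, 2, ("S", "NP", "VP"), 0)
    # with log-probability log(p); complete() below then advances it past the
    # NP left corner.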

    def complete(const, const_prob):
        """ Called when `const` is fully parsed; completes all partial
        constituents in the chart that expect `const` as their next
        constituent.
        """
        # Since we can't mutate the chart during iteration, we remember which
        # new constituents result from completion, so we can add them after
        # we're done iterating.
        completed = []
        for other_const, prob in chart[const.start].items():
            if other_const.complete == len(other_const.rule[1:]):
                # Skip already completed constituents
                continue
            if other_const.rule[other_const.complete+1] != const.rule[0]:
                # Skip constituents that don't expect our constituent
                # as the next to be completed
                continue
            const_len = const.end - const.start
            # Build the new, (partially) completed constituent
            new_const = Constituent(
                other_const.start, other_const.end + const_len,
                other_const.rule, other_const.complete + 1)
            # Probability for the new constituent is the product (sum, since
            # we're using log-probabilities) of the completing and the
            # completed constituent's probabilities.
            completed.append((new_const, const_prob + prob))
            completed_endpos[new_const] = (
                completed_endpos[other_const] + [const.end])
        for compl, prob in completed:
            add(compl, prob)

    def add(const, prob):
        """ Add `const` to chart. If an equivalent constituent is present
        in the chart, pick the one with the higher probability.
        """
        cur_prob = chart[const.end].get(const, None)
        if cur_prob is None or cur_prob < prob:
            chart[const.end][const] = prob
            # Was the constituent completely recognized?
            if const.complete == len(const.rule[1:]):
                predict(const)
                complete(const, prob)

    chart.append(dict())
    for idx, tok in enumerate(sentence):
        chart.append(dict())
        scan(tok, idx)
    prune_chart()
    return parse_subtree(0, len(sentence))


class LeftCornerParser:
    """ Parser using the Left-Corner algorithm. """

    def __init__(self, lexicon_path, grammar_path):
        """ Build a new parser from the given PCFG.

        :param lexicon_path: Path to PCFG lexicon
        :param grammar_path: Path to PCFG grammar
        """
        self._grammar = read_pcfg(grammar_path)
        self._lexicon = self._read_lexicon(lexicon_path)

    def _read_lexicon(self, path):
        """ Read the lexicon from the given PCFG file.

        :param path: Path to PCFG lexicon
        :returns:    Lexicon as a {word: {tag: probability}} mapping
        :rtype:      {str: {str: float}} dict
        """
        lex_pcfg = read_pcfg(path)
        lex = {}
        for tag, word_probs in lex_pcfg.items():
            for word, prob in word_probs.items():
                # read_pcfg stores rule tails as tuples, so each key here is
                # a one-element tuple containing the word
                word = word[0]
                if word in lex:
                    lex[word][tag] = prob
                else:
                    lex[word] = {tag: prob}
        return lex

    def parse(self, sentence):
        """ Parse the given (tokenized!) sentence.

        :param sentence: The sentence to parse
        :type sentence:  sequence of str
        :returns:        The parse for the sentence
        :rtype:          Nested lists of str
        """
        return parse(self._lexicon, self._grammar, sentence)
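

# Usage sketch (illustrative; the .pcfg file names match what pcfg.py below
# writes out, the sentence is made up):
#   parser = LeftCornerParser("lexicon.pcfg", "grammar.pcfg")
#   tree = parser.parse("the dog barks".split())
#   print_tree(tree)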


def print_tree(tree, indent=0):
    """ Print the given tree to standard output.

    :param tree: Tree
    :type tree:  nested lists of str
    """
    head, *tail = tree
    if len(tree) == 2 and not isinstance(tail[0], (tuple, list)):
        print(indent*" ", head, tail[0])
        return
    else:
        print(indent*" ", head)
        for t in tail:
            print_tree(t, indent+2)
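# Illustrative example: print_tree(["S", ["NP", "John"], ["VP", "sleeps"]])
# prints (modulo exact leading spaces):
#   S
#     NP John
#     VP sleeps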


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-g", "--grammar", type=str, required=True,
                        help="Path to PCFG grammar.")
    parser.add_argument("-l", "--lexicon", type=str, required=True,
                        help="Path to PCFG lexicon.")
    parser.add_argument("sentence", type=str,
                        help="Tokenized sentence to parse.")
    args = parser.parse_args()
    parser = LeftCornerParser(args.lexicon, args.grammar)
    print("Parsing sentence, this may take a while...", file=sys.stderr)
    sent_parse = parser.parse(args.sentence.split())
    print_tree(sent_parse)
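# Example invocation (script name hypothetical, PCFG files as produced by
# pcfg.py below):
#   python3.4 lc_parser.py -g grammar.pcfg -l lexicon.pcfg "the dog barks"


# ---------------------------------------------------------------------------
# pcfg.py -- builds the PCFG grammar and lexicon from treebank files and
# provides read_pcfg(), which is imported by the parser above.
# ---------------------------------------------------------------------------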
#!/usr/bin/env python3.4
import re
import sys
from collections import defaultdict, deque, Counter


def build_parsetree(token_stream):
    """ Build a lisp-style tree from the token stream. """
    tree = []
    while token_stream:
        token = token_stream.popleft()
        if token == "(":
            tree.append(build_parsetree(token_stream))
        elif token == ")":
            return tree
        else:
            # Strip numeric co-indexation suffixes from category labels
            # (e.g. "NP-1" or "NP=2" -> "NP")
            token = re.sub(r"^([A-Z-]+)[-=]\d+$", r"\1", token)
            tree.append(token)
    return tree


def parse_treebank(path):
    """ Given a file with parse trees, tokenize it and parse it into a Python
    data structure.
    """
    with open(path) as fp:
        data = fp.read()
    # Normalize whitespace
    data = re.sub(r"\s+", " ", data)
    # Very simple regex lexer
    data = re.sub(r"\(\(", "( (", data)
    data = re.sub(r"\)\)", ") )", data)
    data = re.sub(r"\((\S)", r"( \1", data)
    data = re.sub(r"(\S)\)", r"\1 )", data)
    return build_parsetree(deque(data.split()))
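# Illustrative example: the treebank string "(S (NP (DT the) (NN dog)))" is
# normalized to "( S ( NP ( DT the ) ( NN dog ) ) )" and parsed into
# [["S", ["NP", ["DT", "the"], ["NN", "dog"]]]] (note the extra outer list).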


def count_contexts(tree):
    """ Given a tree, count the grammatical and lexical contexts. """
    grammar_counts = Counter()
    lexicon_counts = Counter()
    if len(tree) == 2 and not isinstance(tree[1], list):
        # Leaf node of the form [tag, word] -> lexical context
        lexicon_counts[tuple(tree)] += 1
        return grammar_counts, lexicon_counts
    if not isinstance(tree[0], list):
        # Inner node: record head -> children categories as a grammar context
        gram_pat = tuple([tree[0]] + [t[0] for t in tree[1:]])
        grammar_counts[gram_pat] += 1
        tree = tree[1:]
    for node in tree:
        new_gram, new_lex = count_contexts(node)
        grammar_counts.update(new_gram)
        lexicon_counts.update(new_lex)
    return grammar_counts, lexicon_counts
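# Illustrative example: for the tree
# ["S", ["NP", ["DT", "the"], ["NN", "dog"]]] this counts the grammar
# contexts ("S", "NP") and ("NP", "DT", "NN") and the lexical contexts
# ("DT", "the") and ("NN", "dog").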


def create_pcfg(treebank_paths):
    """ Create a PCFG grammar and lexicon from a set of files containing
    parsed sentences. """
    # FIXME: The variable names could be clearer...
    grammar_counts = Counter()
    lexicon_counts = Counter()
    for path in treebank_paths:
        tree = parse_treebank(path)
        gram_cnt, lex_cnt = count_contexts(tree)
        grammar_counts.update(gram_cnt)
        lexicon_counts.update(lex_cnt)
    # Determine child probabilities for each root
    gram_counts = defaultdict(dict)
    for pat, cnt in grammar_counts.items():
        root = pat[0]
        children = pat[1:]
        gram_counts[root][children] = cnt
    gram_probs = defaultdict(dict)
    for root, child_cnts in gram_counts.items():
        all_cnt = sum(child_cnts.values())
        for child, cnt in child_cnts.items():
            gram_probs[root][child] = cnt / all_cnt
    # Determine word probabilities for each tag
    lex_counts = defaultdict(dict)
    for (tag, word), cnt in lexicon_counts.items():
        lex_counts[tag][word] = cnt
    lex_probs = defaultdict(dict)
    for tag, word_cnts in lex_counts.items():
        all_cnt = sum(word_cnts.values())
        for word, cnt in word_cnts.items():
            lex_probs[tag][word] = cnt / all_cnt
    return gram_probs, lex_probs


def write_pcfg(pcfg, path):
    """ Write a given PCFG to a file. """
    with open(path, 'w') as fp:
        for root, children_probs in pcfg.items():
            for children, children_prob in children_probs.items():
                if isinstance(children, (list, tuple)):
                    children = " ".join(children)
                fp.write("{prob} {root} {children}\n".format(
                    prob=children_prob, root=root, children=children))
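# The resulting file format is one rule per line:
# "<probability> <head> <child1> <child2> ...", e.g. a grammar line like
# "0.5 S NP VP" or a lexicon line like "0.01 NN dog" (probabilities here are
# made up).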


def read_pcfg(path):
    """ Read a PCFG from a file. """
    pcfg = dict()
    with open(path) as fp:
        for line in fp:
            prob, head, *children = line.split()
            children = tuple(children)
            prob = float(prob)
            if head in pcfg:
                pcfg[head][children] = prob
            else:
                pcfg[head] = {children: prob}
    return pcfg


if __name__ == '__main__':
    gram_probs, lex_probs = create_pcfg(sys.argv[1:])
    write_pcfg(gram_probs, "grammar.pcfg")
    write_pcfg(lex_probs, "lexicon.pcfg")