Parsing Russian
#!/usr/bin/env python3

import reader
import corpus
import lib
import treetaggerwrapper
import re
import sys
import os
import subprocess
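
# The get_* helpers below map single-character codes from the positional
# TreeTagger tag (gender, number, case, degree, person) onto the feature
# labels used in the output annotation.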
def get_gender(feat, code):
    if code == 'm':
        feat.add('m')
    elif code == 'f':
        feat.add('f')
    elif code == 'n':
        feat.add('n')
    elif code == 'c':
        # common gender is collapsed onto masculine
        feat.add('m')


def get_number(feat, code):
    if code == 's':
        feat.add('sg')
    elif code == 'p':
        feat.add('pl')


def get_case(feat, code, code2=''):
    if code == 'n':
        feat.add('nom')
    elif code == 'g':
        feat.add('gen')
    elif code == 'd':
        feat.add('dat')
    elif code == 'a':
        feat.add('acc')
    elif code == 'v':
        # vocative is folded into nominative
        feat.add('nom')
    elif code == 'i':
        feat.add('ins')
    elif code == 'l':
        feat.add('prep')
    elif code2 == 'p':
        feat.add('gen2')
    elif code2 == 'l':
        feat.add('loc')


def get_degree(feat, code):
    if code == 'c':
        feat.add('comp')
    elif code == 's':
        feat.add('supl')


def get_person(feat, code):
    if code == '1':
        feat.add('1p')
    elif code == '2':
        feat.add('2p')
    elif code == '3':
        feat.add('3p')

# Only these features are kept in the final tag; anything else is dropped.
selected_feat = {'m', 'f', 'n', 'sg', 'pl', '1p', '2p', '3p',
                 'nom', 'gen', 'gen2', 'dat', 'acc', 'ins', 'prep', 'loc',
                 'real', 'imp', 'pass', 'comp', 'shrt'}
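

# translate_tag() decodes a positional TreeTagger tag character by character
# and keeps only the features listed in selected_feat. As an illustration
# (the input tag is a made-up but typical noun tag, not taken from the data),
# translate_tag('Ncmsnn') would return 'N.m.nom.sg'.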
def translate_tag(tag):
    pos = ''
    feat = set()
    tag = tag.ljust(11, '-')
    if tag[0] == 'N':
        # noun
        pos = 'N'
        get_gender(feat, tag[2])
        get_number(feat, tag[3])
        get_case(feat, tag[4], tag[6])
    elif tag[0] == 'V':
        # verb
        pos = 'V'
        get_person(feat, tag[4])
        get_number(feat, tag[5])
        get_gender(feat, tag[6])
        get_case(feat, tag[10])
        # mood
        if tag[2] == 'i' and tag[7] != 'p':
            feat.add('real')
        elif tag[2] == 'm':
            feat.add('imp')
        elif tag[2] == 'n':
            pos = 'VINF'
        elif tag[2] == 'p':
            pos = 'VADJ'
        elif tag[2] == 'g':
            pos = 'VADV'
        # tense
        if pos == 'V':
            if tag[3] == 'p':
                feat.add('prs')
            elif tag[3] == 'f':
                feat.add('npst')
            elif tag[3] == 's':
                feat.add('pst')
        # passive
        if tag[7] == 'p':
            feat.add('pass')
        # type
        if tag[9] == 'p':
            feat.add('imperf')
        elif tag[9] == 'e':
            feat.add('perf')
    elif tag[0] == 'A':
        # adjective
        pos = 'A'
        get_degree(feat, tag[2])
        get_gender(feat, tag[3])
        get_number(feat, tag[4])
        get_case(feat, tag[5])
        # short form
        if tag[6] == 's':
            feat.add('shrt')
    elif tag[0] == 'P':
        # pronoun
        pos = 'S'
        get_person(feat, tag[2])
        get_gender(feat, tag[3])
        get_number(feat, tag[4])
        get_case(feat, tag[5])
    elif tag[0] == 'R':
        # adverb
        pos = 'ADV'
        get_degree(feat, tag[1])
    elif tag[0] == 'C':
        # conjunction
        pos = 'CONJ'
    elif tag[0] == 'Q':
        # particle
        pos = 'PART'
    elif tag[0] == 'S':
        # preposition
        pos = 'PR'
    elif tag[0] == 'M':
        # numeral
        pos = 'NUM'
        get_gender(feat, tag[2])
        get_number(feat, tag[3])
        get_case(feat, tag[4])
    else:
        pos = 'UNK'
    return '.'.join([pos] + sorted(feat & selected_feat))
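

# parse_corpora() runs the full pipeline: every corpus is tagged and
# lemmatized with TreeTagger, the tokens are dumped into tmp/tagged.tab,
# that file is run through MaltParser (parse.sh), and the merged result is
# written out as one XML file per corpus under res/.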
def parse_corpora(corpora, Reader, tune=lambda x: x):
    tagger = treetaggerwrapper.TreeTagger(TAGLANG='ru', TAGDIR='/home/alexpak/tools/tree-tagger/')
    readers = {}
    f = open('tmp/tagged.tab', 'w')
    limit = None

    # tag and lemmatize, write output to a file
    for corp in corpora:
        readers[corp.type] = Reader(corp.source)
        tune(readers[corp.type])
        readers[corp.type].read(corp.filename)

        newline_re = re.compile('[\n\r]+')
        for row in readers[corp.type]._data[0:limit]:
            lines = newline_re.split(row['text'])
            sentences = []
            for line in lines:
                line = line.strip()
                if not len(line):
                    continue

                tags = tagger.TagText(line)
                sentence = []
                for tag in tags:
                    try:
                        word, info, lemma = tag.split('\t')
                    except ValueError:
                        continue
                    if lemma == '<unknown>':
                        lemma = word
                    if info == 'SENT':
                        if len(sentence) > 1:
                            sentences.append(sentence)
                        sentence = []
                    else:
                        tag = translate_tag(info)
                        if tag != 'UNK':
                            sentence.append((word, tag, lemma))
                if len(sentence):
                    sentences.append(sentence)

            row['n'] = len(sentences)
            for sentence in sentences:
                for word, tag, lemma in sentence:
                    print('\t'.join([word, tag, '0', 'x', lemma]), file=f)
                print(file=f)

        print('Wrote ' + corp.type, file=sys.stderr)
    f.close()
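
    # Each token was written as five tab-separated columns (word, tag,
    # dummy head '0', dummy relation 'x', lemma); columns 2 and 3 of
    # tmp/parsed.tab (apparently the head index and dependency label
    # produced by MaltParser) are merged back in below.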
    # parse with MaltParser
    args = ['/home/alexpak/tools/malt-1.6.1/parse.sh',
            'russian-all',
            '/home/alexpak/projects/romip/tmp/tagged.tab',
            '/home/alexpak/projects/romip/tmp/parsed.tab']
    pipe = subprocess.Popen(args)
    pipe.communicate()
    print('Parsed', file=sys.stderr)

    # read back and create one XML file per corpus
    file_tagged = open('tmp/tagged.tab', 'r')
    file_parsed = open('tmp/parsed.tab', 'r')
    for corp in corpora:
        f = open('res/{0}.xml'.format(corp.type), 'w')
        print('<?xml version="1.0"?>', file=f)
        print('<data>', file=f)

        for row in readers[corp.type]._data[0:limit]:
            sentences = []
            sentence = []
            while len(sentences) < row['n']:
                line_tagged = file_tagged.readline()
                line_parsed = file_parsed.readline()
                if not len(line_tagged):
                    # end of file: flush whatever is left and stop
                    if len(sentence):
                        sentences.append('\n'.join(sentence))
                    break

                line_tagged = line_tagged.strip()
                line_parsed = line_parsed.strip()
                if not len(line_tagged):
                    # blank line marks a sentence boundary
                    assert len(sentence) > 0
                    sentences.append('\n'.join(sentence))
                    sentence = []
                else:
                    tagged = line_tagged.split('\t')
                    parsed = line_parsed.split('\t')
                    try:
                        sentence.append('\t'.join([tagged[0], tagged[1], parsed[2], parsed[3], tagged[4]]))
                    except IndexError:
                        print(tagged)
                        print(parsed)
                        sys.exit()

            print('<row id="{0}" rating="{1}">'.format(row['id'], row['score']), file=f)
            print('<raw><![CDATA[{0}]]></raw>'.format(row['text']), file=f)
            print('<parsed><![CDATA[{0}]]></parsed>'.format('\n\n'.join(sentences)), file=f)
            print('</row>', file=f)

        print('</data>', file=f)
        f.close()


if __name__ == '__main__':
    from optparse import OptionParser

    parser = OptionParser('%prog [options]')
    parser.add_option('-t', '--test', action='store_const', const=True, dest='test', help='parse test corpus')
    parser.add_option('-n', '--number', action='store', dest='number', type='int', help='number of files to divide the test data into')
    opts, args = parser.parse_args()
    opts.number = opts.number or 1

    if opts.test:
        # test_corpus = [corpus.corpus_t(type='test-{0}'.format(n), source=n, filename='res/Full/review_*.xml') for n in range(1, opts.number + 1)]
        test_corpus = [corpus.corpus_t(type='test-1', source=1, filename='res/Full/review_*.xml')]
        parse_corpora(test_corpus, reader.TestReader, lambda x: x.set_num(opts.number))
    else:
        parse_corpora(corpus.corpora, reader.Reader)
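
# Typical invocations, assuming the local reader/corpus modules and the
# hard-coded TreeTagger and MaltParser paths above are in place:
#   ./parse.py          # parse the corpora defined in corpus.corpora
#   ./parse.py -t -n 4  # parse the test corpus, dividing it into 4 parts
#                       # (per the -n help text)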