Skip to content

Instantly share code, notes, and snippets.

@irokez
Created July 16, 2012 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save irokez/cc9db95069b267a1d79f to your computer and use it in GitHub Desktop.
Save irokez/cc9db95069b267a1d79f to your computer and use it in GitHub Desktop.
Parsing Russian
#!/usr/bin/env python3
import reader
import corpus
import lib
import treetaggerwrapper
import re
import sys
import os
import subprocess
def get_gender(feat, code):
if code == 'm':
feat.add('m')
elif code == 'f':
feat.add('f')
elif code == 'n':
feat.add('n')
elif code == 'c':
feat.add('m')
def get_number(feat, code):
if code == 's':
feat.add('sg')
elif code == 'p':
feat.add('pl')
def get_case(feat, code, code2 = ''):
if code == 'n':
feat.add('nom')
elif code == 'g':
feat.add('gen')
elif code == 'd':
feat.add('dat')
elif code == 'a':
feat.add('acc')
elif code == 'v':
feat.add('nom')
elif code == 'i':
feat.add('ins')
elif code == 'l':
feat.add('prep')
elif code2 == 'p':
feat.add('gen2')
elif code2 == 'l':
feat.add('loc')
def get_degree(feat, code):
if code == 'c':
feat.add('comp')
elif code == 's':
feat.add('supl')
def get_person(feat, code):
if code == '1':
feat.add('1p')
elif code == '2':
feat.add('2p')
elif code == '3':
feat.add('3p')
selected_feat = {'m', 'f', 'n', 'sg', 'pl', '1p', '2p', '3p', 'nom', 'gen', 'gen2', 'dat', 'acc', 'ins', 'prep', 'loc', 'real', 'imp', 'pass', 'comp', 'shrt'}
def translate_tag(tag):
pos = ''
feat = set()
tag = tag.ljust(11, '-')
if tag[0] == 'N':
# noun
pos = 'N'
get_gender(feat, tag[2])
get_number(feat, tag[3])
get_case(feat, tag[4], tag[6])
elif tag[0] == 'V':
# verb
pos = 'V'
get_person(feat, tag[4])
get_number(feat, tag[5])
get_gender(feat, tag[6])
get_case(feat, tag[10])
# mood
if tag[2] == 'i' and tag[7] != 'p':
feat.add('real')
elif tag[2] == 'm':
feat.add('imp')
elif tag[2] == 'n':
pos = 'VINF'
elif tag[2] == 'p':
pos = 'VADJ'
elif tag[2] == 'g':
pos = 'VADV'
# tense
if pos == 'V':
if tag[3] == 'p':
feat.add('prs')
elif tag[3] == 'f':
feat.add('npst')
elif tag[3] == 's':
feat.add('pst')
# passive
if tag[7] == 'p':
feat.add('pass')
# type
if tag[9] == 'p':
feat.add('imperf')
elif tag[9] == 'e':
feat.add('perf')
elif tag[0] == 'A':
# adjective
pos = 'A'
get_degree(feat, tag[2])
get_gender(feat, tag[3])
get_number(feat, tag[4])
get_case(feat, tag[5])
# short
if tag[6] == 's':
feat.add('shrt')
elif tag[0] == 'P':
# pronoun
pos = 'S'
get_person(feat, tag[2])
get_gender(feat, tag[3])
get_number(feat, tag[4])
get_case(feat, tag[5])
elif tag[0] == 'R':
# adverb
pos = 'ADV'
get_degree(feat, tag[1])
elif tag[0] == 'C':
# conjunction
pos = 'CONJ'
elif tag[0] == 'Q':
# particle
pos = 'PART'
elif tag[0] == 'S':
# preposition
pos = 'PR'
elif tag[0] == 'M':
# number
pos = 'NUM'
get_gender(feat, tag[2])
get_number(feat, tag[3])
get_case(feat, tag[4])
else:
pos = 'UNK'
return '.'.join([pos] + sorted(feat & selected_feat))
def parse_corpora(corpora, Reader, tune = lambda x: x):
tagger = treetaggerwrapper.TreeTagger(TAGLANG='ru',TAGDIR='/home/alexpak/tools/tree-tagger/')
readers = {}
f = open('tmp/tagged.tab', 'w')
limit = None
# tag and lemmatize, write output to a file
for corp in corpora:
readers[corp.type] = Reader(corp.source)
tune(readers[corp.type])
readers[corp.type].read(corp.filename)
newline_re = re.compile('[\n\r]+')
for row in readers[corp.type]._data[0:limit]:
lines = newline_re.split(row['text'])
sentences = []
for line in lines:
line = line.strip()
if not len(line):
continue
tags = tagger.TagText(line)
sentence = []
for tag in tags:
try:
word, info, lemma = tag.split('\t')
except:
continue
if lemma == '<unknown>':
lemma = word
if info == 'SENT':
if len(sentence) > 1:
sentences.append(sentence)
sentence = []
else:
tag = translate_tag(info)
if tag != 'UNK':
sentence.append((word, tag, lemma))
if len(sentence):
sentences.append(sentence)
row['n'] = len(sentences)
for sentence in sentences:
for word, tag, lemma in sentence:
print('\t'.join([word, tag, '0', 'x', lemma]), file=f)
print(file=f)
print('Wrote ' + corp.type, file=sys.stderr)
f.close()
# parse
args = ['/home/alexpak/tools/malt-1.6.1/parse.sh',
'russian-all',
'/home/alexpak/projects/romip/tmp/tagged.tab',
'/home/alexpak/projects/romip/tmp/parsed.tab']
pipe = subprocess.Popen(args)
pipe.communicate()
print('Parsed', file=sys.stderr)
# read and create single XML file
file_tagged = open('tmp/tagged.tab', 'r')
file_parsed = open('tmp/parsed.tab', 'r')
for corp in corpora:
f = open('res/{0}.xml'.format(corp.type), 'w')
print('<?xml version="1.0"?>', file=f)
print('<data>', file=f)
for row in readers[corp.type]._data[0:limit]:
sentences = []
sentence = []
while len(sentences) < row['n']:
line_tagged = file_tagged.readline()
line_parsed = file_parsed.readline()
if not(len(line_tagged)):
sentences.append(sentence)
line_tagged = line_tagged.strip()
line_parsed = line_parsed.strip()
if not(len(line_tagged)):
assert(len(sentence) > 0)
sentences.append('\n'.join(sentence))
sentence = []
else:
tagged = line_tagged.split('\t')
parsed = line_parsed.split('\t')
try:
sentence.append('\t'.join([tagged[0], tagged[1], parsed[2], parsed[3], tagged[4]]))
except:
print(tagged)
print(parsed)
exit()
print('<row id="{0}" rating="{1}">'.format(row['id'], row['score']), file=f)
print('<raw><![CDATA[{0}]]></raw>'.format(row['text']), file=f)
print('<parsed><![CDATA[{0}]]></parsed>'.format('\n\n'.join(sentences)), file=f)
print('</row>', file=f)
print('</data>', file=f)
f.close()
if __name__ == '__main__':
from optparse import OptionParser
parser = OptionParser('%prog [options]')
parser.add_option('-t', '--test', action='store_const', const=True, dest='test', help='parse test corpus')
parser.add_option('-n', '--number', action='store', dest='number', type='int', help='number of files to divide the test data')
opts, args = parser.parse_args()
opts.number = opts.number or 1
if opts.test:
# test_corpus = [corpus.corpus_t(type='test-{0}'.format(n), source=n, filename='res/Full/review_*.xml') for n in range(1, opts.number + 1)]
test_corpus = [corpus.corpus_t(type='test-1', source=1, filename='res/Full/review_*.xml')]
parse_corpora(test_corpus, reader.TestReader, lambda x: x.set_num(opts.number))
else:
parse_corpora(corpus.corpora, reader.Reader)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment