| # -*- coding: utf-8 -*- | |
| import sys, os, platform, re, subprocess, collections | |
def escape_text(text):
    """Normalise *text* before it is handed to mecab.

    Removes characters that trip up kakasi/mecab:

    * every newline becomes a single space,
    * the fullwidth tilde (U+FF5E) becomes an ASCII ``~``,
    * ``<br />`` is rewritten to ``<br>`` (a plain ``<br>`` is kept).

    Returns the normalised text.
    """
    text = text.replace("\n", " ")
    text = text.replace(u'\uff5e', "~")
    # The tags used to be swapped for a "---newline---" placeholder so they
    # would survive an HTML-stripping pass.  That pass is disabled, and the
    # placeholder only had the side effect of turning a literal
    # "---newline---" in the input into "<br>", so rewrite the tag directly.
    return re.sub("<br( /)?>", "<br>", text)
# One token of mecab output.  The field order matters: the tuple is filled
# positionally as (surface, form, base, reading, base_reading).
MecabPart = collections.namedtuple(
    'MecabPart',
    'surface form base reading base_reading',
)
class MecabController(object):
    """Run the ``mecab`` command-line tool and parse what it prints.

    Creating an instance runs mecab twice: once as ``mecab -D`` to read the
    dictionary charset, and once on a known verb to learn which feature
    columns hold the part of speech, the base form and the reading.  The
    long-lived process used by parse() is started lazily by ensure_open().
    """

    # Base forms of verbs that collapse() glues onto a preceding conjugation.
    _AUX_VERBS = (u"れる", u"られる", u"いる", u"おる")
    # Base forms that collapse() never glues onto the preceding word.
    _NO_JOIN = (u"という", u"と", u"が", u"の", u"に", u"は", u"から")
    # Parts of speech after which collapse() may start gluing.
    _CONJUGATING = (u"動詞", u"形容詞", u"助動詞")

    def __init__(self):
        self.pipe = None      # long-lived mecab process, see ensure_open()
        self.encoding = None  # dictionary charset, set by _detect_encoding()
        self.args = []        # mecab arguments, set by _detect_fields()
        self.exe = "mecab"
        self._detect_encoding()
        self._detect_fields()

    def _mecab_pipe(self, args):
        """Start ``self.exe`` with *args* and return the Popen object.

        stdin and stdout are pipes and stderr is merged into stdout.

        Raises RuntimeError if the executable cannot be started.
        """
        si = None
        if sys.platform == "win32":
            # STARTF_USESHOWWINDOW keeps a console window from popping up.
            si = subprocess.STARTUPINFO()
            try:
                si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            except AttributeError:
                # Fallback location of the constant (presumably for older
                # Python builds on Windows).
                si.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW
        command = [self.exe] + list(args)
        try:
            return subprocess.Popen(command, bufsize=-1,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT,
                                    startupinfo=si)
        except OSError:
            raise RuntimeError("Please install mecab")

    def _detect_fields(self):
        """
        Detect in which fields the mecab dictionary provides which information

        Feeds the verb 守ります to mecab with ``%H`` output and looks, in the
        comma-separated first output line, for the columns holding the part
        of speech (動詞), the base form (守る) and the reading (まもり).  The
        result is stored in ``self.args`` as the output formats that parse()
        relies on.

        Raises RuntimeError if mecab prints nothing or a column is not found.
        """
        pipe = self._mecab_pipe(["--node-format=%H\n", "--unk-format=%H\n"])
        probe = u"守ります".encode(self.encoding)
        out, _ = pipe.communicate(probe)
        lines = out.decode(self.encoding).splitlines()
        if not lines:
            raise RuntimeError("mecab produced no output for the field probe")
        fields = lines[0].split(",")

        form_idx = reading_idx = base_idx = None
        # The first matching column wins for each kind of information.
        for j, field in enumerate(fields):
            if base_idx is None and u"守る" in field:
                base_idx = j
            if reading_idx is None and kata2hira(field) == u"まもり":
                reading_idx = j
            if form_idx is None and field == u"動詞":
                form_idx = j

        missing = [label for label, idx in (("part of speech", form_idx),
                                            ("base form", base_idx),
                                            ("reading", reading_idx))
                   if idx is None]
        if missing:
            raise RuntimeError("Could not find mecab dictionary field(s): "
                               + ", ".join(missing))

        # One line per token: surface, part of speech, base form and reading
        # separated by \x01.  Unknown words only get their surface.
        fmt = '%%m\01%%f[%d]\01%%f[%d]\01%%f[%d]\n' % (
            form_idx, base_idx, reading_idx)
        self.args = [
            '--eos-format=EOS\n',
            '--node-format=' + fmt,
            '--unk-format=%m\01\01\01\n',
        ]

    @staticmethod
    def _parse_charset(dump):
        """Return the value of the first ``charset:`` line in *dump*.

        *dump* is the output of ``mecab -D``, as bytes or text.

        Raises RuntimeError if there is no such line.
        """
        if isinstance(dump, bytes):
            dump = dump.decode("ascii", "replace")
        prefix = "charset:"
        for line in dump.splitlines():
            if line.startswith(prefix):
                return line[len(prefix):].strip()
        raise RuntimeError("Failed to determine mecab charset")

    def _detect_encoding(self):
        """Set ``self.encoding`` to the charset reported by ``mecab -D``.

        Raises RuntimeError if mecab cannot be started or reports no charset.
        """
        # Go through _mecab_pipe so that self.exe, the Windows start-up flags
        # and the "Please install mecab" error apply here as well.
        pipe = self._mecab_pipe(["-D"])
        out, _ = pipe.communicate()
        self.encoding = self._parse_charset(out)

    def ensure_open(self):
        """Start the long-lived mecab process unless one is still running."""
        if not self.pipe or self.pipe.poll() is not None:
            self.pipe = self._mecab_pipe(self.args)

    def parse(self, expr):
        """Analyse *expr* with mecab and return a list of MecabPart tuples.

        The text is passed through escape_text() first.  Characters that the
        dictionary charset cannot represent are dropped.  For each token,
        ``form``, ``base`` and ``reading`` are None when mecab gave an empty
        field; readings are converted to hiragana.  ``base_reading`` is the
        base form itself when that is kana only, the reading for nouns
        (名詞), and None otherwise.

        Raises RuntimeError if mecab exits before finishing the answer.
        """
        self.ensure_open()
        expr = escape_text(expr)
        self.pipe.stdin.write(expr.encode(self.encoding, "ignore") + b"\n")
        self.pipe.stdin.flush()
        out = []
        while True:
            raw = self.pipe.stdout.readline()
            if not raw:
                # End of file: the process is gone.  Forget it so that the
                # next call starts a fresh one.
                self.pipe = None
                raise RuntimeError("mecab exited unexpectedly")
            try:
                line = raw.decode(self.encoding).strip()
            except UnicodeDecodeError:
                # Skip lines that cannot be decoded.
                continue
            if line == u"EOS":
                break
            item = line.split("\01")
            if len(item) < 4:
                # Not a token line in the format set up by _detect_fields()
                # (stderr is merged into stdout, so other text can show up).
                continue
            surface = item[0]
            form = item[1] or None
            # Drop anything up to and including the last ':' in the base form.
            base = re.sub(u'^.*:', u'', item[2]) if item[2] else None
            reading = kata2hira(item[3]) if item[3] else None
            if base is not None and is_kana_only(base):
                base_reading = kata2hira(base)
            elif form == u"名詞":
                base_reading = reading
            else:
                base_reading = None
            out.append(MecabPart(surface, form, base, reading, base_reading))
        return out

    def collapse(self, items):
        """Glue trailing particles and auxiliaries onto conjugated words.

        *items* is a sequence of MecabPart tuples as returned by parse().
        After a verb, adjective or auxiliary verb (動詞, 形容詞, 助動詞), each
        following token whose part of speech starts with 助, or which is a
        verb with a base form in ``_AUX_VERBS``, is appended to it unless its
        base form is in ``_NO_JOIN``.  Surface and reading are concatenated;
        the other fields are those of the first token of the group.  If the
        reading of any token in a group is unknown, the reading of the whole
        group is None.  Tokens with an empty surface are dropped.

        Returns a new list; the result tuples have the same type as the
        input tuples.
        """
        collapsed = []
        head = None       # token that opened the current group
        surface = u""     # surface accumulated for the current group
        reading = None    # reading accumulated for the current group
        in_conjugation = False
        for w in items:
            join_it = bool(
                in_conjugation
                and w.form
                and (w.form.startswith(u'助')
                     or (w.form == u"動詞" and w.base in self._AUX_VERBS))
                and w.base not in self._NO_JOIN
            )
            if join_it:
                surface += w.surface
                if reading is not None:
                    # A token without a reading makes the combined reading
                    # unknown instead of raising on str + None.
                    if w.reading is None:
                        reading = None
                    else:
                        reading += w.reading
                continue
            in_conjugation = w.form in self._CONJUGATING
            if head is not None and surface:
                collapsed.append(
                    head._replace(surface=surface, reading=reading))
            head, surface, reading = w, w.surface, w.reading
        if head is not None and surface:
            collapsed.append(head._replace(surface=surface, reading=reading))
        return collapsed
def kata2hira(s):
    """Convert katakana to hiragana

    Only katakana that have a hiragana counterpart are converted: ァ..ヶ
    (U+30A1..U+30F6) and the iteration marks ヽ ヾ (U+30FD, U+30FE).  All
    other characters are returned unchanged, including the middle dot ・,
    the long-vowel mark ー and ヷ ヸ ヹ ヺ ヿ.  Shifting those as well would
    produce voicing marks or unassigned code points, e.g. コーヒー would come
    out as こ゜ひ゜.
    """
    try:
        to_char = unichr  # Python 2
    except NameError:
        to_char = chr     # Python 3
    # Distance between a katakana and the matching hiragana code point.
    shift = 0x3041 - 0x30a1

    def convert(ch):
        code = ord(ch)
        if 0x30a1 <= code <= 0x30f6 or 0x30fd <= code <= 0x30fe:
            return to_char(code + shift)
        return ch

    return u"".join(convert(ch) for ch in s)
def is_kana_only(s):
    """Return True if *s* consists solely of hiragana/katakana characters.

    The accepted ranges are U+3041..U+309F and U+30A1..U+30FF.  An empty
    string also counts as kana only.
    """
    return re.match(u'^[\u3041-\u309f\u30a1-\u30ff]*$', s) is not None
# Shared module-level controller.  Constructing it runs mecab right away
# (charset and field detection happen in __init__), so importing this
# module fails if mecab cannot be run.
mecab = MecabController()
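# (GitHub page footer picked up when this file was copied from the web:
# "Sign up for free to join this conversation on GitHub. Already have an
# account? Sign in to comment" -- not part of the program.)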