Skip to content

Instantly share code, notes, and snippets.

@tsuchm
Created July 15, 2021 23:57
Show Gist options
  • Save tsuchm/b368b583e8430805d6deb131d7248d0b to your computer and use it in GitHub Desktop.
Save tsuchm/b368b583e8430805d6deb131d7248d0b to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
"""
Parallelize MeCab
$ wc -l corpus.txt
2761174 corpus.txt
$ time ./parallel-mecab.py corpus.txt >/dev/null
real 0m12.610s
user 1m46.120s
sys 0m4.080s
$ time mecab -Owakati corpus.txt >/dev/null
real 0m48.007s
user 0m47.546s
sys 0m0.460s
"""
import concurrent.futures
import fugashi
import io
import os
import sys
import unidic_lite
import unicodedata
# Force UTF-8 on both standard streams regardless of the locale's default
# encoding.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# stderr must wrap its *own* buffer: wrapping sys.stdout.buffer here would
# send diagnostics (argparse errors, tracebacks) into the tokenized output.
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# Module-level cache for the tagger; tokenize() fills it on its first call.
tagger = None


def tokenize(xs):
    """Split each string in *xs* into space-separated surface forms.

    The tagger is built from the unidic_lite dictionary on the first call
    and stored in the module-level ``tagger`` so later calls reuse it.

    Args:
        xs: Iterable of strings to tokenize.

    Returns:
        A list with one entry per input string: the NFKC-normalized input
        split into tokens and joined with single spaces, or ``''`` when
        the input string is empty.

    Raises:
        RuntimeError: If the dictionary charset is neither ``'utf-8'`` nor
            ``'utf8'``.
    """
    global tagger
    if tagger is None:
        dicdir = unidic_lite.DICDIR
        mecabrc = os.path.join(dicdir, 'mecabrc')
        option = "-d {} -r {}".format(dicdir, mecabrc)
        candidate = fugashi.GenericTagger(option)
        charset = candidate.dictionary_info[0]['charset']
        # Explicit raise instead of assert: asserts are removed under -O,
        # which would let a non-UTF-8 dictionary through unchecked.
        if charset not in ('utf-8', 'utf8'):
            raise RuntimeError(
                'unsupported dictionary charset: {!r}'.format(charset))
        # Publish the tagger only after validation, so a failed check is
        # repeated on the next call instead of being silently skipped.
        tagger = candidate
    return [
        ' '.join(w.surface for w in tagger(unicodedata.normalize('NFKC', x)))
        if len(x) > 0 else ''
        for x in xs
    ]
def parse_args():
    """Parse the command-line arguments from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with these attributes:

        * ``inputs``: list of text files opened for reading as UTF-8 with
          undecodable bytes ignored; ``[sys.stdin]`` when no path is given.
        * ``output``: text file opened for writing as UTF-8; ``sys.stdout``
          when ``-o`` is not given.
        * ``min``: int, default 10.
        * ``max``: int, default 200.
    """
    import argparse as ap

    p = ap.ArgumentParser(
        description='Tokenize text lines with MeCab in parallel.')
    p.add_argument(
        'inputs',
        type=ap.FileType('r', encoding='utf-8', errors='ignore'),
        nargs='*',
        default=[sys.stdin],
        help='input text files, read as UTF-8 (default: standard input)',
    )
    p.add_argument(
        '-o', '--output',
        type=ap.FileType('w', encoding='utf-8'),
        default=sys.stdout,
        help='output file, written as UTF-8 (default: standard output)',
    )
    p.add_argument(
        '--min',
        type=int,
        default=10,
        help='skip lines shorter than this many characters '
             '(default: %(default)s)',
    )
    p.add_argument(
        '--max',
        type=int,
        default=200,
        help='skip lines longer than this many characters '
             '(default: %(default)s)',
    )
    return p.parse_args()
def main():
    """Tokenize every input in parallel and write the result in input order.

    Each input is cut into fragments of lines, every fragment is handed to
    a worker process running ``tokenize``, and the tokenized lines are
    printed to ``args.output``.

    Raises:
        Exception: Whatever ``tokenize`` raised in a worker process is
            re-raised here when that fragment's result is collected.
    """
    args = parse_args()
    # Number of lines handed to a worker process in a single task.
    chunk_size = 200000

    def divide(iterable):
        """Yield lists of at most ``chunk_size`` lines from *iterable*.

        Lines are right-stripped. A line is kept when its length lies in
        ``[args.min, args.max]``. An empty line is kept as ``''`` only when
        the previously kept line was not empty, so runs of blank lines
        collapse into one separator.
        """
        fragment = []
        # Most recently kept line, tracked across fragments. Starting at ''
        # drops leading blank lines, and avoids indexing fragment[-1] on an
        # empty fragment (start of input, or right after a yield).
        last = ''
        for x in iterable:
            x = x.rstrip()
            if args.min <= len(x) <= args.max:
                fragment.append(x)
                last = x
            elif not x and last:
                fragment.append('')
                last = ''
            if len(fragment) >= chunk_size:
                yield fragment
                fragment = []
        if fragment:
            yield fragment

    fragments = (fragment for fp in args.inputs for fragment in divide(fp))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # map() yields results in submission order and re-raises worker
        # exceptions, so the output lines up with the input and a failed
        # fragment is not silently dropped.
        for tokenized in executor.map(tokenize, fragments):
            for line in tokenized:
                print(line, file=args.output)
    args.output.flush()
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment