marevol/jawikicorpus.py
Last active Aug 29, 2015
# -*- coding: utf-8 -*-
# http://www.chazine.com/archives/3630
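
# NOTE: the usage docstring below is not part of the original gist; it is inferred
# from the argument handling in the __main__ block, which prints __doc__ % locals()
# when too few arguments are given.
"""
Build a bag-of-words and TF-IDF corpus for gensim from Japanese Wikipedia pages
stored in an Elasticsearch index.

Usage: %(program)s <es_host/index_name> <output_prefix> [vocabulary_size]
"""
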
import json
import logging
import multiprocessing
import os.path
import re
import sys
from elasticsearch.client import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from gensim import utils
from gensim.corpora import Dictionary, HashDictionary, MmCorpus, WikiCorpus
from gensim.corpora.textcorpus import TextCorpus
from gensim.corpora.wikicorpus import filter_wiki, extract_pages
from gensim.models import TfidfModel

# ignore articles shorter than ARTICLE_MIN_WORDS words (after full preprocessing)
ARTICLE_MIN_WORDS = 50

# The wiki is first scanned for all distinct word types (~7M). Types that appear
# in more than 10% of articles are removed and, from the rest, the
# DEFAULT_DICT_SIZE most frequent types are kept.
DEFAULT_DICT_SIZE = 100000

es = None
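
# Not in the original gist: a module-level logger so EsWikiCorpus.get_texts() can log
# even when this file is imported rather than run as a script (the __main__ block
# re-binds `logger` to a program-specific logger when executed directly).
logger = logging.getLogger(__name__)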


def init_elasticsearch(hosts):
    global es
    es = Elasticsearch(hosts=hosts)
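

# jatokenize() asks Elasticsearch to analyze `text` and keeps only content words.
# The /_extended_analyze endpoint is not part of core Elasticsearch; this appears to
# assume the elasticsearch-extended-analyze plugin is installed, which returns
# per-token Lucene attributes (including Kuromoji's part-of-speech tags) for the
# given analyzer.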
def jatokenize(text):
    global es
    index = "jawiki-pages-articles"
    analyzer = "ja_analyzer"
    tokens = []
    # strip wiki markup characters before analysis
    text = re.sub(r'[|=!*_]+', ' ', text)
    try:
        data = es.transport.perform_request('GET', '/' + index + '/_extended_analyze',
                                            params={"analyzer": analyzer, "format": "json"},
                                            body=text)
        ja_tokenizer = data[1].get("tokenizer").get("ja_tokenizer")
        for token in ja_tokenizer:
            pos = token.get("extended_attributes")\
                .get("org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute")\
                .get("partOfSpeech")
            # keep only nouns (名詞), verbs (動詞) and adjectives (形容詞)
            if pos.startswith("名詞") or pos.startswith("動詞") or pos.startswith("形容詞"):
                tokens.append(token.get("token"))
    except Exception:
        print u"Unexpected error: {0}".format(sys.exc_info()[0])
        print u"text => {0}".format(text)
    return tokens


def tokenize(content):
    content = content.lower()
    return jatokenize(content)
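

# EsWikiCorpus streams documents out of an Elasticsearch index with scan/scroll and
# tokenizes them in a multiprocessing pool. In the gensim version this gist targets
# (circa 2015), the TextCorpus base class builds self.dictionary by iterating
# get_texts() during __init__.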
class EsWikiCorpus(TextCorpus):
    def __init__(self, hosts=["http://localhost:9200"], index="jawiki-pages-articles",
                 source="{\"query\":{\"match_all\":{}}}", processes=None):
        self.index = index
        self.source = json.loads(source)
        es = Elasticsearch(hosts=hosts)
        if processes is None:
            processes = max(1, multiprocessing.cpu_count() - 1)
        self.processes = processes
        self.hosts = hosts
        super(EsWikiCorpus, self).__init__(input=es)
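
    # get_texts() uses the (pre-Elasticsearch-2.x) scan/scroll pattern: the initial
    # search with search_type='scan' returns only a scroll id, and documents arrive
    # from the subsequent es.scroll() calls until the cursor is exhausted.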
    def get_texts(self):
        es = self.input
        pool = multiprocessing.Pool(self.processes, initializer=init_elasticsearch, initargs=(self.hosts,))
        response = es.search(index=self.index,
                             scroll='5m',
                             search_type='scan',
                             size=100,
                             body=self.source)
        scroll_id = response['_scroll_id']
        counter = 0
        while True:
            texts = []
            try:
                response = es.scroll(scroll_id=scroll_id, scroll='5m')
                if len(response['hits']['hits']) == 0:
                    break
                for hit in response['hits']['hits']:
                    counter += 1
                    if "_source" in hit and "title" in hit['_source'] and "text" in hit['_source']:
                        title = hit['_source']['title']
                        if title is not None and len(title) != 0:
                            texts.append(title)
                        text = hit['_source']['text']
                        if text is not None and len(text) != 0 and not text.startswith("#redirect"):
                            texts.append(text)
            except NotFoundError:
                logger.info(u"Finished {0} documents.".format(counter))
                break
            except Exception:
                logger.warning(u"Unexpected error: {0}".format(sys.exc_info()[0]))
                break
            if len(texts) > 0:
                for tokens in pool.map(tokenize, texts):
                    yield tokens
        pool.terminate()
# endclass EsWikiCorpus


if __name__ == '__main__':
    program = os.path.basename(sys.argv[0])
    logger = logging.getLogger(program)
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s')
    logging.root.setLevel(level=logging.INFO)
    logger.info("running %s" % ' '.join(sys.argv))
    logging.getLogger('elasticsearch').setLevel(logging.WARNING)

    # check and process input arguments
    if len(sys.argv) < 3:
        print(globals()['__doc__'] % locals())
        sys.exit(1)
    inp, outp = sys.argv[1:3]
    if len(sys.argv) > 3:
        keep_words = int(sys.argv[3])
    else:
        keep_words = DEFAULT_DICT_SIZE
    debug = 'nodebug' not in program
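
    # retrieve only regular articles: documents flagged as redirects or special pages
    # are filtered out (this assumes boolean `redirect` and `special` fields exist in
    # the index mapping, e.g. as produced by the elasticsearch-river-wikipedia plugin)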
source = "{\"query\":{\"bool\":{\"must\":[{\"term\":{\"redirect\":{\"value\":false}}},{\"term\":{\"special\":{\"value\":false}}}]}}}"
values = inp.split("/")
wiki_corpus = EsWikiCorpus(hosts=values[0], index=values[1], source=source)
wiki_corpus.dictionary.filter_extremes(no_below=20, no_above=0.1, keep_n=DEFAULT_DICT_SIZE)
MmCorpus.serialize(outp + '_bow.mm', wiki_corpus, progress_cnt=10000)
wiki_corpus.dictionary.save_as_text(outp + '_wordids.txt.bz2')
dictionary = Dictionary.load_from_text(outp + '_wordids.txt.bz2')
del wiki_corpus

    # initialize corpus reader and word->id mapping
    mm = MmCorpus(outp + '_bow.mm')

    # build tfidf, ~50min
    tfidf = TfidfModel(mm, id2word=dictionary, normalize=True)

    # save tfidf vectors in matrix market format
    # ~4h; result file is 15GB! bzip2'ed down to 4.5GB
    MmCorpus.serialize(outp + '_tfidf.mm', tfidf[mm], progress_cnt=10000)

    logger.info("finished running %s" % program)