# -*- coding: utf-8 -*-
# http://www.chazine.com/archives/3630
"""
USAGE: %(program)s ES_HOST/ES_INDEX OUTPUT_PREFIX [VOCABULARY_SIZE]

Build a gensim bag-of-words corpus and TF-IDF model from Japanese Wikipedia
pages stored in an Elasticsearch index.
"""
import json
import logging
import multiprocessing
import os.path
import re
import sys

from elasticsearch.client import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from gensim import utils
from gensim.corpora import Dictionary, HashDictionary, MmCorpus, WikiCorpus
from gensim.corpora.textcorpus import TextCorpus
from gensim.corpora.wikicorpus import filter_wiki, extract_pages
from gensim.models import TfidfModel

logger = logging.getLogger(__name__)

# ignore articles shorter than ARTICLE_MIN_WORDS words (after full preprocessing)
ARTICLE_MIN_WORDS = 50

# The wiki is first scanned for all distinct word types (~7M). The types that
# appear in more than 10% of articles are removed and, from the rest, the
# DEFAULT_DICT_SIZE most frequent types are kept.
DEFAULT_DICT_SIZE = 100000

es = None


def init_elasticsearch(hosts):
    # initializer for worker processes: each worker gets its own client
    global es
    es = Elasticsearch(hosts=hosts)


def jatokenize(text):
    global es
    index = "jawiki-pages-articles"
    analyzer = "ja_analyzer"
    tokens = []
    # strip wiki markup characters before analysis
    text = re.sub(r'[|=!*_]+', ' ', text)
    try:
        data = es.transport.perform_request('GET', '/' + index + '/_extended_analyze',
                                            params={"analyzer": analyzer, "format": "json"},
                                            body=text)
        ja_tokenizer = data[1].get("tokenizer").get("ja_tokenizer")
        for token in ja_tokenizer:
            pos = token.get("extended_attributes")\
                .get("org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute")\
                .get("partOfSpeech")
            # keep only nouns (名詞), verbs (動詞) and adjectives (形容詞)
            if pos.startswith(u"名詞") or pos.startswith(u"動詞") or pos.startswith(u"形容詞"):
                tokens.append(token.get("token"))
    except Exception:
        print u"Unexpected error: {0}".format(sys.exc_info()[0])
        print u"text => {0}".format(text)
    return tokens


def tokenize(content):
    content = content.lower()
    return jatokenize(content)
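
# Example (sketch): with Elasticsearch reachable on localhost:9200, the default
# "jawiki-pages-articles" index, and the _extended_analyze endpoint used above
# available (it is provided by a plugin, not by core Elasticsearch), the
# tokenizer can be tried out interactively like this:
#
#   init_elasticsearch(["http://localhost:9200"])
#   print tokenize(u"日本語のテキストを解析します")
#   # => a list of noun/verb/adjective surface forms
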
class EsWikiCorpus(TextCorpus):
    def __init__(self, hosts=["http://localhost:9200"], index="jawiki-pages-articles",
                 source="{\"query\":{\"match_all\":{}}}", processes=None):
        self.index = index
        self.source = json.loads(source)
        es = Elasticsearch(hosts=hosts)
        if processes is None:
            processes = max(1, multiprocessing.cpu_count() - 1)
        self.processes = processes
        self.hosts = hosts
        super(EsWikiCorpus, self).__init__(input=es)

    def get_texts(self):
        es = self.input
        pool = multiprocessing.Pool(self.processes, initializer=init_elasticsearch, initargs=(self.hosts,))
        # scan/scroll through all matching documents, 100 hits per shard at a time
        response = es.search(index=self.index,
                             scroll='5m',
                             search_type='scan',
                             size=100,
                             body=self.source)
        scroll_id = response['_scroll_id']
        counter = 0
        while True:
            texts = []
            try:
                response = es.scroll(scroll_id=scroll_id, scroll='5m')
                if len(response['hits']['hits']) == 0:
                    break
                for hit in response['hits']['hits']:
                    counter += 1
                    if "_source" in hit and "title" in hit['_source'] and "text" in hit['_source']:
                        title = hit['_source']['title']
                        if title is not None and len(title) != 0:
                            texts.append(title)
                        text = hit['_source']['text']
                        if text is not None and len(text) != 0 and not text.startswith("#redirect"):
                            texts.append(text)
            except NotFoundError:
                logger.info(u"Finished ({0}) documents.".format(counter))
                break
            except Exception:
                logger.warning(u"Unexpected error: {0}".format(sys.exc_info()[0]))
                break
            if len(texts) > 0:
                # tokenize the current batch in parallel worker processes
                for tokens in pool.map(tokenize, texts):
                    yield tokens
        pool.terminate()
# endclass EsWikiCorpus
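
# Example (sketch, assuming the same Elasticsearch setup as above): stream
# tokenized documents straight from the index without writing any files.
#
#   corpus = EsWikiCorpus(hosts=["http://localhost:9200"],
#                         index="jawiki-pages-articles")
#   for tokens in corpus.get_texts():
#       print tokens
#       break
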
if __name__ == '__main__':
    program = os.path.basename(sys.argv[0])
    logger = logging.getLogger(program)
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s')
    logging.root.setLevel(level=logging.INFO)
    logger.info("running %s" % ' '.join(sys.argv))
    logging.getLogger('elasticsearch').setLevel(logging.WARNING)

    # check and process input arguments
    if len(sys.argv) < 3:
        print(globals()['__doc__'] % locals())
        sys.exit(1)
    inp, outp = sys.argv[1:3]
    if len(sys.argv) > 3:
        keep_words = int(sys.argv[3])
    else:
        keep_words = DEFAULT_DICT_SIZE
    debug = 'nodebug' not in program

    # fetch only non-redirect, non-special pages
    source = "{\"query\":{\"bool\":{\"must\":[{\"term\":{\"redirect\":{\"value\":false}}},{\"term\":{\"special\":{\"value\":false}}}]}}}"
    values = inp.split("/")
    wiki_corpus = EsWikiCorpus(hosts=values[0], index=values[1], source=source)
    wiki_corpus.dictionary.filter_extremes(no_below=20, no_above=0.1, keep_n=keep_words)

    # save the bag-of-words corpus and the word->id mapping
    MmCorpus.serialize(outp + '_bow.mm', wiki_corpus, progress_cnt=10000)
    wiki_corpus.dictionary.save_as_text(outp + '_wordids.txt.bz2')
    dictionary = Dictionary.load_from_text(outp + '_wordids.txt.bz2')
    del wiki_corpus

    # initialize corpus reader and word->id mapping
    mm = MmCorpus(outp + '_bow.mm')

    # build tfidf, ~50min
    tfidf = TfidfModel(mm, id2word=dictionary, normalize=True)

    # save tfidf vectors in matrix market format
    # ~4h; result file is 15GB! bzip2'ed down to 4.5GB
    MmCorpus.serialize(outp + '_tfidf.mm', tfidf[mm], progress_cnt=10000)

    logger.info("finished running %s" % program)