Skip to content

Instantly share code, notes, and snippets.

@marevol
Last active August 29, 2015 14:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marevol/02c5a2f8675e4e061a7e to your computer and use it in GitHub Desktop.
Save marevol/02c5a2f8675e4e061a7e to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# http://www.chazine.com/archives/3624
import bz2
import logging
import multiprocessing
import os.path
import sys
from elasticsearch.client import Elasticsearch
from gensim import utils
from gensim.corpora import Dictionary, HashDictionary, MmCorpus, WikiCorpus
from gensim.corpora.wikicorpus import filter_wiki, extract_pages
from gensim.models import TfidfModel
# Ignore articles with fewer than ARTICLE_MIN_WORDS tokens (counted after full
# preprocessing, i.e. on the token list returned for the article).
ARTICLE_MIN_WORDS = 50
# Wiki is first scanned for all distinct word types (~7M). The types that
# appear in more than 10% of articles are removed and from the rest, the
# DEFAULT_DICT_SIZE most frequent types are kept.
DEFAULT_DICT_SIZE = 100000
# Elasticsearch settings used by jatokenize().
# The client is created at import time and points at a node on localhost.
es = Elasticsearch(hosts=["http://localhost:9200"])
# Index whose `_extended_analyze` endpoint is called for every article.
index = "jawiki-pages-articles"
# Sent as the `analyzer` request parameter of the analyze call.
analyzer = "ja_analyzer"
# Key looked up under "tokenizer" in the analyze response to find the tokens.
tokenizer_name = "ja_tokenizer"
def jatokenize(text):
    """Return the noun tokens of `text` as reported by Elasticsearch.

    The text is sent to the `_extended_analyze` endpoint of the configured
    index, and every token whose part-of-speech string starts with the
    Japanese word for "noun" is collected.

    This is best-effort: if the request fails or the response does not have
    the expected layout, the error and the offending text are printed and the
    tokens gathered so far (possibly none) are returned.

    :param text: the text to analyze; empty or None input yields no tokens.
    :return: list of noun token strings, in the order they were reported.
    """
    tokens = []
    if not text:
        return tokens
    pos_attribute = "org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute"
    # Unicode literal: the part-of-speech value comes from decoded JSON, and
    # comparing it with a non-ASCII byte string fails on Python 2.
    noun_prefix = u"名詞"
    try:
        # data = es.indices.analyze(index=index,body=text, params={"analyzer":analyzer})
        data = es.indices.client.transport.perform_request(
            'GET', '/' + index + '/_extended_analyze',
            params={"analyzer": analyzer, "format": "json"}, body=text)
        # Element 1 of the result holds the parsed response
        # (presumably a (status, body) pair -- confirm against the client version).
        for token in data[1].get("tokenizer").get(tokenizer_name):
            # A token without part-of-speech data is skipped on its own
            # instead of aborting the rest of the article.
            attributes = token.get("extended_attributes") or {}
            part_of_speech = (attributes.get(pos_attribute) or {}).get("partOfSpeech") or u""
            if part_of_speech.startswith(noun_prefix):
                tokens.append(token.get("token"))
    except Exception:
        # Deliberately broad: a single bad article must not stop the corpus
        # run, but KeyboardInterrupt/SystemExit are no longer swallowed.
        print(u"Unexpected error: {0}".format(sys.exc_info()[0]))
        print(u"text => {0}".format(text))
    return tokens
def tokenize(content):
    """Tokenize `content` with jatokenize() and drop tokens starting with '_'.

    :param content: text to tokenize.
    :return: list of the remaining tokens, in their original order.
    """
    kept = []
    for word in jatokenize(content):
        if word.startswith('_'):
            continue
        kept.append(word)
    return kept
def process_article(args):
    """Turn one raw article into tokens; the worker function for the pool.

    :param args: a `(text, lemmatize, title, pageid)` tuple. Packed into a
        single argument because it is mapped over by `Pool.imap`.
    :return: `(tokens, title, pageid)`, where `tokens` comes from
        `utils.lemmatize` when `lemmatize` is truthy and from `tokenize`
        otherwise; `title` and `pageid` are passed through unchanged.
    """
    raw_text, use_lemmatizer, title, pageid = args
    # Run the raw page through gensim's filter_wiki before tokenizing.
    plain_text = filter_wiki(raw_text)
    tokens = utils.lemmatize(plain_text) if use_lemmatizer else tokenize(plain_text)
    return tokens, title, pageid
class JaWikiCorpus(WikiCorpus):
    """WikiCorpus whose articles are tokenized by `process_article`.

    Only `get_texts` is overridden; construction, dictionary building and
    serialization are inherited from `WikiCorpus`.
    """

    def get_texts(self):
        """Iterate over the dump, yielding the tokens of each kept article.

        Articles are extracted from the bzip2-compressed dump at
        `self.fname` and tokenized in a pool of `self.processes` workers.
        Articles with fewer than ARTICLE_MIN_WORDS tokens, or whose title
        starts with one of the ignored namespace prefixes, are skipped.

        Yields `tokens`, or `(tokens, (pageid, title))` when `self.metadata`
        is set. After a complete pass the number of kept articles is cached
        in `self.length`.
        """
        # Looked up here instead of relying on a module-level `logger`, which
        # only exists when this file is run as a script.
        logger = logging.getLogger(__name__)
        articles, articles_all = 0, 0
        positions, positions_all = 0, 0
        # str.startswith accepts a tuple, so the prefixes are built only once.
        ignored_prefixes = tuple(
            namespace + ':' for namespace in
            'Wikipedia Category File Portal Template MediaWiki User Help Book Draft'.split())
        dump = bz2.BZ2File(self.fname)
        texts = ((text, self.lemmatize, title, pageid)
                 for title, text, pageid in extract_pages(dump, self.filter_namespaces))
        pool = multiprocessing.Pool(self.processes)
        try:
            # process the corpus in smaller chunks of docs, because multiprocessing.Pool
            # would otherwise load the entire input into RAM at once
            for group in utils.chunkize(texts, chunksize=10 * self.processes, maxsize=1):
                for tokens, title, pageid in pool.imap(process_article, group):
                    articles_all += 1
                    positions_all += len(tokens)
                    # article redirects and short stubs are pruned here
                    if len(tokens) < ARTICLE_MIN_WORDS or title.startswith(ignored_prefixes):
                        continue
                    articles += 1
                    positions += len(tokens)
                    if self.metadata:
                        yield (tokens, (pageid, title))
                    else:
                        yield tokens
        finally:
            # Also runs when the consumer stops early or an error is raised,
            # so the workers and the dump file handle are always released.
            pool.terminate()
            dump.close()
        logger.info(
            "finished iterating over Wikipedia corpus of %i documents with %i positions"
            " (total %i articles, %i positions before pruning articles shorter than %i words)",
            articles, positions, articles_all, positions_all, ARTICLE_MIN_WORDS)
        self.length = articles  # cache corpus length
# endclass JaWikiCorpus
# Script entry point: convert a Wikipedia dump into bag-of-words and TF-IDF
# corpora in Matrix Market format, plus the word-id mapping.
#
# Arguments: WIKI_XML_DUMP OUTPUT_PREFIX [VOCABULARY_SIZE]
# Files written: OUTPUT_PREFIX_bow.mm, OUTPUT_PREFIX_wordids.txt.bz2,
# OUTPUT_PREFIX_tfidf.mm (and OUTPUT_PREFIX_corpus.pkl.bz2 in online mode).
if __name__ == '__main__':
    program = os.path.basename(sys.argv[0])
    # Kept as a module-level name because other code in this file logs through it.
    logger = logging.getLogger(program)
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s')
    logging.root.setLevel(level=logging.INFO)
    logger.info("running %s", ' '.join(sys.argv))
    # One request is sent per article; silence the client's per-request logging.
    logging.getLogger('elasticsearch').setLevel(logging.WARNING)

    # check and process input arguments
    if len(sys.argv) < 3:
        # The module has no docstring to print, so spell the usage out here.
        print("Usage: %s WIKI_XML_DUMP OUTPUT_PREFIX [VOCABULARY_SIZE]" % program)
        sys.exit(1)
    inp, outp = sys.argv[1:3]
    if len(sys.argv) > 3:
        keep_words = int(sys.argv[3])
    else:
        keep_words = DEFAULT_DICT_SIZE

    # Processing modes are selected by substrings of the script's file name.
    online = 'online' in program
    lemmatize = 'lemma' in program
    debug = 'nodebug' not in program

    if online:
        dictionary = HashDictionary(id_range=keep_words, debug=debug)
        dictionary.allow_update = True  # start collecting document frequencies
        wiki = JaWikiCorpus(inp, lemmatize=lemmatize, dictionary=dictionary)
        MmCorpus.serialize(outp + '_bow.mm', wiki, progress_cnt=10000)
        # with HashDictionary, the token->id mapping is only fully instantiated now, after `serialize`
        dictionary.filter_extremes(no_below=20, no_above=0.1, keep_n=keep_words)
        dictionary.save_as_text(outp + '_wordids.txt.bz2')
        wiki.save(outp + '_corpus.pkl.bz2')
        dictionary.allow_update = False
    else:
        wiki = JaWikiCorpus(inp, lemmatize=lemmatize)
        # only keep the most frequent words, honouring the requested vocabulary size
        wiki.dictionary.filter_extremes(no_below=20, no_above=0.1, keep_n=keep_words)
        # save dictionary and bag-of-words (term-document frequency matrix)
        MmCorpus.serialize(outp + '_bow.mm', wiki, progress_cnt=10000)
        wiki.dictionary.save_as_text(outp + '_wordids.txt.bz2')
        # load back the id->word mapping directly from file
        # this seems to save more memory, compared to keeping the wiki.dictionary object from above
        dictionary = Dictionary.load_from_text(outp + '_wordids.txt.bz2')
    del wiki

    # initialize corpus reader and word->id mapping
    mm = MmCorpus(outp + '_bow.mm')

    # build tfidf
    tfidf = TfidfModel(mm, id2word=dictionary, normalize=True)

    # save tfidf vectors in matrix market format
    MmCorpus.serialize(outp + '_tfidf.mm', tfidf[mm], progress_cnt=10000)

    logger.info("finished running %s", program)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment