This gist, together with https://gist.github.com/jherskovic/8222898, reads a MEDLINE baseline distribution, builds a journal -> MeSH term count dictionary, and pickles it. It uses all CPUs on the system to process MEDLINE files in parallel.
# Parse each article. Create a journal -> MeSH term dictionary.
# Accumulate MeSH terms by journal.
# Put every article's output in a queue; pick it up with a single process.
# This way, we can process multiple PubMed files in parallel.
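# The finished pickle maps journal title -> {MeSH descriptor -> count}; an
# illustrative (made-up) entry:
#   {u'Journal of Hypothetical Medicine': {u'Humans': 1234, u'Neoplasms': 87}}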
# read_medline (https://gist.github.com/jherskovic/8222898) provides
# PUBMED_PATH, medline_files(), and parse_articles(), used below.
from read_medline import *
import multiprocessing
import cPickle as pickle
import traceback
import sys

# Sentinel that tells the result-processing consumer to stop.
STOP_TOKEN = u"(&^%$#$%^Y&*"

q = multiprocessing.JoinableQueue()

def process_results(result_queue):
    big_dict = {}
    count = 0
    while True:
        # We expect "item" to contain a (journal, MeSH term list) tuple.
        item = result_queue.get()
        if item == STOP_TOKEN:
            result_queue.task_done()
            break
        journal, terms = item
        if journal not in big_dict:
            big_dict[journal] = {}
        for term in terms:
            big_dict[journal][term] = big_dict[journal].get(term, 0) + 1
        result_queue.task_done()
        count += 1
        if count % 1000 == 0:
            sys.stdout.write(".")
            sys.stdout.flush()
        if count % 100000 == 0:
            print count,
    print "Dumping to a pickle file"
    o_file = open("journal_major_mesh_terms.pickle", 'wb')
    pickle.dump(big_dict, o_file, pickle.HIGHEST_PROTOCOL)
    o_file.flush()
    o_file.close()
    return

def callback(_, MedlineCitation):
    """Extract the journal and MeSH terms, package them in a tuple, and put
    them on the result queue. We will only process major topics."""
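    # The '@...' attribute keys and '#text' entries below suggest the XML was
    # parsed by an xmltodict-style converter (an assumption; see the companion
    # gist). A minimal citation dict this callback accepts would look like:
    #   {'Article': {'Journal': {'Title': u'Some Journal'}},
    #    'MeshHeadingList': {'MeshHeading': [
    #        {'DescriptorName': {'@MajorTopicYN': u'Y', '#text': u'Humans'}}]}}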
    #pprint(MedlineCitation)
    try:
        journal = MedlineCitation['Article']['Journal']['Title']
    except KeyError:
        # Skip this article, keep parsing.
        return True
    #pprint(journal)
    try:
        terms = []
        if u'MeshHeadingList' not in MedlineCitation:
            return True
        if u'MeshHeading' not in MedlineCitation['MeshHeadingList']:
            return True
        for contents in MedlineCitation['MeshHeadingList']['MeshHeading']:
            #pprint(contents)
            # Check the type before testing membership: a non-dict entry
            # (e.g. a bare string) has no 'DescriptorName' key.
            if not isinstance(contents, dict):
                continue
            if u'DescriptorName' not in contents:
                continue
            # Comment out the following two lines if you want ALL headings
            # instead of just Major Topic Headings.
            if contents['DescriptorName']['@MajorTopicYN'] == u'N':
                continue
            terms.append(contents['DescriptorName']['#text'])
        #pprint((journal, terms))
        q.put((journal, terms))
        return True
    except:
        print traceback.format_exc()
        print "While reading", contents
        return False

if __name__ == "__main__":
    print PUBMED_PATH
    #my_callback=functools.partial(callback, out_queue=q)
    #parse_all_medline_serial(my_callback)
    p = multiprocessing.Pool()
    result_processor = multiprocessing.Process(target=process_results, args=(q,))
    result_processor.start()
    for m in medline_files(PUBMED_PATH):
        print "Dispatching", m
        p.apply_async(parse_articles, args=(m, callback))
        #parse_articles(m, callback)  # serial fallback for debugging
    p.close()
    p.join()
    q.put(STOP_TOKEN)
    result_processor.join()
    q.join()
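Once the run finishes, the pickle can be loaded and queried directly. A minimal sketch, assuming the file above was written successfully (the journal title here is made up; use any title present in your baseline):

import cPickle as pickle

with open("journal_major_mesh_terms.pickle", 'rb') as f:
    journal_mesh = pickle.load(f)

# Print the five most frequent major MeSH descriptors for one journal.
counts = journal_mesh[u'Journal of Hypothetical Medicine']
for term, n in sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:5]:
    print term, n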