This gist, together with https://gist.github.com/jherskovic/8222898, reads a MEDLINE baseline distribution, builds a journal -> MeSH term count dictionary, and pickles it. It uses all CPUs in the system to process MEDLINE files in parallel.
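For orientation, here is a minimal sketch of the citation structure the callback below expects. The @MajorTopicYN and #text keys follow the xmltodict convention for attributes and text nodes; the exact shape depends on how read_medline (the companion gist) parses the XML, so treat this as an assumption rather than part of either gist:

example_citation = {
    u'Article': {u'Journal': {u'Title': u'Journal of Examples'}},  # hypothetical journal
    u'MeshHeadingList': {u'MeshHeading': [
        {u'DescriptorName': {u'@MajorTopicYN': u'Y', u'#text': u'Neoplasms'}},
        {u'DescriptorName': {u'@MajorTopicYN': u'N', u'#text': u'Humans'}},
    ]},
}

With the major-topic filter on, only u'Neoplasms' would be counted for this article.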
# Parse each article. Create a journal -> MeSH terms dictionary.
# Accumulate MeSH terms by journal.
# Put every article's output in a QUEUE, pick it up with a single process.
# This way, we can process multiple PubMed files in parallel.
from read_medline import *
import multiprocessing
import cPickle as pickle
import traceback
import sys

# Sentinel that tells the result processor there is nothing left to consume.
STOP_TOKEN = u"(&^%$#$%^Y&*"

q = multiprocessing.JoinableQueue()
def process_results(result_queue):
    big_dict = {}
    count = 0
    while True:
        # We expect "item" to contain a (journal, MeSH term list) tuple
        item = result_queue.get()
        if item == STOP_TOKEN:
            result_queue.task_done()
            break
        journal, terms = item
        if journal not in big_dict:
            big_dict[journal] = {}
        for term in terms:
            big_dict[journal][term] = big_dict[journal].get(term, 0) + 1
        result_queue.task_done()
        count += 1
        # Print a progress dot every 1,000 articles and a running count
        # every 100,000.
        if count % 1000 == 0:
            sys.stdout.write(".")
            sys.stdout.flush()
        if count % 100000 == 0:
            print count,
    print "Dumping to a pickle file"
    o_file = open("journal_major_mesh_terms.pickle", 'wb')
    pickle.dump(big_dict, o_file, pickle.HIGHEST_PROTOCOL)
    o_file.flush()
    o_file.close()
    return
def callback(_, MedlineCitation):
    """Extract the journal and MeSH terms, package them in a tuple, and put
    it on the result queue. We will only process major topics."""
    #pprint(MedlineCitation)
    try:
        journal = MedlineCitation['Article']['Journal']['Title']
    except KeyError:
        # Skip this article, keep parsing
        return True
    #pprint(journal)
    try:
        terms = []
        contents = None
        if u'MeshHeadingList' not in MedlineCitation:
            return True
        if u'MeshHeading' not in MedlineCitation['MeshHeadingList']:
            return True
        for contents in MedlineCitation['MeshHeadingList']['MeshHeading']:
            #pprint(contents)
            # Check the type first: a lone MeshHeading comes through as a
            # dict, and iterating it yields bare strings, not headings.
            if not isinstance(contents, dict):
                continue
            if u'DescriptorName' not in contents:
                continue
            # Comment out the following two lines if you want ALL headings
            # instead of just Major Topic headings.
            if contents['DescriptorName']['@MajorTopicYN'] == u'N':
                continue
            terms.append(contents['DescriptorName']['#text'])
        global q
        #pprint((journal, terms))
        q.put((journal, terms))
        return True
    except:
        print traceback.format_exc()
        print "While reading", contents
        return False
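# The names below come from the companion gist (read_medline). This is the
# interface assumed here, inferred only from how the names are used --
# check https://gist.github.com/jherskovic/8222898 for the real definitions:
#   PUBMED_PATH           - path to the directory holding the MEDLINE files
#   medline_files(path)   - iterates over the MEDLINE file names under path
#   parse_articles(f, cb) - parses file f, calling cb once per citation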
if __name__ == "__main__":
    print PUBMED_PATH
    #my_callback=functools.partial(callback, out_queue=q)
    #parse_all_medline_serial(my_callback)
    p = multiprocessing.Pool()
    result_processor = multiprocessing.Process(target=process_results,
                                               args=(q,))
    result_processor.start()
    for m in medline_files(PUBMED_PATH):
        print "Dispatching", m
        p.apply_async(parse_articles, args=(m, callback))
        #parse_articles(m, callback)
    p.close()
    p.join()
    # All parser workers are done; tell the result processor to wrap up.
    q.put(STOP_TOKEN)
    result_processor.join()
    q.join()
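Once the run finishes, the pickled dictionary can be loaded back and queried like this (a minimal sketch; journal_major_mesh_terms.pickle is the file name the script writes, and the journal title is hypothetical):

import cPickle as pickle

with open("journal_major_mesh_terms.pickle", 'rb') as f:
    journal_terms = pickle.load(f)

# journal_terms maps journal title -> {MeSH term: article count}.
# List the ten most frequent major topics for one (hypothetical) journal:
counts = journal_terms.get(u'Journal of Examples', {})
for term, n in sorted(counts.items(), key=lambda pair: -pair[1])[:10]:
    print term, n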