Skip to content

Instantly share code, notes, and snippets.

@jherskovic
Last active January 2, 2016 00:49
Show Gist options
  • Save jherskovic/8225889 to your computer and use it in GitHub Desktop.
Save jherskovic/8225889 to your computer and use it in GitHub Desktop.
This gist, together with https://gist.github.com/jherskovic/8222898, reads a MEDLINE base distribution, builds a journal->MeSH term count dictionary, and pickles it. It uses all CPUs in the system to process MEDLINE files in parallel.
# Parse each article and build a journal -> MeSH terms dictionary.
# Accumulate MeSH term counts by journal.
# Every article's output is put on a queue, and a single consumer process
# picks the items up and aggregates them.
# This way, we can process multiple PubMed files in parallel.
from read_medline import *
import multiprocessing
import cPickle as pickle
import traceback
import sys
# Sentinel value: the main process puts it on the queue once every worker has
# finished, and process_results() stops consuming when it sees it.
STOP_TOKEN = u"(&^%$#$%^Y&*"

# Queue shared between the parsing side (callback() puts (journal, terms)
# tuples on it) and the single aggregating consumer (process_results()).
q = multiprocessing.JoinableQueue()
def process_results(result_queue):
    """Aggregate queued (journal, MeSH terms) items into counts and pickle them.

    Consumes ``result_queue`` until ``STOP_TOKEN`` is received. Every other
    item is expected to be a ``(journal, terms)`` tuple; each term in
    ``terms`` increments ``big_dict[journal][term]``. ``task_done()`` is
    called once per item taken from the queue, including the stop token, so
    that a ``join()`` on the queue can complete.

    Progress is written to stdout: one "." per 1000 items and the running
    total every 100000 items. When the stop token arrives, the resulting
    ``{journal: {term: count}}`` dictionary is pickled to
    ``journal_major_mesh_terms.pickle`` in the current working directory.

    Args:
        result_queue: queue-like object providing ``get()`` and
            ``task_done()``.

    Returns:
        None.
    """
    big_dict = {}
    count = 0
    while True:
        # We expect "item" to contain a (journal, MeSH term list) tuple
        item = result_queue.get()
        if item == STOP_TOKEN:
            result_queue.task_done()
            break
        journal, terms = item
        # A journal gets an entry even when its term list is empty.
        journal_counts = big_dict.setdefault(journal, {})
        for term in terms:
            journal_counts[term] = journal_counts.get(term, 0) + 1
        result_queue.task_done()
        count += 1
        if count % 1000 == 0:
            sys.stdout.write(".")
            sys.stdout.flush()
        if count % 100000 == 0:
            # No newline: the running total is shown inline with the dots.
            sys.stdout.write(str(count))
    print("")
    print("Dumping to a pickle file")
    # The context manager closes the file even if pickling fails part-way.
    with open("journal_major_mesh_terms.pickle", 'wb') as o_file:
        pickle.dump(big_dict, o_file, pickle.HIGHEST_PROTOCOL)
    return
def callback(_, MedlineCitation):
    """Extract the journal and major MeSH terms of one citation and enqueue them.

    The ``(journal, terms)`` tuple is put on the module-level queue ``q``.
    Only headings whose ``DescriptorName`` is not flagged
    ``@MajorTopicYN == u'N'`` are kept.

    Args:
        _: ignored first positional argument.
        MedlineCitation: mapping describing one citation. The journal title
            is read from ``['Article']['Journal']['Title']`` and the headings
            from ``['MeshHeadingList']['MeshHeading']``.

    Returns:
        True when the citation was processed or deliberately skipped (no
        journal title, or no MeSH headings); False when an unexpected error
        occurred, in which case the traceback and the heading being read are
        printed.
    """
    try:
        journal = MedlineCitation['Article']['Journal']['Title']
    except KeyError:
        # Skip this article, keep parsing
        return True

    # Bound before the try block so the error report below can always
    # reference it, even when the failure happens before the loop starts.
    contents = None
    try:
        if u'MeshHeadingList' not in MedlineCitation:
            return True
        heading_list = MedlineCitation['MeshHeadingList']
        # An empty heading list (None or an empty mapping) means there is
        # nothing to extract; skip it like a missing one.
        if not heading_list or u'MeshHeading' not in heading_list:
            return True

        headings = heading_list['MeshHeading']
        # NOTE(review): the '@attr' / '#text' keys suggest an xmltodict-style
        # parser, which yields a lone dict instead of a one-element list when
        # there is a single child - confirm against read_medline. Iterating
        # such a dict would walk its keys and lose the heading.
        if isinstance(headings, dict):
            headings = [headings]

        terms = []
        for contents in headings:
            # Check the type first: "in" on a non-mapping would be a
            # substring test or a TypeError.
            if not isinstance(contents, dict):
                continue
            if u'DescriptorName' not in contents:
                continue
            # Comment out the following two lines if you want ALL headings
            # instead of just Major Topic Headings
            if contents['DescriptorName']['@MajorTopicYN'] == u'N':
                continue
            terms.append(contents['DescriptorName']['#text'])

        q.put((journal, terms))
        return True
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt and
        # SystemExit still propagate.
        print(traceback.format_exc())
        print("While reading %s" % (contents,))
        return False
if __name__ == "__main__":
    # Script entry point. Each MEDLINE file is dispatched to a worker pool;
    # workers push results onto q via callback(), and one separate process
    # runs process_results() to aggregate and pickle them.
    # NOTE(review): PUBMED_PATH, medline_files and parse_articles are not
    # defined in this file - presumably they come from the read_medline star
    # import; confirm.
    print(PUBMED_PATH)
    p = multiprocessing.Pool()
    result_processor = multiprocessing.Process(target=process_results,
                                               args=(q,))
    result_processor.start()

    # Keep every async result so worker failures can be reported below
    # instead of disappearing silently.
    dispatched = []
    for m in medline_files(PUBMED_PATH):
        print("Dispatching %s" % (m,))
        dispatched.append((m, p.apply_async(parse_articles,
                                            args=(m, callback))))
    p.close()
    p.join()

    # Report any file whose parse raised in a worker. ready() is checked
    # first so this loop can never block on a task that did not complete.
    for m, outcome in dispatched:
        if outcome.ready() and not outcome.successful():
            try:
                outcome.get()
            except Exception:
                print(traceback.format_exc())
                print("While parsing %s" % (m,))

    # All producers are done: tell the consumer to finish and write its file.
    q.put(STOP_TOKEN)
    result_processor.join()
    q.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment