@ryanwitt
Created December 5, 2012 20:44
Indexing code for the NPI database and Medicare doctor referral graph.

Doctor referral graph / NPI database full-text indexer

You need 7-Zip installed to unpack the NPI database (brew install p7zip on OS X).

To create the indexes, run the init_* scripts. You'll need the doctor referral graph data to use the *_refer.* scripts, but the NPI database is downloaded automatically. Indexing runs on all cores and takes under 10 minutes on my 8-core machine.

To grab lines matching a search term, run python search_npi.py <term>.

Note: search performance is only good if you have a lot of memory. The index file blocks stay hot in the page cache, but each shelf is reloaded in full every time the program runs, which is very inefficient. A better design would be an on-disk hashtable whose record offsets can be computed directly, so a lookup touches only a few blocks.
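For illustration, here is a minimal sketch of that idea: pack fixed-width (word-hash, offset) records sorted by hash into a flat file, then mmap it and binary-search at query time. The flat-file layout and the use of the first 8 bytes of an md5 digest as the word hash are assumptions for the sketch, not part of this gist.

#!/usr/bin/env python
# flat_index.py (hypothetical): sorted fixed-width records + binary search,
# so a query reads O(log n) pages instead of unpickling a whole shelf.
import hashlib, mmap, struct

RECORD = struct.Struct('>QQ')  # (64-bit word hash, 64-bit line offset)

def word_hash(word):
    return struct.unpack('>Q', hashlib.md5(word).digest()[:8])[0]

def write_flat_index(index, path):
    # index: word -> set of byte offsets, as built by make_index below
    records = sorted((word_hash(w), off) for w, offs in index.items() for off in offs)
    with open(path, 'wb') as f:
        for h, off in records:
            f.write(RECORD.pack(h, off))

def lookup(path, word):
    h, offsets = word_hash(word), set()
    with open(path, 'rb') as f:
        buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        lo, hi = 0, len(buf) // RECORD.size
        while lo < hi:  # binary search for the first record with hash >= h
            mid = (lo + hi) // 2
            if RECORD.unpack_from(buf, mid * RECORD.size)[0] < h:
                lo = mid + 1
            else:
                hi = mid
        while lo * RECORD.size < len(buf):  # collect all records with hash == h
            rh, off = RECORD.unpack_from(buf, lo * RECORD.size)
            if rh != h:
                break
            offsets.add(off)
            lo += 1
        buf.close()
    return offsets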

#!/usr/bin/env python
# index_npi.py: build a per-chunk word -> line-offset index for the NPI data.
import os, gc, glob, shelve, multiprocessing

def make_index(filename):
    print 'indexing', filename
    index = {}
    offset = 0
    with file(filename) as f:
        while True:
            line, new_offset = f.readline(), f.tell()
            if not line:
                break
            # Split the CSV line, strip quotes, and index every word.
            parts = [x.strip('"') for x in line.split(',')]
            for part in parts:
                for word in part.split():
                    if word:
                        word = word.lower()
                        if word in index:
                            index[word].add(offset)
                        else:
                            index[word] = {offset}
            offset = new_offset
    # Copy the in-memory index to a shelf on disk.
    index_file = 'index/' + os.path.basename(filename) + '.shelf'
    shelf = shelve.open(index_file, protocol=2)
    for k, v in index.items():
        shelf[k] = v
    shelf.close()
    print 'done', filename, '-->', index_file
    del index
    gc.collect()
    return index_file

if __name__ == '__main__':
    try:
        os.mkdir('index')
    except OSError:
        pass
    # Index all chunks in parallel, one worker per core.
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    indices = pool.map(make_index, glob.glob('npi/*'))
    print indices
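To sanity-check a single shard without running the full search, you can open its shelf directly. Here 'xaa' is the name split(1) gives the first chunk, and 'labcorp' is just an example term:

import shelve
shelf = shelve.open('index/xaa.shelf')
print shelf.get('labcorp', set())  # set of byte offsets into npi/xaa
shelf.close()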
#!/usr/bin/env python
# index_refer.py: the same indexer for the referral data (no quote stripping needed).
import os, gc, glob, shelve, multiprocessing

def make_index(filename):
    print 'indexing', filename
    index = {}
    offset = 0
    with file(filename) as f:
        while True:
            line, new_offset = f.readline(), f.tell()
            if not line:
                break
            for part in line.split(','):
                for word in part.split():
                    if word:
                        word = word.lower()
                        if word in index:
                            index[word].add(offset)
                        else:
                            index[word] = {offset}
            offset = new_offset
    # Copy the in-memory index to a shelf on disk.
    index_file = 'index_refer/' + os.path.basename(filename) + '.shelf'
    shelf = shelve.open(index_file, protocol=2)
    for k, v in index.items():
        shelf[k] = v
    shelf.close()
    print 'done', filename, '-->', index_file
    del index
    gc.collect()
    return index_file

if __name__ == '__main__':
    try:
        os.mkdir('index_refer')
    except OSError:
        pass
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    indices = pool.map(make_index, glob.glob('refer/*'))
    print indices
# init_npi.sh: download the NPI database, split it into chunks, index, and run a test search.
curl -O http://nppes.viva-it.com/NPPES_Data_Dissemination_Oct_2012.zip
7z e NPPES_Data_Dissemination_Oct_2012.zip
echo 'splitting file, this may take a minute...'
mkdir npi && pushd npi && split -l 50000 ../npidata_20050523-20121008.csv && popd
./index_npi.py
./search_npi.py labcorp
# init_refer.sh: split the referral graph data (not downloaded automatically) and index it.
echo 'splitting file, this may take a minute...'
mkdir refer && pushd refer && split -l 100000 ../refer.2011.csv && popd
./index_refer.py
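There is no example query in the script above, but searching the referral index works the same way as search_npi.py; the NPI number here is just a placeholder:

./search_refer.py 1234567890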
#!/usr/bin/env python
# search_npi.py: print NPI lines containing any of the given search terms.
import os, sys, glob, shelve, progressbar

terms = sys.argv[1:]
results = []

# Pass 1: collect matching line offsets from each shard's shelf.
sys.stderr.write('Searching index...\n')
progress = progressbar.ProgressBar()
for name in progress(glob.glob('index/*.shelf')):
    shelf = shelve.open(name)
    for term in terms:
        results.append((name, shelf.get(term.lower(), set())))
    shelf.close()

# Pass 2: seek to each offset in the original chunk and print the line.
count = 0
sys.stderr.write('Gathering lines...\n')
progress = progressbar.ProgressBar()
for name, offset_set in progress(results):
    filename = os.path.basename(name).split('.')[0]
    with file('npi/' + filename) as f:
        for offset in offset_set:
            f.seek(offset)
            sys.stdout.write(f.readline())
            count += 1
sys.stderr.write(str(count) + ' results\n')
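Note that passing several terms prints lines matching any of them, since the offset sets are appended per term. If you want lines matching all terms within a shard instead, intersect the sets before printing; a minimal sketch over the same shelf layout:

import shelve

def search_all(shelf_name, terms):
    # Offsets of lines containing every term (AND), within one shard.
    shelf = shelve.open(shelf_name)
    sets = [shelf.get(t.lower(), set()) for t in terms]
    shelf.close()
    return set.intersection(*sets) if sets else set()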
#!/usr/bin/env python
# search_refer.py: the same search over the referral graph index.
import os, sys, glob, shelve, progressbar

terms = sys.argv[1:]
results = []

# Pass 1: collect matching line offsets from each shard's shelf.
sys.stderr.write('Searching index...\n')
progress = progressbar.ProgressBar()
for name in progress(glob.glob('index_refer/*.shelf')):
    shelf = shelve.open(name)
    for term in terms:
        results.append((name, shelf.get(term.lower(), set())))
    shelf.close()

# Pass 2: seek to each offset in the original chunk and print the line.
count = 0
sys.stderr.write('Gathering lines...\n')
progress = progressbar.ProgressBar()
for name, offset_set in progress(results):
    filename = os.path.basename(name).split('.')[0]
    with file('refer/' + filename) as f:
        for offset in offset_set:
            f.seek(offset)
            sys.stdout.write(f.readline())
            count += 1
sys.stderr.write(str(count) + ' results\n')