Skip to content

Instantly share code, notes, and snippets.

@dgrant
Last active January 4, 2016 13:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgrant/8629413 to your computer and use it in GitHub Desktop.
Save dgrant/8629413 to your computer and use it in GitHub Desktop.
A utility to find what labs from Excelleris are missing from OSCAR
#!/usr/bin/env python3
"""
A utility to find what labs from Excelleris are missing from OSCAR
"""
import argparse
import csv
import datetime
from operator import itemgetter
import unittest
import sys
def parse_excelleris_date(date):
"""
Parses a date from excelleris into a datetime object
"""
if date.find('AM') != -1 or date.find('PM') != -1:
return datetime.datetime.strptime(date, '%m/%d/%y %I:%M %p')
else:
return datetime.datetime.strptime(date, '%m-%d-%Y %H:%M')
def format_date(date):
"""
Return a datetime object as an ISO formatted string
"""
return date.isoformat()
def get_excelleris_ans(filename):
"""
The provided filename is a CSV file
provided by Excelleris with the following columns:
PHN,Accession,Organization,Observation_Date
Returns a tuple consisting of a list and a dictionary.
-The list is a list of accession numbers
-The dictionary is a dictionary with accession number as key
and a tuple of (PHN, Organization, Observation Date) as
the value.
"""
ans = []
other_data = {}
with open(filename, 'r') as csvfile:
reader = csv.reader(csvfile, delimiter=',', quotechar='"')
# skip header
next(reader, None)
for line in reader:
accession_num = parse_accession_num(line[1])
ans.append(accession_num)
if accession_num not in other_data:
other_data[accession_num] = []
other_data[accession_num].append(
(line[0], line[2], parse_excelleris_date(line[3])))
return ans, other_data
def get_oscar_ans(filename):
"""
The provided filename contains a list of accession numbers from OSCAR,
one accession number per line. This can be obtained from OSCAR by doing
a query similar to the following:
SELECT accessionNum FROM hl7TextInfo WHERE ...;
Return a list of accession numbers.
"""
ans = []
with open(filename, 'r') as _file:
for line in _file.readlines():
ans.append(line.strip())
return ans
def parse_accession_num(num):
"""
Parse accession numbers from Excelleris
Accession numbers like this: 13-1542432432
get converted to: 1542432432
Accessoin numbers like this B13-234-2343253
or 23532532
get returned as is.
"""
ret = num.split('-')
if len(ret) == 2:
return ret[1]
else:
return num
def print_header(message):
"""
Print a heading message to stdout.
"""
message_len = len(message)
stars = 70 - int(message_len / 2)
print('*'*stars + ' ' + message + ' ' + '*'*stars)
def compare(ex_filename, oscar_filename):
"""
Compare accession number data from Excelleris with accession
numbers from OSCAR
The provided oscar_filename contains a list of accession numbers
from OSCAR, one accession number per line. This can be obtained
from OSCAR by doing a query similar to the following:
SELECT accessionNum FROM hl7TextInfo WHERE ...;
The provided ex_filename is a CSV file
provided by Excelleris with the following columns:
PHN,Accession,Organization,Observation_Date
"""
ex_ans, ex_d = get_excelleris_ans(ex_filename)
ex_ans_set = set(ex_ans)
print_header('Summary statistics')
print('Number of Excelleris accession No.:', len(ex_ans))
print('Number of unique Excelleris accession No.:', len(ex_ans_set))
oscar_ans = get_oscar_ans(oscar_filename)
oscar_ans_set = set(oscar_ans)
print('Number of OSCAR accession No.:', len(oscar_ans))
print('Number of unique OSCAR accession No.:', len(oscar_ans_set))
missing_ans = ex_ans_set - oscar_ans_set
print_header('Labs present in Excelleris but missing in OSCAR')
print('Number of Excelleris accession No. missing from OSCAR: %d'
% (len(missing_ans)))
print('Labs missing in OSCAR (Date, PHN, accession #, lab):')
missing_infos = []
for missing_an in missing_ans:
for extra_info in ex_d[missing_an]:
missing_info = [missing_an]
missing_info += [y for y in extra_info]
missing_infos.append(missing_info)
missing_infos.sort(key=itemgetter(3))
phns = []
fp = open('missing_accession_num.txt', 'w')
for accession_num, phn, lab, date in missing_infos:
date = format_date(date)
if len(phn.strip()) == 0:
phn = '?'*10
else:
phns.append(phn)
print(date, phn, accession_num, lab)
fp.write(accession_num + '\n')
fp.close()
print('Run the following query in OSCAR for more detail: '
+ 'SELECT demographic_no, hin, first_name, last_name, year_of_birth '
+ 'FROM demographic '
+ 'WHERE hin in (%s) ORDER BY last_name;'
% ('(' + "'" + "','".join(phns) + "')"))
missing_from_excelleris = oscar_ans_set - ex_ans_set
print_header('Labs present in OSCAR but missing in Excelleris')
print('Number of missing accession No. from Excelleris: %d'
% len(missing_from_excelleris))
print('Missing from Excelleris '
+ '(run the following query in OSCAR for more detail):')
print('SELECT * FROM hl7TextInfo WHERE accessionNum IN '
+ '(' + ','.join(missing_from_excelleris) + ');')
class Test(unittest.TestCase):
"""
Unit tests
"""
def test_parse_accession_num(self):
"""
Test the parse_accession_num function
"""
self.assertEqual('101566025',
parse_accession_num('13-101566025'))
self.assertEqual('907199',
parse_accession_num('12-907199'))
self.assertEqual('132656811',
parse_accession_num('132656811'))
self.assertEqual('29682087',
parse_accession_num('W509401-29682087'))
self.assertEqual('VS13-44255-AP3341330',
parse_accession_num('VS13-44255-AP3341330'))
def test_parse_excelleris_date(self):
"""
Test the parse_excelleris_date function
"""
self.assertEqual(datetime.datetime(2013, 7, 19, 8, 34, 0),
parse_excelleris_date('7/19/13 8:34 AM'))
self.assertEqual(datetime.datetime(2013, 9, 3, 14, 19, 0),
parse_excelleris_date('09-03-2013 14:19'))
self.assertEqual(datetime.datetime(2013, 12, 2, 0, 0, 0),
parse_excelleris_date('12-02-2013 0:00'))
def main():
"""
Parse command-line variables
"""
parser = argparse.ArgumentParser(
description='A tool to find out what Excelleris labs '
+ 'are missing from OSCAR')
parser.add_argument('excelleris_csv_file',
help='.csv file containing Excelleris data '
+ '(PHN, Accession, Organization, Observation_Date)')
parser.add_argument('oscar_accession_numbers',
help='text file containing accession numbers (one per line). '
+ 'Generating by doing: '
+ '"SELECT accessionNum FROM hl7TextInfo WHERE ...;"')
args = parser.parse_args()
compare(args.excelleris_csv_file, args.oscar_accession_numbers)
if __name__ == '__main__':
if len(sys.argv) == 1:
unittest.main()
else:
main()
PHN Accession Organization Observation_Date
9038924536 12-907198 BCB 04-03-2012 10:31
9047941536 13-101566015 LIFELABS 8/18/13 8:34 AM
9104819121 13-101600134 LIFELABS 8/21/13 9:22 AM
9084037511 13-100601079 LIFELABS 7/24/13 12:54 PM
9858325638 13-101612100 LIFELABS 08-02-2013 13:55
9049860759 13-102612108 LIFELABS 07-04-2013 14:19
9106819021 13-101615056 LIFELABS 06-06-2013 9:40
210595
211929
159026085
29693221
PS14-372-AP3244358
29681830
171025103
29663139
29633121
143115067
29671076
29472087
29572836
20011225
#!/usr/bin/env python3
import base64
import datetime
import numpy
import sys
import unittest
def get_hl7s_from_file(filename):
hl7s = []
for line in open(filename, encoding='utf-8').readlines():
line_bytes = bytes(line, 'utf-8')
decoded_bytes = base64.b64decode(line_bytes)
hl7s.append(str(decoded_bytes, 'utf-8'))
return hl7s
def get_date_from_obr(obr):
datetime_str = obr.split('|')[22]
datetime_obj = datetime.datetime.strptime(datetime_str, '%Y%m%d%H%M%S')
return datetime_obj
def get_obr_lines_from_hl7(hl7):
obr_lines = []
for line in hl7.split('\n'):
if line.find('OBR') == 0:
obr_lines.append(line)
return obr_lines
def get_dates_from_hl7(hl7):
dates = []
for obr in get_obr_lines_from_hl7(hl7):
d = get_date_from_obr(obr)
dates.append(d)
return dates
def get_dates_from_hl7s(hl7s):
dates = []
for hl7 in hl7s:
dates += get_dates_from_hl7(hl7)
return dates
def main(filename):
hl7s = get_hl7s_from_file(filename)
print("# of HL7:", len(hl7s))
dates = get_dates_from_hl7s(hl7s)
print('# of dates:', len(dates))
dow = {}
for i in range(1,8):
dow[i] = []
for date in dates:
dow[date.isoweekday()].append(date)
for i in range(1,8):
print('day of week=', i, 'number of labs=', len(dow[i]))
d = {}
for date in dates:
if date.date() not in d:
d[date.date()] = 0
d[date.date()] = d[date.date()] + 1
for date in sorted(d.keys()):
if date.isoweekday() == 1:
print("************")
if date.isoweekday() != 7:
print(date.isoweekday(), date, d[date])
class Test(unittest.TestCase):
def setUp(self):
self.hl7 = """MSH|^~\&|PATHL7|LIFELABS|HTTPCLIENT|hchang1|20130824110130||ORU^R01|CAD20130824110127689|P|2.3|||ER|AL
PID||9871327043|||EDMONDS^DARA^ANNE||19710324|F|||||(604)742-1432
ORC|RE||13-132594130-GCM-1|||||||||07810^THOMPSON^DAVID
OBR|1||13-132594130-GCM-1|GCM^General Information||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130816195003||GENERAL|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID
NTE|||Additional tests requested by DR HEATHER CHANG 19-AUG-2013:\.br\FREE T4\.br\Additional tests requested by DR HEATHER CHANG 20-AUG-2013:\.br\T3 Free\.br\
ORC|RE||13-132594130-TSR-1|||||||||07810^THOMPSON^DAVID
OBR|2||13-132594130-TSR-1|TSR^TSH||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130816195003||CHEM11|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID
OBX|1|NM|3016-3^TSH||0.05|mU/L|0.27-4.2|A|||F|||20130816195003
ORC|RE||13-132594130-FTR-1|||||||||07810^THOMPSON^DAVID
OBR|3||13-132594130-FTR-1|FTR^T4 Free||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130819205003||CHEM11|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID
OBX|1|ST|14920-3^T4 Free||<5.0|pmol/L|10.5-20.0|A|||F|||20130819205003
ORC|RE||13-132594130-FT3-1|||||||||07810^THOMPSON^DAVID
OBR|4||13-132594130-FT3-1|FT3^T3 Free||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130821065003||CHEM11|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID
OBX|1|NM|14928-6^T3 Free||8.8|pmol/L|3.5-6.5|A|||F|||20130821065003"""
def test_get_date_from_obr(self):
ret = get_date_from_obr('OBR|2||13-132594130-TSR-1|TSR^TSH||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130816195003||CHEM11|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID')
self.assertEqual(datetime.datetime(2013, 8, 16, 19, 50, 3), ret)
def test_get_obr_lines_from_hl7(self):
lines = get_obr_lines_from_hl7(self.hl7)
expected = ['OBR|1||13-132594130-GCM-1|GCM^General Information||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130816195003||GENERAL|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID', 'OBR|2||13-132594130-TSR-1|TSR^TSH||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130816195003||CHEM11|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID', 'OBR|3||13-132594130-FTR-1|FTR^T4 Free||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130819205003||CHEM11|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID', 'OBR|4||13-132594130-FT3-1|FT3^T3 Free||20130816150300|20130816150300|||||||20130816150300||07810^THOMPSON^DAVID||||||20130821065003||CHEM11|F|||23242^CHANG^HEATHER~07810^THOMPSON^DAVID']
self.assertEqual(lines, expected)
def test_get_dates_from_hl7(self):
dates = get_dates_from_hl7(self.hl7)
expected = [datetime.datetime(2013, 8, 16, 19, 50, 3), datetime.datetime(2013, 8, 16, 19, 50, 3), datetime.datetime(2013, 8, 19, 20, 50, 3), datetime.datetime(2013, 8, 21, 6, 50, 3)]
self.assertEqual(dates, expected)
if __name__ == '__main__':
if len(sys.argv) == 1:
unittest.main()
else:
filename = sys.argv[1]
main(filename)
@dgrant
Copy link
Author

dgrant commented Jan 26, 2014

Example usage:

./compare_excelleris_oscar_labs.py excelleris_labs_example.csv oscar_accession_num_example.txt

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment