Skip to content

Instantly share code, notes, and snippets.

@fredkingham
Last active October 27, 2022 09:37
Show Gist options
  • Save fredkingham/404de313c7b49bd8eb6a880bc5fda848 to your computer and use it in GitHub Desktop.
Save fredkingham/404de313c7b49bd8eb6a880bc5fda848 to your computer and use it in GitHub Desktop.
import datetime
from collections import defaultdict
from elcid.models import Demographics
from django.db import transaction
from django.db.models import DateTimeField, Count, BooleanField, DateField
from django.utils import timezone
from plugins.admissions.models import TransferHistory
from django.conf import settings
from django.contrib.auth.models import User
from plugins.ipc.episode_categories import IPCEpisode
from plugins.ipc.models import IPCStatus
import csv
import pytds
from intrahospital_api.apis.prod_api import ProdApi as ProdAPI
from opal.models import Patient, Episode
from elcid.episode_categories import InfectionService
from intrahospital_api import update_demographics
from plugins.admissions import loader
from django.core.management import call_command
# Local staging CSV (relative to the current working directory):
# stream_result() writes the warehouse rows here, and csv_length() /
# write_transfer_histories() read them back.
FILE_NAME = "transfer_history.csv"
def get_count():
    """Count warehouse transfer-history rows that carry a patient identifier.

    NOTE(review): this returns ``result[0]`` -- the first row as yielded by
    ``execute_warehouse_query`` -- which is likely a row mapping rather than
    a bare integer; callers must extract the count themselves. Confirm
    against ``execute_warehouse_query``.
    """
    count_sql = """
SELECT COUNT(*) FROM INP.TRANSFER_HISTORY_EL_CID WITH (NOLOCK)
WHERE LOCAL_PATIENT_IDENTIFIER is not null
AND LOCAL_PATIENT_IDENTIFIER <> ''
"""
    warehouse = ProdAPI()
    rows = warehouse.execute_warehouse_query(count_sql)
    first_row = rows[0]
    return first_row
def get_column_names():
    """Return the column names of INP.TRANSFER_HISTORY_EL_CID.

    Fetches a single row (``TOP(1)``) and returns that row's ``keys()``.
    NOTE(review): presumably raises IndexError when the table is empty
    (assuming an empty result comes back as an empty list).
    """
    sample_sql = """
SELECT TOP(1) * FROM INP.TRANSFER_HISTORY_EL_CID WITH (NOLOCK)
"""
    warehouse = ProdAPI()
    sample_rows = warehouse.execute_warehouse_query(sample_sql)
    first_row = sample_rows[0]
    return first_row.keys()
def stream_result():
    """Dump the warehouse transfer-history table into ``FILE_NAME`` as CSV.

    Only rows with a non-NULL, non-empty LOCAL_PATIENT_IDENTIFIER are
    exported, ordered by LOCAL_PATIENT_IDENTIFIER then ENCNTR_SLICE_ID.
    The header comes from ``get_column_names()``. Rows are read over a
    direct pytds connection in ``fetchmany()`` batches and written as each
    batch arrives. Any existing file is overwritten.
    """
    query = """
SELECT * FROM INP.TRANSFER_HISTORY_EL_CID WITH (NOLOCK)
WHERE LOCAL_PATIENT_IDENTIFIER is not null
AND LOCAL_PATIENT_IDENTIFIER <> ''
ORDER BY LOCAL_PATIENT_IDENTIFIER, ENCNTR_SLICE_ID
"""
    # The csv module requires newline='' so that quoted fields containing
    # line breaks are written verbatim, without newline translation.
    with open(FILE_NAME, 'w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=get_column_names())
        writer.writeheader()
        with pytds.connect(
            settings.WAREHOUSE_DB["ip_address"],
            settings.WAREHOUSE_DB["database"],
            settings.WAREHOUSE_DB["username"],
            settings.WAREHOUSE_DB["password"],
            as_dict=True
        ) as conn:
            with conn.cursor() as cur:
                cur.execute(query)
                while True:
                    rows = cur.fetchmany()
                    # An empty batch means the cursor is exhausted.
                    if not rows:
                        break
                    writer.writerows(rows)
def to_datetime(v):
    """Parse a timestamp string from the staging CSV into an aware datetime.

    Accepts ``"%Y-%m-%d %H:%M:%S"`` with or without a ``.%f`` microsecond
    suffix. The parsed value is made aware with ``timezone.make_aware``.

    Args:
        v: the raw CSV cell value.

    Returns:
        An aware ``datetime``, or ``None`` when ``v`` is ``None`` or ``''``.
        csv.DictWriter writes SQL NULLs as empty strings, so empty cells
        stand for missing values.

    Raises:
        ValueError: if ``v`` matches neither accepted format.
    """
    if v is None or v == '':
        return None
    formats = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f")
    for fmt in formats:
        try:
            parsed = datetime.datetime.strptime(v, fmt)
        except ValueError:
            # Try the next format; only fail once every format is exhausted.
            continue
        return timezone.make_aware(parsed)
    raise ValueError(f"{v!r} does not match any of {formats}")
def to_transfer_history(row):
    """Build an unsaved TransferHistory from one staging-CSV row.

    Each column is mapped through
    ``TransferHistory.UPSTREAM_FIELDS_TO_MODEL_FIELDS``, and unmapped
    columns are ignored. Values destined for a DateTimeField are parsed
    with ``to_datetime``; every other value is assigned as read.
    """
    instance = TransferHistory()
    column_map = TransferHistory.UPSTREAM_FIELDS_TO_MODEL_FIELDS
    for column, raw_value in row.items():
        model_field = column_map.get(column)
        if not model_field:
            continue
        field_obj = TransferHistory._meta.get_field(model_field)
        if isinstance(field_obj, DateTimeField):
            value = to_datetime(raw_value)
        else:
            value = raw_value
        setattr(instance, model_field, value)
    return instance
def csv_length(file_name=None):
    """Return the number of data rows (header excluded) in a staging CSV.

    Rows are counted while streaming the file rather than first building a
    list of every row as a dict.

    Args:
        file_name: path of the CSV to count; defaults to ``FILE_NAME``.
    """
    if file_name is None:
        file_name = FILE_NAME
    # newline='' lets the csv module handle line breaks inside quoted fields.
    with open(file_name, newline='') as csv_file:
        return sum(1 for _ in csv.DictReader(csv_file))
@transaction.atomic
def write_transfer_histories(file_name=None, batch_size=5000):
    """Insert every row of the staging CSV as a TransferHistory.

    Runs in a single transaction. Each row is converted with
    ``to_transfer_history`` and the objects are inserted ``batch_size`` at
    a time with ``bulk_create``; progress is printed after each full batch.

    Args:
        file_name: CSV to read; defaults to ``FILE_NAME``.
        batch_size: number of objects per ``bulk_create`` call.
    """
    if file_name is None:
        file_name = FILE_NAME
    # newline='' so quoted fields containing line breaks are parsed correctly.
    with open(file_name, newline='') as csv_file:
        # Materialised so the total is known for the progress message.
        reader = list(csv.DictReader(csv_file))
    total = len(reader)
    batch = []
    for idx, row in enumerate(reader):
        if len(batch) == batch_size:
            TransferHistory.objects.bulk_create(batch)
            batch = []
            print(f'created th {idx}/{total}')
        batch.append(to_transfer_history(row))
    if batch:
        TransferHistory.objects.bulk_create(batch)
# @transaction.atomic
# def update_transfer_histories():
# for idx in range(0, TransferHistory.objects.all().count(), 5000):
# print(f'updating at {idx} to {idx+5000}')
# transfer_histories = TransferHistory.objects.all()[idx:idx+5000]
# demos = Demographics.objects.filter(
# hospital_number__in=set([i.mrn for i in transfer_histories])
# ).select_related('patient')
# hn_to_patient = {
# demo.hospital_number: demo.patient for demo in demos
# }
# for transfer_history in transfer_histories:
# patient = hn_to_patient.get(transfer_history.mrn)
# if patient:
# transfer_history.patient = patient
# TransferHistory.objects.bulk_update(transfer_histories, ["patient"])
def get_mrns_for_new_patients():
    """Return MRNs on unattached transfer histories that are unknown locally.

    Looks at TransferHistory rows with a non-NULL, non-empty ``mrn`` and no
    patient, then drops every MRN already present as a
    Demographics.hospital_number. The result is de-duplicated and unordered.
    """
    unattached = (
        TransferHistory.objects
        .exclude(mrn=None)
        .exclude(mrn="")
        .filter(patient=None)
    )
    candidate_mrns = set(unattached.values_list('mrn', flat=True).distinct())
    known_mrns = set(
        Demographics.objects.all().values_list('hospital_number', flat=True)
    )
    return list(candidate_mrns - known_mrns)
def attach_patients():
    """Link unattached TransferHistory rows to existing patients by MRN.

    Builds a hospital_number -> patient_id index from Demographics. When an
    MRN appears more than once, the first row seen wins. It then sets
    ``patient_id`` on every TransferHistory that has a non-empty mrn, has no
    patient, and whose mrn is in the index. Changes are saved with
    ``bulk_update`` in batches of just over 1000 rows.
    """
    transfer_histories = TransferHistory.objects.exclude(
        mrn=None
    ).exclude(mrn="").filter(patient=None)
    cnt = transfer_histories.count()
    print('building demographics index')
    demo_to_patient_id = {}
    # values_list fetches just the two needed columns instead of
    # instantiating a full Demographics model per row.
    for hospital_number, patient_id in Demographics.objects.values_list(
        'hospital_number', 'patient_id'
    ):
        demo_to_patient_id.setdefault(hospital_number, patient_id)
    print('finished building demographics index')
    to_update = []
    for idx, transfer_history in enumerate(transfer_histories):
        patient_id = demo_to_patient_id.get(transfer_history.mrn)
        if not patient_id:
            continue
        transfer_history.patient_id = patient_id
        to_update.append(transfer_history)
        if len(to_update) > 1000:
            print('starting update')
            TransferHistory.objects.bulk_update(to_update, ['patient_id'])
            to_update = []
            print(f'updating {idx}/{cnt}')
    if to_update:
        TransferHistory.objects.bulk_update(to_update, ['patient_id'])
@transaction.atomic
def bulk_load_patients(mrns, episode_categories=None, loading_functions=None):
    """Create a patient for each MRN, then run loaders over the new patients.

    Everything runs in a single transaction:
      1. one Patient per MRN, plus a Demographics row holding that MRN;
      2. one Episode per patient for each class in ``episode_categories``
         (default ``[InfectionService]``), with ``category_name`` set to the
         class's ``display_name``;
      3. the ``create_singletons`` management command;
      4. every callable in ``loading_functions`` (default
         ``[update_demographics.update_patient_information]``), called once
         per new patient.

    NOTE(review): pairing Demographics with the created patients relies on
    ``bulk_create`` returning primary keys, which Django only supports on
    some database backends -- confirm for this deployment.
    """
    if episode_categories is None:
        episode_categories = [InfectionService]
    if loading_functions is None:
        loading_functions = [update_demographics.update_patient_information]

    patients = Patient.objects.bulk_create(
        [Patient() for _ in range(len(mrns))]
    )
    print('created patients')

    Demographics.objects.bulk_create([
        Demographics(patient=new_patient, hospital_number=mrn)
        for new_patient, mrn in zip(patients, mrns)
    ])
    print('created demographics')

    Episode.objects.bulk_create([
        Episode(patient=new_patient, category_name=category.display_name)
        for new_patient in patients
        for category in episode_categories
    ])
    print('created episodes')

    call_command('create_singletons')
    print('created singletons')

    total = len(patients)
    for load_fn in loading_functions:
        for position, new_patient in enumerate(patients):
            print(f'running {load_fn.__name__} for {position}/{total}')
            load_fn(new_patient)
    print('done')
def get_new_ipc_patients():
    """Return MRNs from the IPC view that are not yet a local hospital_number.

    The list keeps the order of the upstream ``DISTINCT`` result.
    NOTE(review): NULL or blank Patient_Number values are not filtered out
    here, so they can be returned -- confirm whether the view can contain
    them.
    """
    ipc_mrns_query = "SELECT DISTINCT(Patient_Number) FROM ElCid_Infection_Prevention_Control_View"
    hospital = ProdAPI()
    upstream_rows = hospital.execute_hospital_query(ipc_mrns_query)
    upstream_mrns = [upstream_row["Patient_Number"] for upstream_row in upstream_rows]
    known_hns = set(
        Demographics.objects.values_list('hospital_number', flat=True).distinct()
    )
    return [mrn for mrn in upstream_mrns if mrn not in known_hns]
def check_for_dups(transfer_histories=None):
    """Return every pair of transfer histories whose upstream fields match.

    Two rows count as duplicates when every model field listed in
    ``UPSTREAM_FIELDS_TO_MODEL_FIELDS`` compares equal, which is the same
    rule as ``transfer_history_equality``. Each unordered pair is reported
    once as ``(earlier, later)`` by list position. Pairs are sorted by the
    earlier row's position, then the later row's.

    Rows are bucketed by their field values instead of being compared pair
    by pair. This avoids the O(n^2) all-pairs scan over the whole table.
    NOTE(review): assumes every mapped field value is hashable -- TODO
    confirm if non-scalar fields are ever added to the mapping.

    Args:
        transfer_histories: objects to check; defaults to every
            TransferHistory in the database.
    """
    from itertools import combinations

    if transfer_histories is None:
        transfer_histories = TransferHistory.objects.all()
    ths = list(transfer_histories)

    buckets = defaultdict(list)
    for position, th in enumerate(ths):
        key = tuple(
            getattr(th, field)
            for field in th.UPSTREAM_FIELDS_TO_MODEL_FIELDS.values()
        )
        buckets[key].append(position)

    # Positions within a bucket are ascending, so combinations() yields
    # (earlier, later) pairs; sorting restores the global scan order.
    index_pairs = sorted(
        pair
        for positions in buckets.values()
        for pair in combinations(positions, 2)
    )
    return [
        (ths[i], ths[j]) for i, j in index_pairs if ths[i].id != ths[j].id
    ]
def transfer_history_equality(th_1, th_2):
    """Return True when th_1 and th_2 agree on every upstream-mapped field.

    The attributes compared are the values of
    ``th_1.UPSTREAM_FIELDS_TO_MODEL_FIELDS``. Every attribute is read, even
    after a mismatch is found. An empty mapping compares equal.
    """
    model_fields = list(th_1.UPSTREAM_FIELDS_TO_MODEL_FIELDS.values())
    mismatched = [
        name for name in model_fields
        if not getattr(th_1, name) == getattr(th_2, name)
    ]
    return not mismatched
def _coerce_ipc_value(field, value):
    """Convert one raw IPC-view value into the form IPCStatus ``field`` takes.

    DateField targets (including subclasses such as DateTimeField): ``''``
    becomes ``None`` and other strings are parsed as ``dd/mm/YYYY`` into a
    ``date``; non-string values pass through. BooleanField targets become
    ``bool(value)``. Anything else is returned unchanged.
    """
    if isinstance(field, DateField):
        if value == '':
            value = None
        elif isinstance(value, str):
            value = datetime.datetime.strptime(value, '%d/%m/%Y').date()
    if isinstance(field, BooleanField):
        value = bool(value)
    return value


@transaction.atomic
def create_ipc_status():
    """Replace IPCStatus rows with the contents of the upstream IPC view.

    Reads ElCid_Infection_Prevention_Control_View and deletes the existing
    IPCStatus rows of every patient whose hospital_number appears in it. It
    then creates one IPCStatus per upstream row for each local patient with
    that hospital_number, mapping columns through ``MAPPING``. New rows get
    the 'ohc' user as ``created_by``. Upstream rows whose Patient_Number
    matches no local patient are skipped with a message. Everything runs in
    a single transaction.
    """
    MAPPING = {
        "Acinetobacter": "acinetobacter",
        "Acinetobacter_LPS_Date": "acinetobacter_date",
        "CJD": "cjd",
        "CJD_LPS_Date": "cjd_date",
        "C_DIFFICILE": "c_difficile",
        "C_DIFFICILE_LPS_Date": "c_difficile_date",
        "Candida_auris": "candida_auris",
        "Candida_auris_date": "candida_auris_date",
        "Carb_resistance": "carb_resistance",
        "Carb_resistance_LPS_Date": "carb_resistance_date",
        "Contact_of_Acinetobacter": "contact_of_acinetobacter",
        "Contact_of_Acinetobacter_LPS_Date": "contact_of_acinetobacter_date",
        "Contact_of_Candida_auris": "contact_of_candida_auris",
        "Contact_of_Candida_auris_date": "contact_of_candida_auris_date",
        "Contact_of_Carb_resistance": "contact_of_carb_resistance",
        "Contact_of_Carb_resistance_LPS_Date": "contact_of_carb_resistance_date",
        "Contact_of_Covid19": "contact_of_covid_19",
        "Contact_of_Covid19_Date": "contact_of_covid_19_date",
        "Covid19": "covid_19",
        "Covid19_Date": "covid_19_date",
        "MRSA": "mrsa",
        "MRSA_LPS_Date": "mrsa_date",
        "MRSA_neg": "mrsa_neg",
        "MRSA_neg_LPS_Date": "mrsa_neg_date",
        "Multi_drug_resistant_organism": "multi_drug_resistant_organism",
        "Multi_drug_resistant_organism_date": "multi_drug_resistant_organism_date",
        "Other_Date": "other_date",
        "Other_Type": "other",
        "Reactive": "reactive",
        "Reactive_LPS_Date": "reactive_date",
        "VRE": "vre",
        "VRE_LPS_Date": "vre_date",
        "VRE_Neg": "vre_neg",
        "VRE_Neg_LPS_Date": "vre_neg_date",
        "Comment": "comments"
    }
    api = ProdAPI()
    # NOTE(review): raises AttributeError below if no 'ohc' user exists.
    updated_by = User.objects.filter(username='ohc').first()
    QUERY = """
SELECT * FROM ElCid_Infection_Prevention_Control_View
"""
    upstream_result = api.execute_hospital_query(QUERY)
    print(f'completed query loading {len(upstream_result)} rows')
    upstream_mrns = list(set([i["Patient_Number"] for i in upstream_result]))
    # `__in` is required: an exact lookup against a list does not test
    # membership, so existing statuses would never be deleted.
    IPCStatus.objects.filter(
        patient__demographics__hospital_number__in=upstream_mrns
    ).delete()

    demos = Demographics.objects.filter(
        hospital_number__in=upstream_mrns
    ).select_related('patient')
    hn_to_patients = defaultdict(list)
    for demo in demos:
        hn_to_patients[demo.hospital_number].append(demo.patient)

    # Resolve each target model field once instead of once per row.
    model_fields = {
        key: IPCStatus._meta.get_field(key) for key in MAPPING.values()
    }
    total = len(upstream_result)
    new_statuses = []
    for idx, row in enumerate(upstream_result):
        print(f'loading ipc_status for {idx}/{total}')
        patients = hn_to_patients.get(row['Patient_Number'])
        if not patients:
            # Skip rather than fail (and roll back) the whole load.
            print(f"no local patient for {row['Patient_Number']}, skipping")
            continue
        values = {
            key: _coerce_ipc_value(model_fields[key], row[upstream_key])
            for upstream_key, key in MAPPING.items()
        }
        # A separate IPCStatus for each patient sharing this MRN; appending
        # one object repeatedly would insert duplicates for a single patient.
        for patient in patients:
            status = IPCStatus(patient=patient)
            status.created_by_id = updated_by.id
            for key, value in values.items():
                setattr(status, key, value)
            new_statuses.append(status)
    IPCStatus.objects.bulk_create(new_statuses)
def load_transfer_history_for_mrn(mrn):
    """Fetch every warehouse transfer-history row for one MRN.

    The MRN is bound as the ``@mrn`` query parameter rather than being
    interpolated into the SQL text.
    """
    Q_GET_TRANSFERS_FOR_MRN = """
SELECT *
FROM INP.TRANSFER_HISTORY_EL_CID WITH (NOLOCK)
WHERE
LOCAL_PATIENT_IDENTIFIER = @mrn
"""
    query_params = {"mrn": mrn}
    warehouse = ProdAPI()
    return warehouse.execute_warehouse_query(
        Q_GET_TRANSFERS_FOR_MRN, params=query_params
    )
def missing_patients():
    """Return warehouse transfer-history MRNs with no local Demographics row.

    NULL and empty LOCAL_PATIENT_IDENTIFIER values are excluded in SQL,
    using the same predicate as the other transfer-history queries, so a
    blank identifier is never reported as a missing patient. The result
    comes from ``filter_new_mrns``.
    """
    api = ProdAPI()
    query = """
SELECT distinct(LOCAL_PATIENT_IDENTIFIER) FROM INP.TRANSFER_HISTORY_EL_CID WITH (NOLOCK)
WHERE LOCAL_PATIENT_IDENTIFIER is not null
AND LOCAL_PATIENT_IDENTIFIER <> ''
"""
    result = {
        row["LOCAL_PATIENT_IDENTIFIER"]
        for row in api.execute_warehouse_query(query)
    }
    return filter_new_mrns(result)
def filter_new_mrns(mrns):
    """Return the MRNs in ``mrns`` that are not a local hospital_number.

    The result is de-duplicated and in no particular order.
    """
    known_hns = set(
        Demographics.objects.all()
        .values_list('hospital_number', flat=True)
        .distinct()
    )
    candidates = set(mrns)
    return list(candidates.difference(known_hns))
def delete_duplicates():
    """Delete surplus Demographics rows that share a hospital_number.

    For every non-empty hospital_number held by more than one Demographics
    row, the row returned by ``.first()`` is kept and the rest are deleted.
    NOTE(review): only Demographics rows are removed; the Patient rows they
    belonged to are left in place -- confirm that is intended.
    """
    duplicated = (
        Demographics.objects.values('hospital_number')
        .annotate(cnt=Count('id'))
        .filter(cnt__gt=1)
    )
    duplicated_hns = [
        entry["hospital_number"]
        for entry in duplicated
        if entry["hospital_number"]
    ]
    for hospital_number in duplicated_hns:
        same_hn = Demographics.objects.filter(hospital_number=hospital_number)
        keeper_id = same_hn.first().id
        same_hn.exclude(id=keeper_id).delete()
def main():
    """Rebuild transfer histories and IPC statuses from the upstream systems.

    Steps, in order:
      1. remove duplicate Demographics rows;
      2. dump the warehouse transfer-history table to the staging CSV;
      3. replace every TransferHistory row from that CSV, atomically;
      4. attach transfer histories to known patients;
      5. create patients for unknown MRNs and attach again;
      6. call ``loader.load_bed_status()``;
      7. create patients (InfectionService and IPCEpisode episodes) for
         new IPC MRNs;
      8. rebuild IPC statuses;
      9. run the ``fetch_transfer_histories`` management command.
    """
    print('delete duplicates')
    delete_duplicates()
    print('writing upstream results to file')
    stream_result()
    # Delete and reload together so a failure while loading the CSV rolls
    # back the delete instead of leaving the table empty.
    with transaction.atomic():
        print('deleting transfer histories')
        TransferHistory.objects.all().delete()
        print('write transfer histories')
        write_transfer_histories()
    print('update transfer histories with the patients in our system')
    attach_patients()
    print('create patients for transfer histories with new patients and attatch them')
    mrns = get_mrns_for_new_patients()
    bulk_load_patients(mrns)
    attach_patients()
    loader.load_bed_status()
    bulk_load_patients(
        get_new_ipc_patients(),
        episode_categories=[InfectionService, IPCEpisode]
    )
    create_ipc_status()
    call_command('fetch_transfer_histories')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment