Skip to content

Instantly share code, notes, and snippets.

@fredkingham
Last active November 8, 2022 11:26
Show Gist options
  • Save fredkingham/16749e304c4e19ce944291ebaa436888 to your computer and use it in GitHub Desktop.
Save fredkingham/16749e304c4e19ce944291ebaa436888 to your computer and use it in GitHub Desktop.
import datetime
from django.db import transaction
from collections import defaultdict
from django.utils import timezone
from elcid.utils import timing
from elcid import models
from django.conf import settings
from django.db.models import DateTimeField, DateField
from opal.core.fields import ForeignKeyOrFreeText
import csv
import pytds
from intrahospital_api.apis.prod_api import ProdApi as ProdAPI
from intrahospital_api import update_demographics
FILE_NAME = "master_file.csv"
def get_column_names():
api = ProdAPI()
query = """
SELECT TOP(1) * FROM VIEW_CRS_Patient_Masterfile WITH (NOLOCK)
"""
result = api.execute_hospital_query(query)
return result[0].keys()
def get_count():
api = ProdAPI()
query = """
SELECT count(*) FROM VIEW_CRS_Patient_Masterfile WITH (NOLOCK)
"""
result = api.execute_hospital_query(query)
return result[0][0]
ETHNICITY_MAPPING = {
"99": "Other - Not Known",
"A": "White - British",
"B": "White - Irish",
"C": "White - Any Other White Background",
"D": "Mixed - White and Black Caribbean",
"E": "Mixed - White and Black African",
"F": "Mixed - White and Asian",
"G": "Mixed - Any Other Mixed Background",
"H": "Asian or Asian British - Indian",
"J": "Asian or Asian British - Pakistani",
"K": "Asian or Asian British - Bangladeshi",
"L": "Asian - Any Other Asian Background",
"M": "Black or Black British - Caribbean",
"N": "Black or Black British - African",
"P": "Black - Any Other Black Background",
"R": "Other - Chinese",
"S": "Other - Any Other Ethnic Group",
"Z": "Other - Not Stated",
}
DEMOGRAPHICS_INFORMATION_MAPPING = {
"hospital_number": "PATIENT_NUMBER",
"nhs_number": "NHS_NUMBER",
"first_name": "FORENAME1",
"middle_name": "FORENAME2",
"surname": "SURNAME",
"title": "TITLE",
"religion": "RELIGION",
"marital_status": "MARITAL_STATUS",
"nationality": "NATIONALITY",
"main_language": "MAIN_LANGUAGE",
"date_of_birth": "DOB",
"sex": "SEX",
"ethnicity": "ETHNIC_GROUP",
"date_of_death": "DATE_OF_DEATH",
"death_indicator": "DEATH_FLAG",
}
CONTACT_INFORMATION_MAPPING = {
"address_line_1": "ADDRESS_LINE1",
"address_line_2": "ADDRESS_LINE2",
"address_line_3": "ADDRESS_LINE3",
"address_line_4": "ADDRESS_LINE4",
"postcode": "POSTCODE",
"home_telephone": "HOME_TELEPHONE",
"work_telephone": "WORK_TELEPHONE",
"mobile_telephone": "MOBILE_TELEPHONE",
"email": "EMAIL",
}
NOK_MAPPING = {
"nok_type": "NOK_TYPE",
"surname": "NOK_SURNAME",
"forename_1": "NOK_FORENAME1",
"forename_2": "NOK_FORENAME2",
"relationship": "NOK_relationship",
"address_1": "NOK_address1",
"address_2": "NOK_address2",
"address_3": "NOK_address3",
"address_4": "NOK_address4",
"postcode": "NOK_Postcode",
"home_telephone": "nok_home_telephone",
"work_telephone": "nok_work_telephone",
}
GP_DETAILS = {
"crs_gp_masterfile_id": "CRS_GP_MASTERFILE_ID",
"national_code": "GP_NATIONAL_CODE",
"practice_code": "GP_PRACTICE_CODE",
"title": "gp_title",
"initials": "GP_INITIALS",
"surname": "GP_SURNAME",
"address_1": "GP_ADDRESS1",
"address_2": "GP_ADDRESS2",
"address_3": "GP_ADDRESS3",
"address_4": "GP_ADDRESS4",
"postcode": "GP_POSTCODE",
"telephone": "GP_TELEPHONE",
}
MASTERFILE_META = {
"insert_date": "INSERT_DATE",
"last_updated": "LAST_UPDATED",
"merged": "MERGED",
"merge_comments": "MERGE_COMMENTS",
"active_inactive": "ACTIVE_INACTIVE",
}
MODEL_MAPPING = {
models.Demographics: DEMOGRAPHICS_INFORMATION_MAPPING,
models.ContactInformation: CONTACT_INFORMATION_MAPPING,
models.NextOfKinDetails: NOK_MAPPING,
models.GPDetails: GP_DETAILS,
models.MasterFileMeta: MASTERFILE_META,
}
def stream_result():
query = """
SELECT * FROM VIEW_CRS_Patient_Masterfile
WHERE Patient_Number is not null
AND Patient_Number <> ''
ORDER BY Patient_Number
"""
count = get_count()
size = 10000
iterations = int(count/size) + 1
with pytds.connect(
settings.HOSPITAL_DB["ip_address"],
settings.HOSPITAL_DB["database"],
settings.HOSPITAL_DB["username"],
settings.HOSPITAL_DB["password"],
as_dict=True
) as conn:
cnt = 0
with conn.cursor() as cur:
cur.execute(query)
while True:
cnt += 1
print(f'saving {cnt}/{iterations} at {datetime.datetime.now()}')
result = cur.fetchmany(size)
update_rows(result)
print(f'finished saving {cnt}/{iterations} at {datetime.datetime.now()}')
if not result:
break
def is_fk_or_ft(model, field):
return isinstance(getattr(model, field), ForeignKeyOrFreeText)
def is_datetime(instance, field):
model = instance.__class__
if not isinstance(getattr(model, field), ForeignKeyOrFreeText):
if isinstance(model._meta.get_field(field), DateTimeField):
return True
def is_date(instance, field):
model = instance.__class__
if not isinstance(getattr(model, field), ForeignKeyOrFreeText):
if isinstance(model._meta.get_field(field), DateField):
return True
def to_datetime(v):
formats = [
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M:%S.%f"
]
formats.append(settings.DATETIME_INPUT_FORMATS[0])
formats.append(settings.DATE_INPUT_FORMATS[0])
for format in formats:
try:
dt = datetime.datetime.strptime(
v, format
)
if dt:
break
except Exception:
if format == formats[-1]:
raise
return timezone.make_aware(dt)
class FKorFTCache(object):
related_cache = defaultdict(dict)
free_text_cache = defaultdict(set)
def set(self, instance, field, value):
model = instance.__class__
related_model = getattr(model, field).foreign_model
related_api_name = related_model.get_api_name()
setattr(instance, f"{field}_fk_id", None)
setattr(instance, f"{field}_ft", None)
if value in self.related_cache[related_api_name]:
setattr(instance, f"{field}_fk_id", value)
elif value in self.free_text_cache[related_api_name]:
setattr(instance, f"{field}_ft", value)
else:
instances = related_model.objects.filter(name__iexact=value)
if instances.exists():
self.related_cache[related_api_name][value] = instances[0].id
setattr(instance, f"{field}_fk_id", value)
else:
self.free_text_cache[related_api_name].add(value)
setattr(instance, f"{field}_ft", value)
def translate_demographics(row, patient_id, cache):
demographics = models.Demographics(patient_id=patient_id)
for our_field, their_key in DEMOGRAPHICS_INFORMATION_MAPPING.items():
val = row[their_key]
if their_key == 'SEX':
if val == "M":
val = "Male"
elif val == "F":
val = "Female"
if their_key == 'ETHNIC_GROUP':
val = ETHNICITY_MAPPING.get(row["ETHNIC_GROUP"])
if their_key == "DATE_OF_DEATH":
if not val:
val = None
if their_key == "DEATH_FLAG":
if val == "ALIVE":
val = False
else:
val = True
# date of death is stored as a strin upstream
if their_key == "DATE_OF_DEATH" and val:
val = datetime.datetime.strptime(val, "%d/%m/%Y").date()
if is_datetime(demographics, our_field) and val:
val = timezone.make_aware(val)
if is_fk_or_ft(demographics, our_field):
cache.set(demographics, our_field, val)
else:
setattr(demographics, our_field, val)
return demographics
def translate_model(model, row, patient_id, cache):
instance = model(patient_id=patient_id)
for our_field, their_field in MODEL_MAPPING[model].items():
if is_fk_or_ft(instance, our_field):
cache.set(instance, our_field)
else:
setattr(instance, our_field, row[their_field])
return instance
@timing
@transaction.atomic
def update_rows(rows):
demographics = list(models.Demographics.objects.filter(
hospital_number__in=[i["PATIENT_NUMBER"] for i in rows if i["PATIENT_NUMBER"]]
))
hn_to_patient_ids = defaultdict(set)
existing_patient_ids = []
demographics_to_save = []
contact_details_to_save = []
gp_details_to_save = []
nok_details_to_save = []
master_meta_to_save = []
cache = FKorFTCache()
for d in demographics:
hn_to_patient_ids[d.hospital_number].add(d.patient_id)
existing_patient_ids.append(d.patient_id)
for row in rows:
patient_ids = list(hn_to_patient_ids.get(row["PATIENT_NUMBER"], set()))
for patient_id in patient_ids:
demographics_to_save.append(translate_demographics(row, patient_id, cache))
contact_details_to_save.append(translate_model(models.ContactInformation, row, patient_id, cache))
gp_details_to_save.append(translate_model(models.GPDetails, row, patient_id, cache))
nok_details_to_save.append(translate_model(models.NextOfKinDetails, row, patient_id, cache))
master_meta_to_save.append(translate_model(models.MasterFileMeta, row, patient_id, cache))
for model in MODEL_MAPPING.keys():
model.objects.filter(patient_id__in=existing_patient_ids).delete()
models.Demographics.objects.bulk_create(demographics_to_save)
models.ContactInformation.objects.bulk_create(contact_details_to_save)
models.GPDetails.objects.bulk_create(gp_details_to_save)
models.NextOfKinDetails.objects.bulk_create(nok_details_to_save)
models.MasterFileMeta.objects.bulk_create(master_meta_to_save)
@transaction.atomic
def delete_duplicates():
from opal.models import Patient
from django.db.models import Count
qs = Patient.objects.annotate(cnt=Count('demographics')).filter(cnt__gt=1).prefetch_related(
'demographics_set', 'contactinformation_set', 'gpdetails_set', 'nextofkindetails_set', 'masterfilemeta_set'
)[:100000]
demographics_to_delete = []
contact_information_to_delete = []
gpdetails_to_delete = []
next_of_kin_details_to_delete = []
for i in qs:
demographics_to_delete.extend(list(i.demographics_set.all())[1:])
contact_information_to_delete.extend(list(i.contactinformation_set.all())[1:])
gpdetails_to_delete.extend(list(i.gpdetails_set.all())[1:])
next_of_kin_details_to_delete.extend(list(i.nextofkindetails_set.all())[1:])
models.Demographics.objects.filter(id__in=[i.id for i in demographics_to_delete]).delete()
print(f'Deleted {len(demographics_to_delete)} demographics')
models.ContactInformation.objects.filter(id__in=[i.id for i in contact_information_to_delete]).delete()
print(f'Deleted {len(contact_information_to_delete)} contact information')
models.GPDetails.objects.filter(id__in=[i.id for i in gpdetails_to_delete]).delete()
print(f'Deleted {len(gpdetails_to_delete)} GP Details')
models.NextOfKinDetails.objects.filter(id__in=[i.id for i in next_of_kin_details_to_delete]).delete()
print(f'Deleted {len(next_of_kin_details_to_delete)} Next of kind details')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment