from pyspark.dbutils import DBUtils
import pyspark.sql.functions as F
import pyspark
import pandas as pd
from itertools import chain
from functools import reduce
# Map Walgreens fill-status codes to the coarse RX-Lightning status.
# Built from an ordered pair list so insertion order matches the code order
# used when this dict is flattened into a Spark map expression below.
status_mapping = dict([
    ("EN", "Pending"),    # Entered
    ("PR", "Pending"),    # Printed
    ("VR", "Pending"),    # Verified
    ("RD", "Pending"),    # Ready-to-dispense
    ("SD", "Completed"),  # Sold
    ("DL", "Cancelled"),  # Deleted
    ("RJ", "Cancelled"),  # Rejected
    ("RP", "Pending"),    # Reprinted
])
# Map Walgreens fill-status codes to the fine-grained RX-Lighting substatus.
# zip keeps the two tuples aligned positionally; dict preserves this order.
substatus_mapping = dict(zip(
    ("EN", "PR", "VR", "RD", "SD", "DL", "RJ", "RP"),
    ("Entered", "Printed", "Verified", "Ready-to-dispense",
     "Sold", "Deleted", "Rejected", "Reprinted"),
))
# Flatten each {code: label} dict into [code, label, code, label, ...] literals
# and wrap them in a Spark map column, usable for per-row code -> label lookup.
status_mapping_expr = F.create_map(
    [F.lit(token) for token in chain.from_iterable(status_mapping.items())]
)
substatus_mapping_expr = F.create_map(
    [F.lit(token) for token in chain.from_iterable(substatus_mapping.items())]
)
# Enrich the joined patient/address/fill/prescriber frame with the derived
# RX-Lightning columns. The dict keeps the column definitions in one place;
# reduce applies them one withColumn at a time, in insertion order.
_derived_columns = {
    "RecordId": F.monotonically_increasing_id(),   # surrogate row id
    "EnrollmentId": F.lit(None),                   # not available upstream; filled later (presumably)
    "SubStatus": substatus_mapping_expr[rxf.fill_stat_cd],
    "Status": status_mapping_expr[rxf.fill_stat_cd],
    "DOB": F.date_format(F.col("pat.brth_dt"), "yyyyMMdd"),
}
joined_pat_pad_rxf_rxb = reduce(
    lambda frame, col_def: frame.withColumn(*col_def),
    _derived_columns.items(),
    joined_pat_pad_rxf_rxb,
)
# Project the joined frame down to the RX-Lightning enrollment-status schema,
# renaming source columns to the target field names and deduplicating rows.
_rx_lightning_columns = [
    F.col("RecordId"),
    F.col("EnrollmentId"),
    F.col("pat.pat_id").alias("PatientId"),
    F.col("pat.first_name").alias("FirstName"),
    F.col("pat.last_name").alias("LastName"),
    F.col("DOB"),
    F.col("pat.gndr_cd").alias("Gender"),
    F.col("pad.zip_cd_5").alias("PostalCode"),
    F.col("Status"),
    F.col("SubStatus"),
    F.col("rxf.filling_dt").alias("StatusDateTime"),
    F.col("rxf.fill_sold_dt").alias("DateShipped"),
    F.col("rxf.drug_name").alias("DrugName"),
    # NOTE(review): alias has a space, unlike the CamelCase names above —
    # kept as-is since downstream consumers may rely on it; confirm intent.
    F.col("rxb.pbr_npi").alias("Prescriber NPI"),
]
rx_lightning = (
    joined_pat_pad_rxf_rxb
    .select(*_rx_lightning_columns)
    .distinct()
)
# ---------------------------------------------------------------------------
# Provenance note: this snippet was captured from a GitHub gist
# (dvu4/9d70d3b3b41e4d04e4b0e2292c81723f), last active July 21, 2023.
# The surrounding GitHub page text ("Save ... to your computer", sign-in
# prompts) was scrape residue, not code, and has been converted to this
# comment so the file remains valid Python.
# ---------------------------------------------------------------------------