# Source: GitHub Gist dvu4/9d70d3b3b41e4d04e4b0e2292c81723f (web page navigation text removed).
from pyspark.dbutils import DBUtils
import pyspark.sql.functions as F
import pyspark
import pandas as pd
from itertools import chain
from functools import reduce


# Walgreens fill-status code -> (RX-Lightning status, RX-Lightning substatus).
# Both lookup dicts are derived from this one table so their keys stay aligned.
_fill_status_table = (
   ("EN", "Pending", "Entered"),
   ("PR", "Pending", "Printed"),
   ("VR", "Pending", "Verified"),
   ("RD", "Pending", "Ready-to-dispense"),
   ("SD", "Completed", "Sold"),
   ("DL", "Cancelled", "Deleted"),
   ("RJ", "Cancelled", "Rejected"),
   ("RP", "Pending", "Reprinted"),
)

# status code -> RX-Lightning status
status_mapping = {code: status for code, status, _ in _fill_status_table}

# status code -> RX-Lightning substatus
substatus_mapping = {code: substatus for code, _, substatus in _fill_status_table}

           
# Spark map-literal columns built from the Python dicts so a status code can be
# looked up per row. create_map takes a flat key, value, key, value, ... list,
# which is what flattening the dict's (key, value) pairs produces.
status_mapping_expr = F.create_map(
   [F.lit(token) for token in chain.from_iterable(status_mapping.items())]
)
substatus_mapping_expr = F.create_map(
   [F.lit(token) for token in chain.from_iterable(substatus_mapping.items())]
)

# Add the derived columns used by the RX-Lightning extract to the joined frame.
# NOTE(review): the qualified names below ("rxf.…", "pat.…") assume the frame was
# built from DataFrames aliased "pat", "pad", "rxf" and "rxb" — confirm upstream.
joined_pat_pad_rxf_rxb = (
   joined_pat_pad_rxf_rxb
   # Unique 64-bit id per row (increasing, but not consecutive).
   .withColumn("RecordId", F.monotonically_increasing_id())
   # Placeholder column: NULL on every row.
   # NOTE(review): F.lit(None) yields a NullType column, which some sinks
   # reject; consider an explicit .cast(...) once the target type is known.
   .withColumn("EnrollmentId", F.lit(None))
   # Translate the fill status code through the map literals. The code column is
   # referenced by its alias-qualified name, like every other column here,
   # instead of through a separate Python variable `rxf`.
   # Unmapped codes normally come back as NULL (non-ANSI mode).
   .withColumn("SubStatus", substatus_mapping_expr[F.col("rxf.fill_stat_cd")])
   .withColumn("Status", status_mapping_expr[F.col("rxf.fill_stat_cd")])
   # Birth date rendered as a compact yyyyMMdd string.
   .withColumn("DOB", F.date_format(F.col("pat.brth_dt"), "yyyyMMdd"))
)

# Extract the enrollment status rows for RX-Lightning.
#
# Output columns, in order: RecordId, EnrollmentId, PatientId, FirstName,
# LastName, DOB, Gender, PostalCode, Status, SubStatus, StatusDateTime,
# DateShipped, DrugName, "Prescriber NPI".
#
# De-duplication is done on the business columns BEFORE RecordId is assigned.
# RecordId comes from monotonically_increasing_id() and is unique per row, so a
# distinct() over a projection that already contains it can never drop a row.
# As a consequence, RecordId here is generated for this extract and is not the
# same value as joined_pat_pad_rxf_rxb's RecordId column.
rx_lightning = (
   joined_pat_pad_rxf_rxb
   .select(
      F.col("EnrollmentId"),
      F.col("pat.pat_id").alias("PatientId"),
      F.col("pat.first_name").alias("FirstName"),
      F.col("pat.last_name").alias("LastName"),
      F.col("DOB"),
      F.col("pat.gndr_cd").alias("Gender"),
      F.col("pad.zip_cd_5").alias("PostalCode"),
      F.col("Status"),
      F.col("SubStatus"),
      F.col("rxf.filling_dt").alias("StatusDateTime"),
      F.col("rxf.fill_sold_dt").alias("DateShipped"),
      F.col("rxf.drug_name").alias("DrugName"),
      F.col("rxb.pbr_npi").alias("Prescriber NPI"),
   )
   .distinct()
   # Prepend the id so the column order matches the documented layout.
   .select(F.monotonically_increasing_id().alias("RecordId"), "*")
)
# (End of gist snippet; GitHub page footer text removed.)