This function saves multiple DataFrames with different headers into one pipe-delimited file (a .txt file).
import pyspark.sql.functions as F
import pandas as pd
from functools import reduce
from pyspark.sql import DataFrame


def create_text_file(list_df, storage_account, container, output_path, file_name):
    ### This function saves multiple DataFrames with different headers into one pipe-delimited file (.txt file)
    ### Step 1: Concatenate each multi-column DataFrame into a single-column DataFrame, separating the column values with the `|` delimiter
    ### Step 2: Union all the single-column DataFrames into one DataFrame
    ### Step 3: Save the final DataFrame as a text file

    # Concatenate the columns of each DataFrame into one column named `data`, with `|` between the values
    list_single_column_df = [df.select(F.concat_ws("|", *df.columns).alias("data")) for df in list_df]

    # Union the list of single-column DataFrames into one DataFrame
    final_df = reduce(DataFrame.unionByName, list_single_column_df)

    # Write the result as a single gzip-compressed text file under output_path
    # (storage_account, container, and file_name are unused here; the file is renamed later with dbutils.fs.mv)
    final_df.coalesce(1).write.format("text") \
        .option("header", "false") \
        .option("compression", "gzip") \
        .mode("overwrite") \
        .save(output_path)
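
To see the mechanics end to end, here is a minimal, self-contained sketch using two toy DataFrames with different headers. The column names, values, and the /tmp/demo_output path are illustrative only (not from the original notebook), and it assumes a live spark session, as in a Databricks notebook:

# Two toy DataFrames with different headers (hypothetical data)
df_a = spark.createDataFrame([("RXLIGHTNING", "20230714")], ["Sender ID", "File Date"])
df_b = spark.createDataFrame([("TRL", 2)], ["Record Type", "Total"])

# storage_account, container, and file_name are unused inside the function, so dummies are fine here
create_text_file([df_a, df_b], "dummy_account", "dummy_container", "/tmp/demo_output", "demo.txt.gz")

# Each row of the output is one source DataFrame flattened into a pipe-delimited string:
#   RXLIGHTNING|20230714
#   TRL|2
spark.read.text("/tmp/demo_output").show(truncate=False)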

Let's create the DataFrame header_record:

env_mapping = {"prodfix": "test", "qa": "test", "prod": "production"}
current_date = spark.range(1).select(F.date_format(F.current_date(), "yyyyMMdd").alias("current_date")).first()["current_date"]
current_time = spark.range(1).select(F.date_format(F.current_timestamp(), "HHmmss").alias("current_time")).first()["current_time"]
hc_df = pd.DataFrame({
    "Sender ID": ["RXLIGHTNING"],
    "File Date": [current_date],
    "File Time": [current_time],
    "File Type": ["ENS"],
    "Environment": [env_mapping[env]],  # `env` is set from the widget further below
    "File Action": ["Update"]
})

header_record = spark.createDataFrame(hc_df)
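
To preview how the header will look once flattened, the same concat_ws trick from the function can be applied directly (the sample line below assumes a run on 2023-07-14 at 20:12:10 in the prodfix environment):

# Preview the header record as a single pipe-delimited line
header_record.select(F.concat_ws("|", *header_record.columns).alias("data")).show(truncate=False)
# e.g. RXLIGHTNING|20230714|201210|ENS|test|Update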

Here is another DataFrame, trailer_record. Its record count comes from rx_lightning, the detail-record DataFrame defined elsewhere in the notebook:

total_record = rx_lightning.count()
tr_df = pd.DataFrame({
    "Record Type": ["TRL"],
    "Total Number of Detail Records": [total_record]
})

trailer_record = spark.createDataFrame(tr_df)

Now let's set up the environment and call the function:

dbutils.widgets.removeAll()
# Change per environment
dbutils.widgets.text("env", "prodfix")
env = dbutils.widgets.get("env")


# 8. Output container
storage_account = f"{env}dseus2rxout01"
container = "rx-lightning"

current_date = spark.range(1).select(F.date_format(F.current_date(), "yyyyMMdd").alias("current_date")).first()["current_date"]
file_name = f"{current_date}_ENROLLMENTSTATUS_WAG_TO_RXLIGHTNING.txt.gz"

archives_path  = f"abfss://{container}@{storage_account}.dfs.core.windows.net/archives"
sag_path       = f"abfss://{container}@{storage_account}.dfs.core.windows.net/sag"
output_path    = f"abfss://{container}@{storage_account}.dfs.core.windows.net/output"

# 9. Write multiple dataframes into one pipe-delimited file (.txt file)
list_df = [header_record, trailer_record]

create_text_file(list_df, storage_account, container, output_path, file_name)

# 10. Move the old txt file from the sag folder to the archives folder if the file exists
txt_sag_paths = [f.path for f in dbutils.fs.ls(sag_path) if f.name.endswith("txt.gz")]
if txt_sag_paths:
    dbutils.fs.mv(txt_sag_paths[0], archives_path)

# 11. Get the location of the txt file that was just saved to Azure blob storage (it ends with 'txt.gz') and rename it into the sag folder
txt_output_paths = [f.path for f in dbutils.fs.ls(output_path) if f.name.endswith("txt.gz")]
dbutils.fs.mv(txt_output_paths[0], sag_path + "/" + file_name)

# 12. Remove the output folder 
dbutils.fs.rm(output_path, True)
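
As a quick sanity check (a sketch, assuming the moves above succeeded), the finished file can be read back from the sag folder; Spark decompresses .gz text files automatically:

# Verify the contents of the final pipe-delimited file
final_file_path = sag_path + "/" + file_name
spark.read.text(final_file_path).show(truncate=False)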

The final DataFrame, as written to the text file: the header record line, followed by the trailer record line in the form TRL|<total number of detail records> (the count comes from rx_lightning.count()):

RXLIGHTNING|20230714|201210|ENS|test|Update
TRL|<total_record>