from pyspark.dbutils import DBUtils
import pyspark.sql.functions as F
import pyspark
import pandas as pd
from itertools import chain
from functools import reduce
def create_text_file(list_df, storage_account, container, output_path, file_name):
    """Write several DataFrames with different schemas into one pipe-delimited text output.

    Step 1: collapse each DataFrame to a single string column ``data`` whose value is
    the row's columns joined with ``|``.
    Step 2: union the single-column DataFrames in list order.
    Step 3: write the result as gzip-compressed text into ``output_path`` as a single
    part file (``coalesce(1)``), overwriting whatever is already there.

    Args:
        list_df: Non-empty sequence of Spark DataFrames, written in the given order.
        storage_account: Not used by this function; kept for call-site compatibility.
        container: Not used by this function; kept for call-site compatibility.
        output_path: Directory that Spark writes the part file into.
        file_name: Not used by this function; Spark chooses the part-file name, so
            callers are expected to rename the part file afterwards.

    Raises:
        ValueError: If ``list_df`` is empty.
    """
    if not list_df:
        raise ValueError("list_df must contain at least one DataFrame")

    def _to_pipe_delimited(df):
        # Collapse all columns of ``df`` into one "|"-joined string column named "data".
        # concat_ws silently skips NULL inputs, which would shift every following
        # field one position to the left; substituting "" keeps each column in its slot.
        fields = [F.coalesce(F.col(c).cast("string"), F.lit("")) for c in df.columns]
        return df.select(F.concat_ws("|", *fields).alias("data"))

    single_column_dfs = [_to_pipe_delimited(df) for df in list_df]

    # Every frame now has the single column "data", so unionByName lines them up.
    final_df = reduce(pyspark.sql.dataframe.DataFrame.unionByName, single_column_dfs)

    # The text writer ignores a "header" option, so none is set here.
    (
        final_df.coalesce(1)
        .write.format("text")
        .option("compression", "gzip")
        .mode("overwrite")
        .save(output_path)
    )
Next, create the header_record DataFrame:
# Header record: a single row identifying the sender, file date/time, file type,
# target environment and file action.
# NOTE(review): `env` must already be defined when this runs; in the visible source
# it is read from the "env" widget further below — confirm the notebook cell order.
env_mapping = {"prodfix": "test", "qa": "test", "prod": "production"}

# Fail fast with a readable message instead of a bare KeyError after Spark work.
if env not in env_mapping:
    raise ValueError(f"Unknown env {env!r}; expected one of {sorted(env_mapping)}")

# Compute the date and the time in ONE query: Spark evaluates current_date() and
# current_timestamp() once per query, so both fields describe the same instant.
# Two separate queries could straddle midnight and pair one day's date with the
# next day's time.
now_row = spark.range(1).select(
    F.date_format(F.current_date(), "yyyyMMdd").alias("current_date"),
    F.date_format(F.current_timestamp(), "HHmmss").alias("current_time"),
).first()
current_date = now_row["current_date"]
current_time = now_row["current_time"]

hc_df = pd.DataFrame({
    "Sender ID": ["RXLIGHTNING"],
    "File Date": [current_date],
    "File Time": [current_time],
    "File Type": ["ENS"],
    "Environment": [env_mapping[env]],
    "File Action": ["Update"],
})
header_record = spark.createDataFrame(hc_df)
Next, create the trailer_record DataFrame:
# Trailer record: one "TRL" row carrying the row count of rx_lightning.
# NOTE(review): rx_lightning is defined outside this snippet; its count is reported
# as the "Total Number of Detail Records".
total_record = rx_lightning.count()

trailer_columns = ["Record Type", "Total Number of Detail Records"]
tr_df = pd.DataFrame.from_records([("TRL", total_record)], columns=trailer_columns)

trailer_record = spark.createDataFrame(tr_df)
Then call the function:
# Driver: resolve environment-specific paths, write the header/trailer file,
# archive any previous file found in sag/, and publish the new file under its dated name.
dbutils.widgets.removeAll()
# Environment name (e.g. "prodfix"), changed per environment via the widget.
dbutils.widgets.text("env", "prodfix")
env = dbutils.widgets.get("env")

# 8. Output container
storage_account = f"{env}dseus2rxout01"
container = "rx-lightning"
current_date = spark.range(1).select(F.date_format(F.current_date(), "yyyyMMdd").alias("current_date")).first()["current_date"]
file_name = f"{current_date}_ENROLLMENTSTATUS_WAG_TO_RXLIGHTNING.txt.gz"
base_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
archives_path = f"{base_path}/archives"
sag_path = f"{base_path}/sag"
output_path = f"{base_path}/output"

# 9. Write multiple dataframes into one pipe-delimited, gzip-compressed text file
# NOTE(review): only the header and trailer are written here, although the trailer
# counts rx_lightning rows — confirm whether detail records belong in list_df.
list_df = [header_record, trailer_record]
create_text_file(list_df, storage_account, container, output_path, file_name)

# 10. Archive every previous txt.gz file in sag/ (not just the first one).
# An explicit destination file name keeps the result independent of whether
# archives/ already exists as a directory.
for entry in dbutils.fs.ls(sag_path):
    if entry.name.endswith("txt.gz"):
        dbutils.fs.mv(entry.path, f"{archives_path}/{entry.name}")

# 11. Locate the gzip text part file Spark just wrote (its name ends with "txt.gz")
# and move it into sag/ under the dated file name.
txt_output_paths = [entry.path for entry in dbutils.fs.ls(output_path) if entry.name.endswith("txt.gz")]
if len(txt_output_paths) != 1:
    # coalesce(1) should produce exactly one part file; anything else means the
    # write did not go as expected, and silently picking one would lose data.
    raise RuntimeError(
        f"Expected exactly one txt.gz part file in {output_path}, found {len(txt_output_paths)}"
    )
dbutils.fs.mv(txt_output_paths[0], f"{sag_path}/{file_name}")

# 12. Remove the output folder (recursively)
dbutils.fs.rm(output_path, True)
The final DataFrame (contents of the output file):
RXLIGHTNING|20230714|201210|ENS|test|Update
RXLIGHTNING|20230714|201210|test|Update