Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save sdabbour-stratio/a7acaa041725d84c1f967ced994f95b1 to your computer and use it in GitHub Desktop.
Save sdabbour-stratio/a7acaa041725d84c1f967ced994f95b1 to your computer and use it in GitHub Desktop.
from pyspark.sql import functions as F, types as T, DataFrame
def general_converter(
    spark,
    df_table_to_convert: DataFrame,
    *,
    gc_path: str = '/tmp/gms/GC_general_origen.csv',
    join_type: str = 'left',
) -> DataFrame:
    """
    Apply the General Converter (GC) transcodifications to a DataFrame.

    Reads the GC parametric CSV and keeps only the rows whose TABLA_ORIGEN equals the
    table name found in ``df_table_to_convert``. For every distinct CAMPO_ORIGEN listed
    there, the values of that column are replaced by the VALOR_DESTINO whose
    VALOR_ORIGEN matches the current value.

    :param spark: SparkSession used to read the GC CSV.
    :param df_table_to_convert: Spark DataFrame where a column named 'TABLA_ORIGEN' has
        been added. It contains the name of the table and is used both to select the
        applicable GC rows and in the join condition. Only the first distinct value of
        that column is used to filter the GC.
    :param gc_path: path of the GC parametric CSV (header row, comma-delimited).
        Defaults to the previously hard-coded location.
    :param join_type: join type used against the GC entries. With 'left' (the default),
        values of a converted column that have no GC entry are set to NULL.
    :return: Spark DataFrame with the same columns (in the same order) as the input, but
        with the transcodification already applied where needed. An input DataFrame
        without rows is returned unchanged. If the GC holds several entries for the same
        source value, the matching input row is duplicated once per entry (join semantics).
    """
    # Column names of the GC parametric table.
    tag_TABLA_ORIGEN = 'TABLA_ORIGEN'
    tag_CAMPO_ORIGEN = 'CAMPO_ORIGEN'
    tag_VALOR_ORIGEN = 'VALOR_ORIGEN'
    tag_VALOR_DESTINO = 'VALOR_DESTINO'

    # Private names for the GC columns while they are joined in. They cannot collide
    # with the input table's own columns, so the drop/rename after each join only ever
    # touches GC columns. Each join condition also references a freshly built DataFrame,
    # so repeated joins against the same GC lineage are never ambiguous.
    gc_tabla_tmp = '__gc_tabla_origen'
    gc_valor_origen_tmp = '__gc_valor_origen'
    gc_valor_destino_tmp = '__gc_valor_destino'

    def collect_distinct_elems_to_list(df: DataFrame, col: str) -> list:
        """Return the distinct values of column ``col`` of ``df`` as a Python list."""
        return [row[0] for row in df.select(col).distinct().collect()]

    original_columns = df_table_to_convert.columns

    table_names = collect_distinct_elems_to_list(df_table_to_convert, col=tag_TABLA_ORIGEN)
    if not table_names:
        # No rows means no table name to look up, so there is nothing to convert.
        return df_table_to_convert.select(original_columns)
    # NOTE(review): only one table name is used (the order of distinct() is not
    # guaranteed). Rows carrying any other TABLA_ORIGEN will not match a GC entry.
    table_name = table_names[0]

    # Read the GC parametric table, restricted to the current table.
    df_GC_filtered = (
        spark.read.format('csv')
        .options(header='true', inferSchema='true', delimiter=',')
        .load(gc_path)
        .where(F.col(tag_TABLA_ORIGEN) == table_name)
    )
    # NOTE(review): the GC also appears to carry an IS_ACTIVE column that is not used as a
    # filter here. Confirm whether inactive mappings should be excluded.
    print(f"> Applying the general converter to table_name: {table_name} ...")
    fields_to_convert = collect_distinct_elems_to_list(df_GC_filtered, col=tag_CAMPO_ORIGEN)
    print(f">> Fields to convert: {fields_to_convert}\n")

    df_table = df_table_to_convert
    for field in fields_to_convert:
        # GC entries for this field only, with the needed columns under private names.
        df_gc_field = (
            df_GC_filtered
            .where(F.col(tag_CAMPO_ORIGEN) == field)
            .select(
                F.col(tag_TABLA_ORIGEN).alias(gc_tabla_tmp),
                F.col(tag_VALOR_ORIGEN).alias(gc_valor_origen_tmp),
                F.col(tag_VALOR_DESTINO).alias(gc_valor_destino_tmp),
            )
        )
        join_condition = (
            (df_table[tag_TABLA_ORIGEN] == df_gc_field[gc_tabla_tmp])
            & (df_table[field] == df_gc_field[gc_valor_origen_tmp])
        )
        df_table = (
            df_table.join(df_gc_field, join_condition, how=join_type)
            # Replace the original column with the translated value ...
            .drop(field)
            .withColumnRenamed(gc_valor_destino_tmp, field)
            # ... and discard the remaining helper GC columns.
            .drop(gc_tabla_tmp, gc_valor_origen_tmp)
        )

    # Restore the original column set and order.
    return df_table.select(original_columns)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment