Created
December 17, 2020 09:49
-
-
Save sdabbour-stratio/a7acaa041725d84c1f967ced994f95b1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import functions as F, types as T, DataFrame | |
def general_converter(
    spark,
    df_table_to_convert: DataFrame,
    *,
    gc_path: str = '/tmp/gms/GC_general_origen.csv',
    join_type: str = "left",
) -> DataFrame:
    """
    Apply the General Converter (GC) value mappings to a dataframe.

    The GC parametric table is read from a CSV file and filtered to the rows whose
    'TABLA_ORIGEN' equals the table name found in ``df_table_to_convert``. For every
    distinct 'CAMPO_ORIGEN' listed there, the column of that name in the input is
    replaced by the matching 'VALOR_DESTINO' (matched on 'VALOR_ORIGEN').

    :param spark: Spark session used to read the GC parametric table.
    :param df_table_to_convert: Spark DataFrame where a column named 'TABLA_ORIGEN' has
                                been added, which contains the name of the table, needed
                                for the joins with the parametric table of the GC.
                                Only the first distinct 'TABLA_ORIGEN' value is used to
                                filter the GC table.
    :param gc_path: Location of the GC parametric CSV (header row, comma-delimited).
    :param join_type: Join type used against the GC table. With 'left' (the default),
                      values not appearing in the GC table are set to NULL.
    :return: Spark DataFrame with the same columns (and column order) as the original,
             but with the transcodification already applied where needed. If the input
             has no rows it is returned unchanged.
    """
    # Column names of the GC parametric table that this function relies on.
    tag_TABLA_ORIGEN = 'TABLA_ORIGEN'
    tag_CAMPO_ORIGEN = 'CAMPO_ORIGEN'
    tag_VALOR_ORIGEN = 'VALOR_ORIGEN'
    tag_VALOR_DESTINO = 'VALOR_DESTINO'

    # Temporary, collision-resistant names for the GC columns while they are joined.
    # Renaming them means the join condition and the clean-up can address columns by
    # name without ambiguity, and that columns of the input table which happen to
    # share a name with a GC column are left untouched.
    tmp_gc_table = '__gc_tabla_origen__'
    tmp_gc_source_value = '__gc_valor_origen__'
    tmp_gc_target_value = '__gc_valor_destino__'

    def collect_distinct_elems_to_list(df, col: str):
        """Return the distinct values of column ``col`` of ``df`` as a Python list."""
        return [row[0] for row in df.select(col).distinct().collect()]

    table_names = collect_distinct_elems_to_list(df_table_to_convert, col=tag_TABLA_ORIGEN)
    if not table_names:
        # Empty input: there is no table name to look up and no row to convert.
        return df_table_to_convert
    table_name = table_names[0]

    # Read the parametric table of the General Converter, filtered for the current table.
    df_gc_filtered = (
        spark.read.format('csv')
        .options(header='true', inferSchema='true', delimiter=',')
        .load(gc_path)
        .where(F.col(tag_TABLA_ORIGEN) == table_name)
    )

    print(f"> Applying the general converter to table_name: {table_name} ...")
    fields_to_convert = collect_distinct_elems_to_list(df_gc_filtered, col=tag_CAMPO_ORIGEN)
    print(f">> Fields to convert: {fields_to_convert}\n")

    # DataFrame that is progressively rewritten, one converted field per iteration.
    df_table = df_table_to_convert
    for field in fields_to_convert:
        # Mapping rows for this field only, exposed under the temporary names.
        df_mapping = (
            df_gc_filtered
            .where(F.col(tag_CAMPO_ORIGEN) == field)
            .select(
                F.col(tag_TABLA_ORIGEN).alias(tmp_gc_table),
                F.col(tag_VALOR_ORIGEN).alias(tmp_gc_source_value),
                F.col(tag_VALOR_DESTINO).alias(tmp_gc_target_value),
            )
        )
        join_condition = (
            (df_table[tag_TABLA_ORIGEN] == df_mapping[tmp_gc_table])
            & (df_table[field] == df_mapping[tmp_gc_source_value])
        )
        df_table = (
            df_table.join(df_mapping, join_condition, how=join_type)
            # Replace the original field with its converted value.
            .drop(field)
            .withColumnRenamed(tmp_gc_target_value, field)
            .drop(tmp_gc_table, tmp_gc_source_value)
        )

    # Restore the original column set and order (replaced fields end up last otherwise).
    return df_table.select(df_table_to_convert.columns)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment