@shreyasms17
Created March 2, 2022 15:07
from pyspark.sql.functions import col, explode_outer, from_json, lit, concat
from pyspark.sql.types import StructType, ArrayType

def get_json_df(input_df, primary_partition_column, json_column_name, spark_session):
    '''
    Description:
    This function provides the schema of json records and the dataframe to be used for flattening
    :param input_df: [type: pyspark.sql.dataframe.DataFrame] input dataframe
    :param primary_partition_column: [type: string] name of the primary partition column
    :param json_column_name: [type: string] name of the column with the json string
    :param spark_session: SparkSession object
    :return df: dataframe to be used for flattening
    '''
    input_df = input_df if primary_partition_column is None else input_df.drop(primary_partition_column)
    # wrap the raw json string in an outer struct named transformedJSON
    df1 = input_df.withColumn('transformed_json', concat(lit("""{"transformedJSON" :"""), input_df[json_column_name], lit("""}""")))
    # infer the schema of the wrapped json records
    json_df = spark_session.read.json(df1.rdd.map(lambda row: row.transformed_json))
    json_schema = json_df.schema
    # parse the json string with the inferred schema, then promote the outer struct's fields to columns
    df = (df1.drop(json_column_name)
          .withColumn(json_column_name, from_json(col('transformed_json'), json_schema))
          .drop('transformed_json')
          .select(f'{json_column_name}.*', '*')
          .drop(json_column_name))
    return df
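
# Worked example (illustrative, not part of the original gist): if the json
# column holds the string '{"a": 1, "b": {"c": 2}}', the wrapped value becomes
# '{"transformedJSON" : {"a": 1, "b": {"c": 2}}}', so schema inference always
# yields a single root struct named transformedJSON, which execute_autoflatten
# below can peel off uniformly.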

def execute_autoflatten(df, json_column_name):
    '''
    Description:
    This function executes the core autoflattening operation
    :param df: [type: pyspark.sql.dataframe.DataFrame] dataframe to be used for flattening
    :param json_column_name: [type: string] name of the column with the json string
    :return df: DataFrame containing flattened records
    '''
    # collect all fields of StructType or ArrayType into the nested_fields dictionary
    nested_fields = dict([
        (field.name, field.dataType)
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    ])
    # repeat until no fields of StructType or ArrayType remain
    while nested_fields:
        # take one nested column at a time
        column_name = list(nested_fields.keys())[0]
        # if the field is a StructType, every child field inside it is promoted
        # to a column aliased with the complete path to that child field
        if isinstance(nested_fields[column_name], StructType):
            unnested = [col(column_name + '.' + child).alias(column_name + '>' + child) for child in [n.name for n in nested_fields[column_name]]]
            df = df.select("*", *unnested).drop(column_name)
        # else, if the field is an ArrayType, it is exploded with explode_outer
        # so that empty arrays and nulls are preserved as null rows
        elif isinstance(nested_fields[column_name], ArrayType):
            df = df.withColumn(column_name, explode_outer(column_name))
        # now that df is updated, recompute the nested_fields dictionary
        nested_fields = dict([
            (field.name, field.dataType)
            for field in df.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
        ])
    # rename columns so the transformedJSON prefix is replaced with the original
    # json column name, retaining the complete path to each field
    for df_col_name in df.columns:
        df = df.withColumnRenamed(df_col_name, df_col_name.replace("transformedJSON", json_column_name))
    return df
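
# Illustrative trace (not part of the original gist): for a record like
# {"user": {"name": "a", "tags": [1, 2]}}, the struct pass produces columns
# transformedJSON>user>name and transformedJSON>user>tags, the array pass then
# explodes tags into one row per element, and the final rename swaps the
# transformedJSON prefix for the original json column name.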

def execute(input_df, primary_partition_column, json_column_name, spark_session):
    '''
    Description:
    This function executes the flattening of json records in the dataframe
    :param input_df: [type: pyspark.sql.dataframe.DataFrame] input dataframe
    :param primary_partition_column: [type: string] name of the primary partition column
    :param json_column_name: [type: string] name of the column with the json string
    :param spark_session: SparkSession object
    :return unstd_df: flattened dataframe with unstandardized column name format
    '''
    json_df = get_json_df(input_df, primary_partition_column, json_column_name, spark_session)
    unstd_df = execute_autoflatten(json_df, json_column_name)
    return unstd_df
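
# Minimal usage sketch (illustrative, not part of the original gist). It assumes
# a local Spark setup; the column names 'id' and 'payload' are examples.
if __name__ == '__main__':
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('autoflatten-demo').getOrCreate()
    data = [
        (1, '{"user": {"name": "alice", "tags": ["a", "b"]}}'),
        (2, '{"user": {"name": "bob", "tags": []}}'),
    ]
    input_df = spark.createDataFrame(data, ['id', 'payload'])
    flat_df = execute(input_df, None, 'payload', spark)
    # expected columns: id, payload>user>name, payload>user>tags, with one row
    # per exploded tag (and a null tag row for the empty array)
    flat_df.show(truncate=False)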
@WiseAlex007

Hi Shreyas, do you have any issues (licensing, IP, copyright, etc.) with me using the execute_autoflatten function code somewhere else? Cheers

@malathimlt

Hi @shreyasms17, if I am loading the final structure into an RDBMS, what would be the primary key? How would I handle updates/deletes?

@burhanhusainiqvia

What is json_column_name? Can someone please provide some additional information?
