Skip to content

Instantly share code, notes, and snippets.

@vijayanandrp
Created June 23, 2021 05:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vijayanandrp/79d0f05cdb4fdff3768336c11515c7cf to your computer and use it in GitHub Desktop.
Save vijayanandrp/79d0f05cdb4fdff3768336c11515c7cf to your computer and use it in GitHub Desktop.
PySpark - explode without losing null values
from pyspark.sql.functions import *
def flatten_df(nested_df):
    """Promote the fields of every top-level struct column to plain columns.

    Columns whose dtype string does not start with ``struct`` are kept as they
    are and come first in the result. Each direct field ``f`` of a struct
    column ``s`` becomes a column named ``s_f``. Only one level of nesting is
    expanded per call.

    Prints the row count of the result (obtained with ``count()``) before
    returning it.

    :param nested_df: DataFrame that may contain struct columns.
    :return: DataFrame with the struct columns replaced by their fields.
    """
    plain_names = []
    struct_names = []
    for name, dtype in nested_df.dtypes:
        if dtype.startswith('struct'):
            struct_names.append(name)
        else:
            plain_names.append(name)

    # One aliased column per direct field of each struct column.
    promoted = []
    for struct_name in struct_names:
        for field in nested_df.select(struct_name + '.*').columns:
            promoted.append(
                col(struct_name + '.' + field).alias(struct_name + '_' + field)
            )

    flat_df = nested_df.select(plain_names + promoted)
    print("flatten_df_count :", flat_df.count())
    return flat_df
def explode_df(nested_df):
    """Explode all array columns of ``nested_df`` together, keeping null arrays.

    Every null array is first replaced by a one-element array holding a single
    null (cast to the column's own array type) so that ``explode`` still emits
    a row for it. The array columns are then combined with ``arrays_zip`` and
    exploded in one step.

    The result contains the exploded array columns first, followed by the
    columns that are neither struct nor array. Struct columns are NOT carried
    over, so flatten them before calling this function if they must survive.

    If ``nested_df`` has no array columns it is returned unchanged and nothing
    is printed. Otherwise the row count of the result (obtained with
    ``count()``) is printed before returning.

    :param nested_df: DataFrame that may contain array columns.
    :return: DataFrame with one row per zipped array element.
    """
    array_fields = [(name, dtype) for name, dtype in nested_df.dtypes
                    if dtype.startswith('array')]
    if not array_fields:
        # There is nothing to zip or explode, so hand the input back untouched.
        return nested_df

    array_cols = [name for name, _ in array_fields]
    flat_cols = [name for name, dtype in nested_df.dtypes
                 if not dtype.startswith(('struct', 'array'))]

    for array_col, schema in array_fields:
        # The array type comes from the DataFrame that was passed in, not from
        # a module-level variable, so the function works for any caller.
        nested_df = nested_df.withColumn(
            array_col,
            when(col(array_col).isNotNull(), col(array_col))
            .otherwise(array(lit(None)).cast(schema)),
        )

    # Pick a scratch column name that cannot overwrite an existing column.
    taken = {name.lower() for name in nested_df.columns}
    tmp = "tmp"
    while tmp in taken:
        tmp = "_" + tmp

    nested_df = (
        nested_df
        .withColumn(tmp, arrays_zip(*array_cols))
        .withColumn(tmp, explode(tmp))
        .select([col(tmp + "." + c).alias(c) for c in array_cols] + flat_cols)
    )
    print("explode_dfs_count :", nested_df.count())
    return nested_df
# Driver: flatten structs once, then alternate explode/flatten until the
# schema has no array columns left, and finally print the resulting schema.
# NOTE(review): myDf is not defined in this snippet; presumably the caller
# creates it before this point.
new_df = flatten_df(myDf)
array_cols = [name for name, dtype in new_df.dtypes if dtype.startswith('array')]
while array_cols:
    new_df = flatten_df(explode_df(new_df))
    array_cols = [name for name, dtype in new_df.dtypes if dtype.startswith('array')]
new_df.printSchema()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment