Skip to content

Instantly share code, notes, and snippets.

@shreyasms17
Last active October 11, 2022 06:54
Show Gist options
  • Save shreyasms17/47a4456abd765181a16066780f6147a3 to your computer and use it in GitHub Desktop.
Save shreyasms17/47a4456abd765181a16066780f6147a3 to your computer and use it in GitHub Desktop.
Execution of AutoFlatten
from pyspark.sql.functions import col, explode_outer
from copy import deepcopy
from autoflatten import AutoFlatten
# Driver script: flatten a nested JSON column (read from ORC on S3) into a
# fully flat DataFrame using the AutoFlatten helper.
#
# NOTE(review): this block's indentation was destroyed in extraction; the
# structure below is reconstructed from the control-flow keywords. The logic
# tokens are unchanged. Requires an active `spark` session and the
# project-local `autoflatten` module.

s3_path = 's3://mybucket/orders/'
df = spark.read.orc(s3_path)

# Each ORC row carries a raw JSON string in its `json` field; re-parse those
# strings so Spark infers a proper nested schema.
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
json_schema = json_df.schema

# AutoFlatten inspects the schema and computes:
#   af.rest          - non-nested paths reachable without exploding
#   af.all_fields    - map of full path -> leaf column name
#   af.order         - processing order for nested (array/struct) paths
#   af.bottom_to_top - truthy for a path iff it is an array type
af = AutoFlatten(json_schema)
af.compute()

df1 = json_df

# Paths already present as top-level columns, keyed by their dotted path
# (a leading '.' marks the root).
visited = set([f'.{column}' for column in df1.columns])

# Pull in every simple (non-exploded) path. If the leaf name would collide
# with an existing column, alias it as "<parent>-<leaf>" instead.
cols_to_select = df1.columns
for rest_col in af.rest:
    if rest_col not in visited:
        cols_to_select += [rest_col[1:]] if af.all_fields[rest_col] not in df1.columns else [col(rest_col[1:]).alias('-'.join(rest_col[1:].split('.')[-2:]))]
        visited.add(rest_col)
df1 = df1.select(cols_to_select)

if af.order:
    for key in af.order:
        column = key.split('.')[-1]
        if af.bottom_to_top[key]:
            '''
            values for the column in bottom_to_top dict exists if it is an array type
            '''
            # Explode the array, then lift each inner field to the top level.
            df1 = df1.select('*', explode_outer(col(column)).alias(f"{column}_exploded")).drop(column)
            # grabbing all paths to columns after explode
            cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.select(f'{column}_exploded.*').columns))
            # retrieving unvisited columns
            cols_to_select_set = cols_in_array_col.difference(visited)
            # check done for duplicate column name & path: alias collisions as "<parent>-<leaf>"
            cols_to_select_list = list(map(lambda x: f"{column}_exploded.{x.split('.')[-1]}" if (x.split('.')[-1] not in df1.columns and f"{column}-{x.split('.')[-1]}" not in df1.columns) else col(f"{column}_exploded.{x.split('.')[-1]}").alias(f"{column}-{x.split('.')[-1]}"), list(cols_to_select_set)))
            # updating visited set
            visited.update(cols_to_select_set)
            df1 = df1.select(df1.columns + cols_to_select_list).drop(f"{column}_exploded")
        else:
            '''
            values for the column in bottom_to_top dict do not exist if it is a struct type
            '''
            # Structs need no explode; just open the struct in place.
            # grabbing all paths to columns after opening
            cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.selectExpr(f'{column}.*').columns))
            # retrieving unvisited columns
            cols_to_select_set = cols_in_array_col.difference(visited)
            # check done for duplicate column name & path: alias collisions as "<parent>-<leaf>"
            cols_to_select_list = list(map(lambda x: f"{column}.{x.split('.')[-1]}" if (x.split('.')[-1] not in df1.columns and f"{column}-{x.split('.')[-1]}" not in df1.columns) else col(f"{column}.{x.split('.')[-1]}").alias(f"{column}-{x.split('.')[-1]}"), list(cols_to_select_set)))
            # updating visited set
            visited.update(cols_to_select_set)
            df1 = df1.select(df1.columns + cols_to_select_list).drop(f"{column}")

# Final projection: every discovered leaf field plus any collision-aliased
# ("parent-leaf") columns produced above.
final_df = df1.select(list(set(af.all_fields.values())) + [x for x in df1.columns if '-' in x])
@tooptoop4
Copy link

@shreyasms17 how do you handle a single JSON file with 2 lines having differing schemas?

{"glossary":{"title":"example glossary","GlossDiv":{"title":"S","GlossList":{"GlossEntry":{"ID":"SGML","SoortAs":"SGML","GlossTerm":"Standard Generalized Markup Language","Acronym":"SGML","Abbrev":"ISO 8879:1986","GlossDef":{"para":"A meta-markup language, used to create markup languages such as DocBook.","GlossSeeAlso":["GML","XML"]},"GlossSee":"markup"}}}}}

{"glossary":{"title":"exaddmple glossary","GlossCDiv":{"title":"S","GlossList":{"GlossEntry":{"ID":"SGML","SortAs":"SGML","GlossTerm":"Standard Generalized Markup Language","Acronym":"SGML","Abbrev":"ISO 8879:1986","GlossDef":{"para":"A meta-markup language, used to create markup languages such as DocBook.","GlossSeeAlso":["GML","XML"]},"GlossSee":"markup"}}}}}

@shreyasms17
Copy link
Author

shreyasms17 commented Oct 11, 2022 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment