Skip to content

Instantly share code, notes, and snippets.

from pyspark.sql.functions import col, explode_outer, from_json, lit, concat
from pyspark.sql.types import StructType, ArrayType
def get_json_df(input_df, primary_partition_column, json_column_name, spark_session):
'''
Description:
This function provides the schema of json records and the dataframe to be used for flattening
:param input_df: [type: pyspark.sql.dataframe.DataFrame] input dataframe
@shreyasms17
shreyasms17 / enforce_string.py
Last active August 1, 2021 08:00
Enforcing string type in json records
from pyspark.sql.types import *
from pyspark.sql.functions import col, from_json
def is_leaf(data):
'''
Description:
This function checks if the particular field in the schema is a leaf or not.
Types not considered as a leaf : struct, array
:param data: [type: dict] a dictionary containing metadata about a field
@shreyasms17
shreyasms17 / execute_autoflatten_complex.py
Last active September 2, 2022 10:22
AutoFlatten Complex JSON
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import *
from copy import deepcopy
from autoflatten import AutoFlatten
from collections import Counter
# Input location of the ORC data. 'mybucket' looks like a placeholder —
# replace with the real bucket/prefix before running.
s3_path = 's3://mybucket/orders/'
# Read the ORC files into a DataFrame.
# NOTE(review): `spark` is not defined in this snippet; presumably it is the
# SparkSession supplied by the notebook/shell environment — confirm.
df = spark.read.orc(s3_path)
# Take the `json` field of every row and let Spark parse that RDD as JSON.
# No schema is passed, so Spark infers one from the data.
# Assumes the `json` column holds one JSON document per row as a string — verify
# against the ORC source schema.
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
# Inferred schema of the parsed records; presumably passed to AutoFlatten later
# in the script (that part is not visible here).
json_schema = json_df.schema
@shreyasms17
shreyasms17 / execute_autoflatten.py
Last active October 11, 2022 06:54
Execution of AutoFlatten
from pyspark.sql.functions import col, explode_outer
from copy import deepcopy
from autoflatten import AutoFlatten
# Input location of the ORC data. 'mybucket' looks like a placeholder —
# replace with the real bucket/prefix before running.
s3_path = 's3://mybucket/orders/'
# Read the ORC files into a DataFrame.
# NOTE(review): `spark` is not defined in this snippet; presumably it is the
# SparkSession supplied by the notebook/shell environment — confirm.
df = spark.read.orc(s3_path)
# Take the `json` field of every row and let Spark parse that RDD as JSON.
# No schema is passed, so Spark infers one from the data.
# Assumes the `json` column holds one JSON document per row as a string — verify
# against the ORC source schema.
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
# Inferred schema of the parsed records.
json_schema = json_df.schema
# Build the flattener from the inferred schema. AutoFlatten comes from the
# project-local `autoflatten` module, so its behaviour is not shown here.
af = AutoFlatten(json_schema)
@shreyasms17
shreyasms17 / compute.py
Created May 1, 2021 07:55
AutoFlatten compute
def compute(self):
'''
Description:
This function performs the required computation and gets all the resources
needed for further process of selecting and exploding fields
'''
self.unnest_dict(self.fields_in_json, '')
all_cols_in_explode_cols = set(filter(lambda x: x.startswith(tuple(self.cols_to_explode)), self.all_fields.keys()))
self.rest = set(self.all_fields.keys()).difference(all_cols_in_explode_cols)
self.structure = self.get_structure([f"json{x}" for x in list(self.cols_to_explode)])
@shreyasms17
shreyasms17 / get_bottom_to_top.py
Created May 1, 2021 07:51
AutoFlatten get_bottom_to_top
def get_bottom_to_top(self, order, all_cols_in_explode_cols):
'''
Description:
This function gets the mutually exclusive leaf fields in every array type column
:param order: [type: list] contains the fields in order in which array explode has to take place
:param all_cols_in_explode_cols: [type: set] contains all fields in array type fields
:return bottom_to_top: [type: dict] contains list of mutually exclusive leaf fields for every
array type / struct type (parent to array type) field
@shreyasms17
shreyasms17 / extract_order.py
Last active May 1, 2021 07:52
AutoFlatten extract_order
def extract_order(self, structure):
'''
Description:
This function does a BFS traversal to obtain the order in which
the array type fields are to be exploded
:param structure: [type: dict] contains the hierarchical mapping for array fields
:return order: [type: list] contains the fields in order in which array explode has to take place
'''
@shreyasms17
shreyasms17 / get_structure.py
Last active May 1, 2021 07:52
AutoFlatten get_structure
def get_structure(self, col_list):
'''
Description:
This function gets the structure to the traversal to array field in the schema
:param col_list: [type: list] contains list of fields that are to be exploded
:return structure: [type: dict] contains the hierarchical mapping for array fields
'''
structure = {'json' : {}}
@shreyasms17
shreyasms17 / unnest_dict.py
Last active May 3, 2021 05:27
AutoFlatten unnest_dict
def unnest_dict(self, json, cur_path):
'''
Description:
This function unnests the dictionaries in the json schema recursively
and maps the hierarchical path to the field to the column name when it encounters a leaf node
:param json: [type: dict] contains metadata about the field
:param cur_path: [type: str] contains hierarchical path to that field, each parent separated by a '.'
'''
if self.is_leaf(json):
@shreyasms17
shreyasms17 / is_leaf.py
Last active May 1, 2021 07:54
AutoFlatten is_leaf
def is_leaf(self, data):
'''
Description:
This function checks if the particular field in the schema is a leaf or not.
Types not considered as a leaf : struct, array
:param data: [type: dict] a dictionary containing metadata about a field
:return leaf: [type: bool] indicates whether a given field is a leaf or not
'''