Skip to content

Instantly share code, notes, and snippets.

@shreyasms17
Last active August 1, 2021 08:00
Show Gist options
  • Save shreyasms17/8b8a064eb08f72e28e3565a0aadd7fc5 to your computer and use it in GitHub Desktop.
Save shreyasms17/8b8a064eb08f72e28e3565a0aadd7fc5 to your computer and use it in GitHub Desktop.
Enforcing string type in JSON records
import json

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
def is_leaf(data):
    '''
    Description:
    This function checks if the particular field in the schema is a leaf or not.
    A field is a leaf when its 'type' is a plain string other than 'struct',
    or a nested type dictionary whose own 'type' is 'map'.
    Types not considered as a leaf : struct, array
    Anything that is not shaped like a field (a list, a string, None, a
    dictionary without a 'type' key) is reported as not a leaf.
    :param data: [type: dict] a dictionary containing metadata about a field
    :return leaf: [type: bool] indicates whether a given field is a leaf or not
    '''
    try:
        field_type = data['type']
        if isinstance(field_type, str):
            return field_type != 'struct'
        return field_type['type'] == 'map'
    except (KeyError, TypeError):
        # data (or its 'type') is not subscriptable by key, or the key is
        # missing: this is not a field description, hence not a leaf.
        # Only lookup failures are handled here; any other exception
        # propagates instead of being silently discarded.
        return False
def enforce_stringtype(json_dict):
    '''
    Description:
    This function recursively traverses the schema metadata
    and changes the type of every leaf field to string. The metadata is
    modified in place and nothing is returned.
    Rules applied to each field's 'type':
    - a plain type name (e.g. 'long')        -> 'string'
    - a map                                  -> 'string'
    - an array of plain types                -> 'string'
    - an array of structs / arrays / maps    -> array kept, element type converted
    - a struct                               -> struct kept, its fields converted
    - any other nested type dictionary       -> left untouched
    A dictionary whose 'type' is the plain string 'struct' (such as the root
    of a schema) is handled by converting its 'fields'. Values that are not
    field descriptions (strings, None, dictionaries without 'type') are ignored.
    :param json_dict: [type: dict or list] contains metadata about the field,
                      or a list of such field dictionaries
    '''
    if isinstance(json_dict, list):
        for item in json_dict:
            enforce_stringtype(item)
        return
    if not isinstance(json_dict, dict) or 'type' not in json_dict:
        # not a field description: nothing to convert
        return
    if json_dict['type'] == 'struct':
        # struct described directly by this dictionary (e.g. the schema root):
        # keep the struct and descend into its fields
        enforce_stringtype(json_dict.get('fields', []))
        return
    json_dict['type'] = _string_typed_spec(json_dict['type'])


def _string_typed_spec(type_spec):
    '''
    Description:
    Helper returning the string-enforced replacement for the value stored
    under a field's 'type' key (or under an array's 'elementType' key).
    Nested dictionaries are modified in place and returned when kept.
    :param type_spec: [type: str or dict] a type name or a nested type dictionary
    :return: [type: str or dict] 'string', or the updated type dictionary
    '''
    if isinstance(type_spec, str):
        return 'string'
    if not isinstance(type_spec, dict):
        # unrecognised shape: leave it exactly as it was
        return type_spec
    kind = type_spec.get('type')
    if kind == 'map':
        return 'string'
    if kind == 'struct':
        enforce_stringtype(type_spec.get('fields', []))
        return type_spec
    if kind == 'array':
        element_type = type_spec.get('elementType')
        if not isinstance(element_type, dict):
            # array of plain values: the whole field becomes a string
            return 'string'
        # the element type may itself be a struct, an array or a map, so
        # convert it recursively instead of assuming it always has 'fields'
        type_spec['elementType'] = _string_typed_spec(element_type)
        return type_spec
    return type_spec
# df : read the input data into this variable before running this section
# (assumes an active SparkSession is available as `spark` - TODO confirm)
# say the json column name is "json"
# infer a schema by parsing the raw JSON strings held in that column
json_df = spark.read.json(df.rdd.map(lambda row: row.json))
# get schema
json_schema = json_df.schema
# get schema metadata as a dictionary
json_dict = json.loads(json_schema.json())
# enforce the string type on the schema's fields (json_dict is modified in place)
enforce_stringtype(json_dict['fields'])
# convert schema back to StructType
enforced_schema = StructType.fromJson(json_dict)
# enforce the changed schema on the json
final_df = df.withColumn('type_safe_json', from_json(col('json'), enforced_schema)).select('type_safe_json')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment