|
import pyspark.sql.functions as F |
|
from pyspark.sql.types import * |
|
|
|
import json |
|
from typing import Optional, List, Dict |
|
|
|
# Maps Python scalar types to their corresponding Spark SQL data types.
# Used when inferring the schema of leaf JSON values. Any scalar type not
# listed here (e.g. bytes, decimal.Decimal) will raise KeyError at the
# call sites that index this mapping.
# NOTE: bool must stay before int conceptually — isinstance checks are not
# used here; dict lookup on type() keeps bool distinct from int.
TYPE_MAPPER = {
    bool: BooleanType(),
    str: StringType(),
    int: LongType(),
    float: DoubleType()
}
|
|
|
def generate_schema(input_json: Dict,
                    max_level: Optional[int] = None,
                    stringify_fields: Optional[List[str]] = None,
                    skip_fields: Optional[List[str]] = None
                    ) -> StructType:
    """
    Public convenience wrapper around _populate_struct.

    Infers a PySpark schema from a single example JSON document, starting
    the recursion at the top level (level = 1).

    :param input_json: example of the input JSON data, as a Python dict
    :param max_level: depth beyond which nested values are typed as strings
    :param stringify_fields: field names to be typed directly as strings
    :param skip_fields: field names to omit entirely from the schema

    :return: pyspark.sql.types.StructType describing input_json
    """
    # Delegate to the recursive worker, anchored at the top level.
    return _populate_struct(input_json, 1, max_level, stringify_fields, skip_fields)
|
|
|
|
|
def _populate_struct(input_json: Dict,
                     level: int = 1,
                     max_level: Optional[int] = None,
                     stringify_fields: Optional[List[str]] = None,
                     skip_fields: Optional[List[str]] = None
                     ) -> StructType:
    """
    Given an input JSON (as a Python dictionary), returns the corresponding
    PySpark StructType.

    :param input_json: example of the input JSON data (represented as a Python dictionary)
    :param level: current level within the (nested) JSON. level=1 corresponds to the top level
    :param max_level: maximum levels of nested JSON to parse, beyond which values will be cast as strings
    :param stringify_fields: list of field names to be directly cast as strings
    :param skip_fields: list of field names to completely ignore parsing and omit from the schema

    :return: pyspark.sql.types.StructType
    :raises ValueError: if input_json is not a dict, or level / max_level are invalid
    :raises KeyError: if a scalar value's type is not present in TYPE_MAPPER
    """
    if not isinstance(input_json, dict):
        raise ValueError("invalid input JSON")
    if not (isinstance(level, int) and (level > 0)):
        raise ValueError("level must be greater than zero")
    if max_level and not (isinstance(max_level, int) and (max_level >= level)):
        raise ValueError("max_level must be greater than or equal to level (by default, level = 1)")

    filled_struct = StructType()
    nullable = True

    for key, value in input_json.items():
        if skip_fields and (key in skip_fields):
            # Field explicitly excluded: omit it from the schema entirely.
            continue
        elif (stringify_fields and (key in stringify_fields)) or (max_level and (level >= max_level)):
            # Caller requested this field as a raw string, or the maximum
            # parsing depth has been reached: stop descending here.
            filled_struct.add(StructField(key, StringType(), nullable))
        elif isinstance(value, dict):
            # BUG FIX: skip_fields was previously not forwarded in this
            # recursive call, so exclusions were ignored inside nested structs.
            inner_struct = _populate_struct(value, level + 1, max_level,
                                            stringify_fields, skip_fields)
            filled_struct.add(StructField(key, inner_struct, nullable))
        elif isinstance(value, list):
            inner_array = _populate_array(value, level + 1)
            filled_struct.add(StructField(key, inner_array, nullable))
        elif value is not None:
            # Scalar leaf: map its Python type to the Spark SQL type.
            filled_struct.add(StructField(key, TYPE_MAPPER[type(value)], nullable))
        # A None value carries no type information and is omitted from the
        # schema (pre-existing behavior, preserved deliberately).

    return filled_struct
|
|
|
def _populate_array(input_array: List,
                    level: int = 1,
                    max_level: Optional[int] = None,
                    stringify_fields: Optional[List[str]] = None,
                    skip_fields: Optional[List[str]] = None
                    ) -> ArrayType:
    """
    Given an input Python list, returns the corresponding PySpark ArrayType.

    The element type is inferred from the FIRST item only; heterogeneous
    arrays are not detected. An empty array defaults to ArrayType(StringType())
    because there is no sample element to inspect.

    Generalized (backward-compatibly) to accept and forward the same options
    as _populate_struct, so schemas nested inside arrays can honor
    max_level / stringify_fields / skip_fields when callers supply them.

    :param input_array: input array data (represented as a Python list)
    :param level: current level within the (nested) JSON
    :param max_level: maximum nesting depth, forwarded to nested structs/arrays
    :param stringify_fields: field names cast directly as strings (forwarded)
    :param skip_fields: field names omitted from nested structs (forwarded)

    :return: pyspark.sql.types.ArrayType
    :raises ValueError: if input_array is not a list or level is invalid
    :raises KeyError: if the first element's scalar type is not in TYPE_MAPPER
    """
    if not isinstance(input_array, list):
        raise ValueError("Invalid input array")
    if not (isinstance(level, int) and (level > 0)):
        raise ValueError("level must be greater than zero")

    if not input_array:
        # No sample element available: fall back to an array of strings.
        return ArrayType(StringType())

    head = input_array[0]
    inner_level = level + 1
    if isinstance(head, list):
        return ArrayType(_populate_array(head, inner_level, max_level,
                                         stringify_fields, skip_fields))
    if isinstance(head, dict):
        return ArrayType(_populate_struct(head, inner_level, max_level,
                                          stringify_fields, skip_fields))
    # Scalar element: map its Python type to the Spark SQL type.
    return ArrayType(TYPE_MAPPER[type(head)])