@dineshdharme
Created April 8, 2024 13:43
Dynamic JSON formatting in PySpark using the `schema_of_json` function.
https://stackoverflow.com/questions/78290764/flatten-dynamic-json-payload-string-using-pyspark/
PySpark has a nifty function, `schema_of_json`, which infers a schema from a single JSON string. Passing that schema to `from_json` applies it to the whole column.
The approach for handling dynamic JSON payloads is as follows:
- First, take the `json_payload` of the first row of the dataframe.
- Infer a schema from that payload using `schema_of_json`.
- Parse every row with that schema. If a row is parsed correctly, its parsed struct contains no `null` value.
If there is a `null` value, the row's payload has a different shape and has not been parsed correctly.
To detect this, we convert the parsed struct back to a string and use `.contains` to
check whether the text `null` is present in the string column.
- This gives two dataframes: one with the correctly
parsed rows and another with the incorrectly parsed rows.
- Now we repeat the process on the incorrectly parsed dataframe until it is empty.
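The control flow of the steps above can be sketched in plain Python, without Spark. This is only an analogy under stated assumptions: the helper names `parse_with_keys` and `peel_by_schema` are hypothetical, the "schema" is just the sorted top-level keys of the first remaining payload (Spark's inferred schema is also typed and nested), and the sketch raises an error when no row matches, a guard the Spark script does not have.

```python
import json


def parse_with_keys(payload, keys):
    """Mimic from_json: keep only `keys`, filling absent ones with None."""
    obj = json.loads(payload)
    return {key: obj.get(key) for key in keys}


def peel_by_schema(payloads):
    """Group payloads by repeatedly applying the first remaining row's keys."""
    groups = []
    remaining = list(payloads)
    while remaining:
        # Stand-in for schema_of_json on the first remaining row.
        keys = sorted(json.loads(remaining[0]))
        parsed = [(p, parse_with_keys(p, keys)) for p in remaining]
        good = [p for p, row in parsed if None not in row.values()]
        bad = [p for p, row in parsed if None in row.values()]
        if not good:
            # Assumption: stop instead of looping forever when nothing matches.
            raise ValueError(f"no row matches keys {keys}")
        groups.append((keys, good))
        remaining = bad
    return groups


payloads = [
    '{"id": 1, "name": "Alice"}',
    '{"id": 2, "tags": ["spark", "python"], "active": true}',
    '{"id": 3, "details": {"age": 30, "location": "New York"}}',
]
for keys, rows in peel_by_schema(payloads):
    print(keys, len(rows))
```

With these three payloads the loop runs three times and each pass peels off exactly one row, because no two payloads share the same set of keys.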
Here's a sample script with custom data.
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, from_json, schema_of_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder \
    .appName("JsonPayloadDataFrame") \
    .getOrCreate()

schema = StructType([
    StructField("json_payload", StringType(), True)
])

data = [
    Row(json_payload='{"id": 1, "name": "Alice"}'),
    Row(json_payload='{"id": 2, "tags": ["spark", "python"], "active": true}'),
    Row(json_payload='{"id": 3, "details": {"age": 30, "location": "New York"}}'),
    Row(json_payload='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}')
]

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(n=30, truncate=False)

# Each iteration appends one dataframe holding the rows that matched its schema.
wanted_df_list = []
df_looped = df

# DataFrame.isEmpty() requires PySpark 3.3 or later.
while not df_looped.isEmpty():
    print("USING SCHEMA FROM ONE ROW")
    # Use the payload of the first remaining row as the schema sample.
    current_string_repr = df_looped.select("json_payload").first()["json_payload"]
    print(f"{current_string_repr=}")

    # Parse every remaining row with the schema inferred from that one payload.
    df_parsed = df_looped.withColumn("parsed_struct", from_json(col("json_payload"), schema_of_json(current_string_repr)))
    df_parsed.show(n=30, truncate=False)
    df_parsed.printSchema()

    # Render the struct as a string so that missing fields show up as the text "null".
    df_null_check = df_parsed.withColumn("null_present_str", col("parsed_struct").cast(StringType()))
    df_null_check.show(n=30, truncate=False)
    df_null_check.printSchema()

    df_null_present = df_null_check.withColumn("null_present_bool", col("null_present_str").contains("null"))
    df_null_present.show(n=30, truncate=False)
    df_null_present.printSchema()

    # Split into rows that matched the schema and rows that did not.
    df_partial_correct = df_null_present.filter(~col("null_present_bool"))
    df_partial_incorrect = df_null_present.filter(col("null_present_bool"))

    df_partial_correct.cache().show(n=30, truncate=False)
    df_partial_correct.printSchema()
    df_partial_incorrect.cache().show(n=30, truncate=False)
    df_partial_incorrect.printSchema()

    wanted_df_list.append(df_partial_correct)
    # Retry the unmatched rows with the next schema.
    df_looped = df_partial_incorrect

for df_ele in wanted_df_list:
    df_ele.show(n=30, truncate=False)
Final Output:
+--------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false |
+--------------------------+-------------+----------------+-----------------+
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false |
+---------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
Full Output (the initial `df.show()` and all `printSchema()` output are not shown):
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 1, "name": "Alice"}'
+-------------------------------------------------------------------------------------------------------------+-------------+
|json_payload |parsed_struct|
+-------------------------------------------------------------------------------------------------------------+-------------+
|{"id": 1, "name": "Alice"} |{1, Alice} |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |
+-------------------------------------------------------------------------------------------------------------+-------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
|json_payload |parsed_struct|null_present_str|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
|{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} |false |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
+--------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false |
+--------------------------+-------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 2, "tags": ["spark", "python"], "active": true}'
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{2, null} |true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{4, null} |true |
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL}|{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}|{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 3, "details": {"age": 30, "location": "New York"}}'
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|false |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false |
+---------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}'
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|true |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
+------------+-------------+----------------+-----------------+
|json_payload|parsed_struct|null_present_str|null_present_bool|
+------------+-------------+----------------+-----------------+
+------------+-------------+----------------+-----------------+
+--------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false |
+--------------------------+-------------+----------------+-----------------+
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false |
+---------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
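One caveat about the substring test in the script: any row whose rendered struct contains the text `null` is treated as unparsed. A payload with a genuine JSON `null`, or a string value such as `"nullable"`, would therefore never be accepted, and if such a row supplies the schema the loop would not terminate. In the other direction, `from_json` drops fields that are not in the schema, so a payload with extra keys passes the check but loses those keys. A possible alternative, sketched here in plain Python rather than Spark, is to bucket payloads by their exact set of top-level keys. The helper name `group_by_key_set` is hypothetical, and the sketch assumes every payload is a JSON object.

```python
import json
from collections import defaultdict


def group_by_key_set(payloads):
    """Bucket JSON object strings by their exact set of top-level keys."""
    groups = defaultdict(list)
    for payload in payloads:
        # Assumption: json.loads returns a dict, so iterating yields its keys.
        keys = tuple(sorted(json.loads(payload)))
        groups[keys].append(payload)
    return dict(groups)


payloads = [
    '{"id": 1, "name": "Alice"}',
    '{"id": 5, "name": null}',        # genuine JSON null
    '{"id": 6, "name": "nullable"}',  # value containing the text "null"
    '{"id": 2, "tags": ["spark", "python"], "active": true}',
]
for keys, rows in group_by_key_set(payloads).items():
    print(keys, len(rows))
```

Here the three `id`/`name` payloads land in one bucket regardless of their values, and each bucket could then be parsed with a single schema. In Spark, a similar key signature might be computed with a UDF; recent Spark versions also appear to offer a `json_object_keys` SQL function that may serve the same purpose, though that is untested here.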