Skip to content

Instantly share code, notes, and snippets.

@Menziess
Created February 11, 2019 11:48
Show Gist options
  • Save Menziess/3a742af12ba89539e9df54de7633e29c to your computer and use it in GitHub Desktop.
Save Menziess/3a742af12ba89539e9df54de7633e29c to your computer and use it in GitHub Desktop.
Problem: writing a DataFrame to Parquet fails with `Parquet data source does not support struct<some_field:null> data type.;`
"""
The solution is to make sure that structs in the DataFrame schema are not of NullType.
"""
def replace_nulls_struct_fields(df, spark_session=None):
    """
    Return a copy of ``df`` whose schema has NullType fields replaced by StringType.

    The schema is serialised to JSON, every ``"type": "null"`` entry is
    rewritten to ``"type": "string"`` (including inside nested structs), and
    the DataFrame is rebuilt from its RDD with the patched schema.

    :param df: the DataFrame to fix.
    :param spark_session: session used to rebuild the DataFrame. Defaults to
        the session that owns ``df`` rather than a module-global ``spark``.
    :return: a new DataFrame with the patched schema.
    """
    if spark_session is None:
        # DataFrame.sparkSession exists on recent PySpark releases; older
        # releases only expose the owning session through the SQLContext.
        spark_session = getattr(df, "sparkSession", None) or df.sql_ctx.sparkSession
    old_schema = df.schema
    # NOTE(review): only keys literally named "type" are patched, so e.g. an
    # array's "elementType": "null" is left as-is. Some Spark versions may
    # serialise NullType as "void" instead of "null" — confirm for your version.
    new_schema = old_schema.fromJson(
        json_replace_by(old_schema.jsonValue(), 'null', 'string', ['type']))
    return spark_session.createDataFrame(df.rdd, new_schema)
def json_replace_by(dictionary, old: str, new: str, only_keys=None):
    """
    Return a copy of ``dictionary`` with values equal to ``old`` replaced by ``new``.

    The mapping is walked recursively: nested dicts are processed the same
    way, and items of a list are treated as values of the key holding that
    list. A scalar value is replaced when it equals ``old`` and either
    ``only_keys`` is empty/None (replace under every key) or its key is in
    ``only_keys``. The input mapping is not modified.

    >>> json_replace_by({'type': 'null', 'metadata': {}}, 'null', 'string', ['type'])
    {'type': 'string', 'metadata': {}}

    :param dictionary: the JSON-like mapping to process.
    :param old: the value to replace.
    :param new: the replacement value.
    :param only_keys: keys whose values may be replaced; ``None`` or empty
        means every key qualifies.
    :return: a new dict with the replacements applied.
    """
    # None instead of a mutable [] default; an empty tuple means "any key".
    if only_keys is None:
        only_keys = ()

    def _replace(key, value):
        # Recurse into containers; list items inherit the key of their list,
        # so lists of scalars or nested lists no longer crash.
        if isinstance(value, dict):
            return json_replace_by(value, old, new, only_keys)
        if isinstance(value, list):
            return [_replace(key, item) for item in value]
        if (not only_keys or key in only_keys) and value == old:
            return new
        return value

    return {k: _replace(k, v) for k, v in dictionary.items()}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment