
@DGrady
Last active October 16, 2019 16:00
Flatten a Spark DataFrame schema
"""
The schemas that Spark produces for DataFrames are typically
nested, and these nested schemas are quite difficult to work with
interactively. In many cases, it's possible to flatten a schema
into a single level of column names.
"""
import typing as T
import cytoolz.curried as tz
import pyspark
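

def schema_to_columns(schema: pyspark.sql.types.StructType) -> T.List[T.List[str]]:
    """
    Produce a flat list of column specs from a possibly nested DataFrame schema.

    Each spec is the path of field names leading to one leaf (non-struct) field,
    e.g. ['user', 'address', 'city'].
    """
    columns = list()

    def helper(schm: pyspark.sql.types.StructType, prefix: T.Optional[T.List[str]] = None):
        if prefix is None:
            prefix = list()
        for item in schm.fields:
            if isinstance(item.dataType, pyspark.sql.types.StructType):
                # Nested struct: recurse, extending the path with this field's name
                helper(item.dataType, prefix + [item.name])
            else:
                # Leaf field (arrays and maps included): record its full path
                columns.append(prefix + [item.name])

    helper(schema)
    return columns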
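

def flatten_frame(frame: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Select every leaf field of `frame` as a top-level column. Nested fields are
    renamed by joining their path with ':', e.g. user.address.city -> 'user:address:city'.
    """
    aliased_columns = list()
    for col_spec in schema_to_columns(frame.schema):
        # Equivalent to frame['user']['address']['city']: resolves the nested field as a Column
        c = tz.get_in(col_spec, frame)
        if len(col_spec) == 1:
            # Top-level field: keep its original name
            aliased_columns.append(c)
        else:
            aliased_columns.append(c.alias(':'.join(col_spec)))
    return frame.select(aliased_columns)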
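
A quick way to see what these two functions produce without starting Spark is the pure-Python sketch below. It is an illustration only: it assumes a nested dict can stand in for a StructType (a dict value plays the role of a nested struct, any other value is a leaf), and the names `model_schema_to_columns` and `model_flat_names` are hypothetical, not part of the gist.

```python
from typing import Any, Dict, List, Optional

# Hypothetical stand-in for a Spark schema, for illustration only:
# each key is a field name; a nested dict plays the role of a StructType,
# and any other value (here a type-name string) is a leaf field.
Schema = Dict[str, Any]


def model_schema_to_columns(schema: Schema, prefix: Optional[List[str]] = None) -> List[List[str]]:
    """Depth-first list of field-name paths to every leaf, mirroring schema_to_columns."""
    prefix = prefix or []
    columns: List[List[str]] = []
    for name, dtype in schema.items():
        if isinstance(dtype, dict):
            # Nested "struct": descend with this field's name appended to the path
            columns.extend(model_schema_to_columns(dtype, prefix + [name]))
        else:
            # Leaf: its full path becomes one flat column
            columns.append(prefix + [name])
    return columns


def model_flat_names(schema: Schema) -> List[str]:
    """Column names flatten_frame would produce: paths joined with ':'."""
    # A one-element path joins to the bare name, matching the unaliased top-level case
    return [":".join(path) for path in model_schema_to_columns(schema)]


example = {
    "id": "bigint",
    "user": {"name": "string", "address": {"city": "string", "zip": "string"}},
    "score": "double",
}
print(model_flat_names(example))
# ['id', 'user:name', 'user:address:city', 'user:address:zip', 'score']
```

Top-level fields keep their names because joining a one-element path with ':' returns the name unchanged, which mirrors the `len(col_spec) == 1` branch in `flatten_frame`. As in `schema_to_columns`, only structs are descended into; arrays and maps would be treated as leaves.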
@ashahjee

Excellent ... this is what I was looking for.

@ashahjee

ashahjee commented Feb 9, 2018

The code works fine for StructType. Is there a way to handle ArrayType in the same code as well?

@neurobug

neurobug commented Mar 9, 2018

Thank you so much!!

@nguyenvulebinh

I modified @DGrady's script to flatten all array and struct types:
https://gist.github.com/nguyenvulebinh/794c296b1133feb80e46e812ef50f7fc
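
On the ArrayType question above, here is one possible approach, sketched on a toy model. It is not the linked script, and it does not use Spark. It is a hypothetical extension of a nested-dict schema model, where a dict stands for a struct and a one-element list stands for an ArrayType of that element type. The walk descends into arrays of structs and flags every path that crosses an array. Such a field holds several values per row, so it stays array-valued unless it is exploded (for example with `pyspark.sql.functions.explode`). The function name `model_paths_with_arrays` is illustrative only.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical schema model: dict = struct, one-element list = array of that
# element type, anything else = leaf type name.
Schema = Dict[str, Any]


def model_paths_with_arrays(
    schema: Schema, prefix: Optional[List[str]] = None, in_array: bool = False
) -> List[Tuple[List[str], bool]]:
    """Return (path, crosses_array) for every leaf, descending into arrays of structs."""
    prefix = prefix or []
    out: List[Tuple[List[str], bool]] = []
    for name, dtype in schema.items():
        path = prefix + [name]
        crosses = in_array
        # Unwrap (possibly nested) arrays down to their element type
        while isinstance(dtype, list):
            dtype = dtype[0]
            crosses = True
        if isinstance(dtype, dict):
            # Struct (possibly inside an array): keep walking, carrying the array flag
            out.extend(model_paths_with_arrays(dtype, path, crosses))
        else:
            out.append((path, crosses))
    return out


example = {
    "id": "bigint",
    "tags": ["string"],
    "orders": [{"sku": "string", "qty": "int"}],
}
for path, crosses in model_paths_with_arrays(example):
    print(":".join(path), "array" if crosses else "scalar")
```

Carrying a flag instead of exploding inside the walk keeps the schema traversal separate from the decision about row multiplication. Exploding two independent arrays in the same select would produce their cross product, so that step is better chosen explicitly.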

@ayushbij27

I am getting this error:
SyntaxError: invalid syntax
File "", line 7
def schema_to_columns(schema: pyspark.sql.types.StructType) -> T.List[T.List[str]]:
^
SyntaxError: invalid syntax
