Skip to content

Instantly share code, notes, and snippets.

@ndrpnt
Last active October 29, 2019 08:52
Show Gist options
  • Save ndrpnt/e1aa61a21db5e49220a550258c145241 to your computer and use it in GitHub Desktop.
Save ndrpnt/e1aa61a21db5e49220a550258c145241 to your computer and use it in GitHub Desktop.
[Spark 1.6] Returns a new DataFrame with nested structures flattened and, optionally, arrays exploded (one row for each element)

Before:

root
 |-- a: struct (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- d: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- e: string (nullable = true)
 |-- f: string (nullable = true)
 |-- g: array (nullable = true)
 |    |-- element: string (containsNull = true)

with explodeArrays set to false:

root                                                                            
 |-- f: string (nullable = true)
 |-- g: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- a_b: string (nullable = true)
 |-- a_c_d: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- a_c_e: array (nullable = true)
 |    |-- element: string (containsNull = true)

with explodeArrays set to true:

root
 |-- f: string (nullable = true)
 |-- g: string (nullable = true)
 |-- a_b: string (nullable = true)
 |-- a_c_d: string (nullable = true)
 |-- a_c_e: string (nullable = true)
/**
 * Returns a new DataFrame in which the fields of nested structs are promoted to
 * top-level columns.
 *
 * Every leaf field is copied into a column named after its path, with the dots
 * replaced by underscores (`a.b` becomes `a_b`). The nested top-level columns
 * the leaves were taken from are then dropped.
 *
 * @param dataFrame     the DataFrame to flatten
 * @param explodeArrays when `true`, every array met while walking the schema is
 *                      exploded (one row per element); when `false` (the default),
 *                      arrays are kept and only the structs inside them are walked
 * @return the flattened DataFrame
 */
def flatten(dataFrame: DataFrame, explodeArrays: Boolean = false): DataFrame = {
  // Name of the flattened column generated for a dotted field path.
  def columnName(path: String): String = path.replace(".", "_")

  // True for a struct, or for an array (at any depth) whose elements are structs.
  def isNested(dt: DataType): Boolean = dt match {
    case _: StructType => true
    case a: ArrayType  => isNested(a.elementType)
    case _             => false
  }

  // Walks the type found at `path` and returns `df` extended with one column per leaf.
  def flattenField(df: DataFrame, path: String, dt: DataType): DataFrame = dt match {
    case s: StructType =>
      s.fields.foldLeft(df) { (acc, f) =>
        flattenField(acc, s"${path}.${f.name}", f.dataType)
      }
    case a: ArrayType if explodeArrays =>
      val name = columnName(path)
      // A null array is swapped for a one-element array holding a typed null before
      // exploding, presumably so that the row is kept with a null value.
      // NOTE(review): only null is guarded here; empty arrays are passed to explode
      // unchanged -- confirm that is the intended behaviour.
      val guarded = when(df(path).isNotNull, df(path))
        .otherwise(array(lit(null).cast(a.elementType)))
      flattenField(df.withColumn(name, explode(guarded)), name, a.elementType)
    case a: ArrayType =>
      // Array kept as-is: keep walking the element type under the same path.
      flattenField(df, path, a.elementType)
    case _ =>
      df.withColumn(columnName(path), df(path))
  }

  val flattened = dataFrame.schema.fields.foldLeft(dataFrame) { (acc, f) =>
    flattenField(acc, f.name, f.dataType)
  }

  // Drop the nested top-level columns. Arrays of structs are included as well as
  // plain structs: with explodeArrays = false a top-level array<struct> has been
  // flattened into separate columns but would otherwise stay in the result.
  flattened.schema.fields
    .filter(f => isNested(f.dataType))
    .foldLeft(flattened)((acc, f) => acc.drop(f.name))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment