Skip to content

Instantly share code, notes, and snippets.

@saisgit
Created February 13, 2019 06:49
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save saisgit/b856ad11507976a6b79e6395f5de3cdc to your computer and use it in GitHub Desktop.
Save saisgit/b856ad11507976a6b79e6395f5de3cdc to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType,ArrayType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.explode_outer
object FlattenJson extends App {
val conf = new SparkConf().setMaster("local[*]").setAppName("JSON Flattener")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val inputJson = """|{
| "name":"John",
| "age":30,
| "bike":{
| "name":"Bajaj", "models":["Dominor", "Pulsar"]
| },
| "cars": [
| { "name":"Ford", "models":[ "Fiesta", "Focus", "Mustang" ] },
| { "name":"BMW", "models":[ "320", "X3", "X5" ] },
| { "name":"Fiat", "models":[ "500", "Panda" ] }
| ]
|}""".stripMargin('|')
println(inputJson)
//creating rdd for the json
val jsonRDD = sc.parallelize(inputJson::Nil)
//creating DF for the json
val jsonDF = sqlContext.read.json(jsonRDD)
//Schema of the JSON DataFrame before Flattening
jsonDF.schema
//Output DataFrame Before Flattening
jsonDF.show(false)
//Function for exploding Array and StructType column
def flattenDataframe(df: DataFrame): DataFrame = {
val fields = df.schema.fields
val fieldNames = fields.map(x => x.name)
val length = fields.length
for(i <- 0 to fields.length-1){
val field = fields(i)
val fieldtype = field.dataType
val fieldName = field.name
fieldtype match {
case arrayType: ArrayType =>
val fieldNamesExcludingArray = fieldNames.filter(_!=fieldName)
val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
// val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName.*"))
val explodedDf = df.selectExpr(fieldNamesAndExplode:_*)
return flattenDataframe(explodedDf)
case structType: StructType =>
val childFieldnames = structType.fieldNames.map(childname => fieldName +"."+childname)
val newfieldNames = fieldNames.filter(_!= fieldName) ++ childFieldnames
val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_"))))
val explodedf = df.select(renamedcols:_*)
return flattenDataframe(explodedf)
case _ =>
}
}
df
}
val flattendedJSON = flattenDataframe(jsonDF)
//schema of the JSON after Flattening
flattendedJSON.schema
//Output DataFrame After Flattening
flattendedJSON.show(false)
}
@jitindoriya07
Copy link

@SaiKemp Can I get help, I would need Java code for above. Please help me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment