Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Created September 6, 2018 09:45
Show Gist options
  • Save ahoy-jon/29dce897e7d3e7550f16711b60580df2 to your computer and use it in GitHub Desktop.
Save ahoy-jon/29dce897e7d3e7550f16711b60580df2 to your computer and use it in GitHub Desktop.
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonParser}
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
 * Helpers for turning a JSON-encoded `donneesproduit` string field into a nested struct,
 * both at the schema level ([[mergeSchemaDonneesProduit]]) and at the row level
 * ([[mergeRowDonnessProduit]]).
 *
 * This object lives in `org.apache.spark.sql` because it reads `GenericRow.values` and uses
 * Catalyst classes such as `JacksonParser` directly.
 */
object EmptyJsonOption {

  /** Schema built from the JSON schema description `DataMainSchema.schemaKatReplay`. */
  @transient lazy val katReplaySchema: StructType =
    DataType.fromJson(DataMainSchema.schemaKatReplay).asInstanceOf[StructType]

  /** Schema built from the JSON schema description `DataMainSchema.schemaDataMain`. */
  @transient lazy val dataMainSchema: StructType =
    DataType.fromJson(DataMainSchema.schemaDataMain).asInstanceOf[StructType]

  /** [[katReplaySchema]] with every string field named `donneesproduit` replaced by [[dataMainSchema]]. */
  @transient lazy val katReplaySchema2: StructType =
    mergeSchemaDonneesProduit(katReplaySchema, dataMainSchema)

  /**
   * Returns a copy of `katReplaySchema` in which every field named `donneesproduit` of type
   * `StringType` is given the type `donneesProduitSchema` (name, nullability and metadata kept).
   *
   * Structs and arrays are traversed recursively; any other data type (including maps) is
   * returned unchanged and is not searched.
   *
   * @param katReplaySchema      schema to rewrite
   * @param donneesProduitSchema struct type substituted for the string field
   * @return the rewritten schema
   */
  def mergeSchemaDonneesProduit(katReplaySchema: StructType, donneesProduitSchema: StructType): StructType = {
    // Rebuilds the type bottom-up: a field's own data type is rewritten first, then `f` is
    // applied to the rebuilt field.
    def changeDonneesProduitSchema(dataType: DataType)(f: StructField => StructField): DataType =
      dataType match {
        case ArrayType(elementType, containsNull) =>
          ArrayType(changeDonneesProduitSchema(elementType)(f), containsNull)
        case StructType(fields) =>
          StructType(fields.map { field =>
            f(StructField(field.name, changeDonneesProduitSchema(field.dataType)(f), field.nullable, field.metadata))
          })
        case other => other
      }

    changeDonneesProduitSchema(katReplaySchema)({
      case StructField("donneesproduit", StringType, nullable, metadata) =>
        StructField("donneesproduit", donneesProduitSchema, nullable, metadata)
      case other => other
    }).asInstanceOf[StructType]
  }

  /** JSON options built from an empty parameter map. */
  @transient lazy val empty: JSONOptions = new JSONOptions(Map.empty[String, String])

  /** JSON parser producing rows shaped like [[dataMainSchema]]. */
  @transient lazy val parser: JacksonParser = new JacksonParser(dataMainSchema, "", EmptyJsonOption.empty)

  /**
   * Converts a Catalyst `InternalRow` into an external [[Row]], following `schema`.
   *
   * Supported conversions: nested structs, arrays (given as a `Seq` or as `GenericArrayData`),
   * strings (`UTF8String` to `String`) and the primitive types boolean, byte, short, int, long,
   * float and double, which are passed through unchanged. `null` is preserved for any type.
   *
   * @param schema      schema describing `internalRow`
   * @param internalRow row to convert; may be `null`, in which case `null` is returned
   * @return the converted row (a `GenericRow`, which carries no schema)
   * @throws Exception if a value / data type combination is not one of the supported ones
   */
  def internalToRow(schema: StructType, internalRow: InternalRow): Row = {
    def any2any(dataType: DataType, any: Any): Any =
      (dataType, any) match {
        case (_, null) => null
        case (s: StructType, nested: InternalRow) =>
          new GenericRow(s.zipWithIndex.map({ case (f, i) => any2any(f.dataType, nested.get(i, f.dataType)) }).toArray)
        // `Seq[_]` rather than `Seq[Any]`: the element type is erased and cannot be checked.
        case (a: ArrayType, s: Seq[_]) =>
          s.map(e => any2any(a.elementType, e))
        case (a: ArrayType, g: GenericArrayData) =>
          g.array.map(e => any2any(a.elementType, e)).toSeq
        case (StringType, s: UTF8String) => s.toString
        case (BooleanType, b: Boolean) => b
        case (ByteType, b: Byte) => b
        case (ShortType, s: Short) => s
        case (IntegerType, i: Int) => i
        case (LongType, l: Long) => l
        case (FloatType, f: Float) => f
        case (DoubleType, d: Double) => d
        case (x, y) => throw new Exception(s"Not handle $x, ${Option(y).map(x => (x.getClass,x))}")
      }

    any2any(schema, internalRow).asInstanceOf[Row]
  }

  /**
   * Replaces, IN PLACE, the JSON string stored at position 9 of the nested row at position 1 of
   * `row` with the row obtained by parsing that string with [[parser]].
   *
   * The field is left untouched when the nested row or the JSON string is `null`. When the
   * parser yields no row for the string, the field is set to `null` instead of failing.
   *
   * NOTE(review): positions 1 and 9 are hard-coded; presumably they are where the
   * `donneesproduit` field sits in the expected input schema. Confirm against the caller.
   *
   * @param row row to update; must be a `GenericRowWithSchema` whose field 1, when non-null,
   *            is itself a `GenericRowWithSchema` (a `ClassCastException` is thrown otherwise)
   * @return the same `row` instance, after mutation
   */
  def mergeRowDonnessProduit(row: Row): Row = {
    val nestedRowIndex = 1
    val donneesProduitIndex = 9

    // `headOption` instead of `head`: the parser may return no rows for some inputs
    // (e.g. blank strings, depending on the Spark version), which must not abort the row.
    def jsonDataToRow(jsonData: String): Option[InternalRow] =
      parser.parse(jsonData).headOption

    val outer = row.asInstanceOf[GenericRowWithSchema]
    outer.values(nestedRowIndex) match {
      case null => () // no nested row, hence nothing to expand
      case nested =>
        val inner = nested.asInstanceOf[GenericRowWithSchema]
        inner.values(donneesProduitIndex) match {
          case null => () // field already null, keep it so
          case raw =>
            val expanded: Row =
              jsonDataToRow(raw.asInstanceOf[String])
                .map(parsed => internalToRow(dataMainSchema, parsed))
                .orNull
            inner.values.update(donneesProduitIndex, expanded)
        }
    }
    row
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment