Created
September 6, 2018 09:45
-
-
Save ahoy-jon/29dce897e7d3e7550f16711b60580df2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.spark.sql | |
import org.apache.spark.sql.catalyst.InternalRow | |
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} | |
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema} | |
import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonParser} | |
import org.apache.spark.sql.catalyst.util.GenericArrayData | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.unsafe.types.UTF8String | |
/**
 * Helpers for the `donneesproduit` field of KatReplay data: that field is declared as a JSON
 * string in the KatReplay schema and is re-typed / re-parsed here against the DataMain schema.
 */
object EmptyJsonOption {

  /** KatReplay schema, deserialized from the JSON definition `DataMainSchema.schemaKatReplay`. */
  @transient lazy val katReplaySchema: StructType =
    DataType.fromJson(DataMainSchema.schemaKatReplay).asInstanceOf[StructType]

  /** DataMain schema, deserialized from `DataMainSchema.schemaDataMain`. */
  @transient lazy val dataMainSchema: StructType =
    DataType.fromJson(DataMainSchema.schemaDataMain).asInstanceOf[StructType]

  /** [[katReplaySchema]] with every string-typed `donneesproduit` field replaced by [[dataMainSchema]]. */
  @transient lazy val katReplaySchema2: StructType =
    mergeSchemaDonneesProduit(katReplaySchema, dataMainSchema)

  /**
   * Rewrites `katReplaySchema`, replacing every field named `donneesproduit` whose type is
   * `StringType` (at any depth of struct / array nesting) with `donneesProduitSchema`, keeping
   * the field's nullability and metadata.
   *
   * Map types are not traversed, and the substituted `donneesProduitSchema` is not itself rewritten.
   *
   * @param katReplaySchema      schema to rewrite
   * @param donneesProduitSchema struct type that replaces the string-typed field
   * @return the rewritten schema
   */
  def mergeSchemaDonneesProduit(katReplaySchema: StructType, donneesProduitSchema: StructType): StructType = {
    // Bottom-up rewrite: a field's children are rewritten first, then `f` is applied to the field.
    def changeDonneesProduitSchema(dataType: DataType)(f: StructField => StructField): DataType =
      dataType match {
        case ArrayType(e, containsNull) =>
          ArrayType(changeDonneesProduitSchema(e)(f), containsNull)
        case StructType(fields) =>
          StructType(fields.map(x => f(x.copy(dataType = changeDonneesProduitSchema(x.dataType)(f)))))
        case x => x
      }

    changeDonneesProduitSchema(katReplaySchema)({
      case StructField("donneesproduit", StringType, nullable, metadata) =>
        StructField("donneesproduit", donneesProduitSchema, nullable, metadata)
      case x => x
    }).asInstanceOf[StructType]
  }

  /** JSONOptions built from an empty option map (presumably Spark's defaults). */
  @transient lazy val empty: JSONOptions = new JSONOptions(Map.empty[String, String])

  /**
   * Parser for `donneesproduit` JSON strings, targeting [[dataMainSchema]].
   * NOTE(review): the "" argument is presumably the corrupt-record column name of this Spark version.
   */
  @transient lazy val parser: JacksonParser = new JacksonParser(dataMainSchema, "", EmptyJsonOption.empty)

  /**
   * Converts a Catalyst [[InternalRow]] into an external [[Row]] following `schema`.
   *
   * Nested structs become [[GenericRowWithSchema]] (so `getAs[T](fieldName)` works on them),
   * arrays become `Seq`, `UTF8String` becomes `String`, `Decimal` becomes `java.math.BigDecimal`,
   * other supported primitives pass through unchanged, and null stays null.
   *
   * @param schema      struct type describing `internalRow`
   * @param internalRow row to convert; a null row yields null
   * @return the converted row
   * @throws Exception on a data type / value combination that is not handled
   */
  def internalToRow(schema: StructType, internalRow: InternalRow): Row = {
    def any2any(dataType: DataType, any: Any): Any =
      (dataType, any) match {
        case (_, null) => null
        case (s: StructType, r: InternalRow) =>
          val fields = s.fields
          val values = Array.tabulate[Any](fields.length) { i =>
            any2any(fields(i).dataType, r.get(i, fields(i).dataType))
          }
          // Keep the schema on nested rows, consistent with the GenericRowWithSchema rows
          // that mergeRowDonnessProduit operates on.
          new GenericRowWithSchema(values, s)
        case (a: ArrayType, s: Seq[_]) =>
          s.map(e => any2any(a.elementType, e))
        case (a: ArrayType, g: GenericArrayData) =>
          g.array.map(e => any2any(a.elementType, e)).toSeq
        case (StringType, s: UTF8String) => s.toString
        case (BooleanType, b: Boolean) => b
        case (ByteType, b: Byte) => b
        case (ShortType, s: Short) => s
        case (IntegerType, i: Int) => i
        case (LongType, l: Long) => l
        case (FloatType, f: Float) => f
        case (DoubleType, d: Double) => d
        case (_: DecimalType, d: Decimal) => d.toJavaBigDecimal
        case (BinaryType, b: Array[Byte]) => b
        case (x, y) => throw new Exception(s"Not handle $x, ${Option(y).map(v => (v.getClass, v))}")
      }

    any2any(schema, internalRow).asInstanceOf[Row]
  }

  // Positional layout of a KatReplay row: the JSON string sits at values(1).values(9).
  // NOTE(review): hard-coded positions — confirm they match katReplaySchema's field order.
  private val DonneesProduitParentIndex = 1
  private val DonneesProduitIndex = 9

  /**
   * Replaces, in place, the JSON string stored at position 9 of the nested row at position 1
   * (presumably the `donneesproduit` field — confirm against [[katReplaySchema]]) with the
   * row obtained by parsing it against [[dataMainSchema]].
   *
   * A null value stays null; a string for which the parser yields no row (e.g. blank input)
   * also becomes null instead of failing.
   *
   * The input row and its nested row are mutated; the same `row` instance is returned.
   *
   * @throws ClassCastException if `row` or its nested row is not a GenericRowWithSchema, or the
   *                            value is not a String
   */
  def mergeRowDonnessProduit(row: Row): Row = {
    // `headOption` rather than `head`: the parser may return an empty Seq.
    def jsonDataToRow(jsonData: String): InternalRow =
      parser.parse(jsonData).headOption.orNull

    val outer = row.asInstanceOf[GenericRowWithSchema]
    val inner = outer.values(DonneesProduitParentIndex).asInstanceOf[GenericRowWithSchema]
    val converted: Any = inner.values(DonneesProduitIndex) match {
      case null => null
      case json => internalToRow(dataMainSchema, jsonDataToRow(json.asInstanceOf[String]))
    }
    inner.values.update(DonneesProduitIndex, converted)
    outer.values.update(DonneesProduitParentIndex, inner)
    row
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment