Spark-Logical-Plan-Serializer-Deserializer
package org.apache.spark.sql
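// Note: the file lives in the org.apache.spark.sql package, presumably so the
// package-private DataFrame internals used below (df.logicalPlan and the
// DataFrame(sqlContext, plan) factory) are accessible.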
import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.io.Input
import org.apache.hadoop.io.{NullWritable, BytesWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.{SparkContext, SparkConf}
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.ClassTag
// Simple wrapper so a LogicalPlan can travel through a typed RDD.
case class LogicalPlanRDD(logicalPlan: LogicalPlan)

object MainApplication {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("Logical Plan serializer")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Register two small DataFrames as temp tables and join them.
    val keyValues1 = Seq(("one", 1), ("two", 2), ("three", 3), ("one", 5))
    sqlContext.createDataFrame(keyValues1).toDF("k1", "v1").registerTempTable("df1")
    val keyValues2 = Seq(("two", 1), ("three", 2), ("four", 3), ("five", 5))
    sqlContext.createDataFrame(keyValues2).toDF("k2", "v2").registerTempTable("df2")

    val df = sqlContext.sql("SELECT a.k1, a.v1, b.v2 FROM df1 AS a LEFT OUTER JOIN df2 AS b ON a.k1 = b.k2")
    println("Before serialization")
    df.take(2).foreach(println)

    // Kryo-serialize the DataFrame's logical plan to a SequenceFile, then read it back.
    val data = Array(df.logicalPlan)
    val rdd = sc.parallelize(data).map[LogicalPlanRDD](x => LogicalPlanRDD(x))
    KryoSerializerDeserializer().saveAsObjectFile[LogicalPlanRDD](rdd, "C:\\Users\\i076326\\Documents\\programs\\df_plan\\rdd")
    val deserializedRDD = KryoSerializerDeserializer().objectFile[LogicalPlanRDD](sc, "C:\\Users\\i076326\\Documents\\programs\\df_plan\\rdd")
    val logicalPlan = deserializedRDD.collect()(0).logicalPlan

    // Rebuild a DataFrame from the deserialized plan and query it, not the original df.
    val newDf = DataFrame(sqlContext, logicalPlan)
    println("After serialization")
    newDf.take(2).foreach(println)
  }
}
class KryoSerializerDeserializer {

  // Writes the RDD as a Hadoop SequenceFile: elements are grouped into batches
  // of 10, each batch is Kryo-serialized into a byte array and stored as a
  // BytesWritable value under a NullWritable key.
  def saveAsObjectFile[A: ClassTag](rdd: RDD[A], path: String): Unit = {
    val kryoSerializer = new KryoSerializer(rdd.context.getConf)

    rdd.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(splitArray => {
        val kryo = kryoSerializer.newKryo()
        val bao = new ByteArrayOutputStream()
        val output = kryoSerializer.newKryoOutput()
        output.setOutputStream(bao)
        kryo.writeClassAndObject(output, splitArray)
        output.close()
        val byteWritable = new BytesWritable(bao.toByteArray)
        (NullWritable.get(), byteWritable)
      }).saveAsSequenceFile(path)
  }
  // Reads the SequenceFile back and Kryo-deserializes every batch, flattening
  // the batches into a single RDD of elements.
  def objectFile[A: ClassTag](sc: SparkContext, path: String, minPartitions: Int = 1)(implicit tt: TypeTag[A]): RDD[A] = {
    val kryoSerializer = new KryoSerializer(sc.conf)

    sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
      .flatMap(x => {
        val kryo = kryoSerializer.newKryo()
        val input = new Input()
        input.setBuffer(x._2.getBytes)
        kryo.readClassAndObject(input).asInstanceOf[Array[A]]
      })
  }
}
object KryoSerializerDeserializer {
  def apply() = new KryoSerializerDeserializer
}
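A minimal usage sketch (not part of the original gist), assuming it is appended to the same file so the imports above apply; the Record class, RoundTripExample object, and /tmp/kryo_records path are hypothetical placeholders:

// Hypothetical example: round-trip a plain case-class RDD through the helper above.
case class Record(id: Int, name: String)

object RoundTripExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("kryo-roundtrip"))
    val records = sc.parallelize(Seq(Record(1, "a"), Record(2, "b"), Record(3, "c")))

    // Placeholder output directory; saveAsSequenceFile fails if it already exists.
    val path = "/tmp/kryo_records"
    KryoSerializerDeserializer().saveAsObjectFile(records, path)

    val restored = KryoSerializerDeserializer().objectFile[Record](sc, path)
    restored.collect().foreach(println)   // expect the three Record values back
    sc.stop()
  }
}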
igreenfield commented Jul 31, 2018

@sbcd90 Did you test your code? The newDf built from the deserialized plan is never used; the final take(2) is still called on the original df.
