Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Created January 11, 2018 09:07
Show Gist options
  • Save ahoy-jon/54a1ee9cdaa61526fc22b1cda0d72d47 to your computer and use it in GitHub Desktop.
Save ahoy-jon/54a1ee9cdaa61526fc22b1cda0d72d47 to your computer and use it in GitHub Desktop.
package casttostruct
import org.apache.spark.sql.{Encoder, SaveMode, SparkSession}
import org.scalatest.FunSuite
/** Baseline record holding only a name; its Parquet output is the input to the A2/A3 reads. */
case class A1(name: String)

/** Same as [[A1]] plus an optional `age`, so a row lacking that column can still be decoded. */
case class A2(name: String, age: Option[Int])

/** Same as [[A1]] plus a required (non-Option) `age`. */
case class A3(name: String, age: Int)

/** Element type of [[B1]]: a single string field `a`. */
case class BB1(a: String)

/** Record with a name and a nested sequence of [[BB1]] structs. */
case class B1(name: String, bbs: Seq[BB1])

/** Element type of [[B2]]: [[BB1]]'s `a` preceded by an optional `i`. */
case class BB2(i: Option[Int], a: String)

/** Like [[B1]], but its nested elements are [[BB2]] structs. */
case class B2(name: String, bbs: Seq[BB2])
/**
 * Schema-evolution checks for Spark's Parquet reader.
 *
 * Each test writes data with one case-class schema and reads it back with
 * another schema, supplied explicitly via `DataFrameReader.schema` and then
 * decoded with `.as[T]`.
 */
class CastToStruct extends FunSuite with org.scalatest.BeforeAndAfterAll {
  val ss: SparkSession = SparkSession.builder().master("local").getOrCreate()
  import ss.implicits._

  // Output locations shared by the tests that write the same source data.
  private val a1Path = "target/a1"
  private val b1Path = "target/b1"

  // Release the local SparkSession when the suite finishes; otherwise it is never stopped.
  override protected def afterAll(): Unit =
    try ss.stop()
    finally super.afterAll()

  /**
   * Reads the Parquet data at `path` with the schema derived from `T`'s encoder
   * and returns every decoded row.
   *
   * All rows are returned rather than `.head`. `.head` on an empty array throws
   * NoSuchElementException, which is a RuntimeException and could masquerade as
   * the decoding failure a test is looking for.
   *
   * @param path Parquet location to read
   * @tparam T target type; its encoder supplies both the read schema and the decoder
   * @return all rows decoded as `T`
   */
  private def readWithSchemaOf[T: Encoder](path: String): Seq[T] =
    ss.read.schema(implicitly[Encoder[T]].schema).parquet(path).as[T].collect().toSeq

  test("A1 to A2 should work") {
    ss.createDataFrame(Seq(A1("hello"))).write.mode(SaveMode.Overwrite).parquet(a1Path)
    // The `age` column absent from A1's data is expected to decode as None.
    assert(readWithSchemaOf[A2](a1Path) == Seq(A2("hello", None)))
  }

  test("A1 to A3 should fail") {
    ss.createDataFrame(Seq(A1("hello"))).write.mode(SaveMode.Overwrite).parquet(a1Path)
    // Decoding is expected to throw, presumably because the non-Option `age: Int`
    // cannot represent the missing column.
    assertThrows[RuntimeException](readWithSchemaOf[A3](a1Path))
  }

  test("B1 to B2") {
    ss.createDataFrame(Seq(B1("hello", Seq(BB1("ahoy"))))).write.mode(SaveMode.Overwrite).parquet(b1Path)
    // The nested struct gains an optional field `i`, expected to decode as None.
    assert(readWithSchemaOf[B2](b1Path) == Seq(B2("hello", Seq(BB2(a = "ahoy", i = None)))))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment