@fsarradin · Created March 4, 2019 11:08
Spark and case class deserialization in UDFs
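
This gist shows two things: how to lift map, flatMap, and foldLeft onto Spark array columns through UDFs (alongside the equivalent Spark SQL higher-order functions), and how to work around Spark handing nested structs to Scala UDFs as Row values instead of case-class instances, using a Magnolia-derived CleanFromRow typeclass and chill's Externalizer to keep the resulting closures serializable.
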
import com.twitter.chill.Externalizer
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import scala.reflect.runtime.universe._
object TestMain {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession
        .builder()
        .master("local[1]")
        .getOrCreate()

    import spark.implicits._
    import Implicits._

    val df =
      Seq(
        ("abc", Seq("Hello world"), Seq(0L, 2L)),
        ("def", Seq("Wild Horses", "Paint It Black"), Seq(1L, 2L))
      ).toDF("id", "text", "timestamps")

    df.show(false)
    df.createOrReplaceTempView("data")

    val result =
      df.select(
        $"id",
        $"timestamps".map((t: Long) => t * 2).as("map"),
        $"text".flatMap((t: String) => t.split(" ").toSeq).as("flatMap"),
        $"timestamps".foldLeft[Long, Long](0)(_ + _).as("foldLeft")
      )

    result.show(false)

    // Same transformations expressed with the SQL higher-order functions.
    spark.sql(
      """SELECT
        |  id,
        |  transform(timestamps, t -> t * 2) as transform,
        |  flatten(transform(text, t -> split(t, " "))) as flatten_transform,
        |  aggregate(timestamps, bigint(0), (s, v) -> s + v) as aggregate
        |FROM data
        |""".stripMargin).show(false)
  }
}
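
// Roughly the expected output of `result.show(false)` (values derived from
// the input data above; column widths approximate):
//
//   +---+------+--------------------------------+--------+
//   |id |map   |flatMap                         |foldLeft|
//   +---+------+--------------------------------+--------+
//   |abc|[0, 4]|[Hello, world]                  |2       |
//   |def|[2, 4]|[Wild, Horses, Paint, It, Black]|3       |
//   +---+------+--------------------------------+--------+
//
// The SQL query produces the same values through transform, flatten and
// aggregate, the higher-order functions built into Spark since 2.4.
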
object Implicits {

  implicit class RicherColumn(column: Column) {

    /** Transform each element of an array column by applying the function g. */
    def map[A: TypeTag, B: TypeTag](g: A => B): Column = {
      val f = udf[Seq[B], Seq[A]](s => s.map(g))
      f(column)
    }

    /** Transform each element of an array column by applying the function g
      * and flatten the resulting arrays.
      */
    def flatMap[A: TypeTag, B: TypeTag](g: A => Seq[B]): Column = {
      val f = udf[Seq[B], Seq[A]](s => s.flatMap(g))
      f(column)
    }

    /** Aggregate an array column to a single value. */
    def foldLeft[A: TypeTag, B: TypeTag](init: B)(g: (B, A) => B): Column = {
      val f = udf[B, Seq[A]](s => s.foldLeft(init)(g))
      f(column)
    }
  }
}
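
// The UDFs above work as long as the array elements are primitives. With
// case-class elements, Spark hands each struct to the Scala function as an
// org.apache.spark.sql.Row (a GenericRowWithSchema), so a function that
// expects the case class blows up at runtime. Sketch of the failure mode
// (hypothetical DataFrame and column name):
//
//   case class Tata(a: String, b: Int)
//   val broken = udf[Seq[String], Seq[Tata]]((ts: Seq[Tata]) => ts.map(_.a))
//   df.select(broken($"tatas"))  // java.lang.ClassCastException:
//   // ...GenericRowWithSchema cannot be cast to Tata
//
// The CleanFromRow typeclass below turns those Rows back into properly typed
// values before the user function runs.
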
/** Turns the raw value Spark hands to a UDF (possibly a Row) back into a
  * properly typed A.
  */
trait CleanFromRow[A] extends Serializable {
  def clean(a: Any): A
}
object CleanFromRow {
  import scala.reflect.ClassTag
  import magnolia._
  import language.experimental.macros

  type Typeclass[T] = CleanFromRow[T]

  // Primitive values come back from Spark as themselves: a cast is enough.
  private def instance[A]: Typeclass[A] =
    new Typeclass[A] {
      override def clean(a: Any): A = a.asInstanceOf[A]
    }

  implicit val double: Typeclass[Double] = instance
  implicit val boolean: Typeclass[Boolean] = instance
  implicit val strCFR: Typeclass[String] = instance
  implicit val intCFR: Typeclass[Int] = instance
  implicit val longCFR: Typeclass[Long] = instance

  implicit def opt[T: Typeclass: Manifest]: Typeclass[Option[T]] =
    new Typeclass[Option[T]] {
      private val rc = implicitly[Manifest[T]].runtimeClass

      override def clean(a: Any): Option[T] =
        a match {
          case ox: Option[_] if ox.forall(x => rc.isAssignableFrom(x.getClass)) =>
            ox.asInstanceOf[Option[T]]
          case null => None
          case x    => Option(implicitly[Typeclass[T]].clean(x))
        }
    }

  implicit def seq[T: Typeclass: Manifest]: Typeclass[Seq[T]] =
    new Typeclass[Seq[T]] {
      private val rc = implicitly[Manifest[T]].runtimeClass

      override def clean(a: Any): Seq[T] =
        a match {
          case Nil => Nil
          case xs: Seq[_] if xs.forall(x => rc.isAssignableFrom(x.getClass)) =>
            xs.asInstanceOf[Seq[T]]
          case xs: Seq[_] => xs.map(implicitly[Typeclass[T]].clean)
        }
    }

  // Case classes: Spark passes nested structs as Rows, so rebuild the case
  // class field by field, cleaning each field with its own typeclass.
  def combine[T: ClassTag](ctx: CaseClass[CleanFromRow, T]): Typeclass[T] =
    new Typeclass[T] {
      override def clean(a: Any): T =
        a match {
          case a: T => a
          case r: Row =>
            val values: Seq[Any] =
              r.toSeq
                .zip(ctx.parameters)
                .map { case (rowValue, param) => param.typeclass.clean(rowValue) }
            ctx.rawConstruct(values)
        }
    }

  implicit def gen[T]: CleanFromRow[T] = macro Magnolia.gen[T]
}
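
// Minimal usage sketch (not part of the original pipeline): the Magnolia
// macro derives an instance for any case class whose fields themselves have
// instances.
//
//   val cfr: CleanFromRow[Tata] = CleanFromRow.gen[Tata]
//   cfr.clean(Row("a", 1))   // => Tata("a", 1), rebuilt field by field
//   cfr.clean(Tata("a", 1))  // already typed, returned unchanged
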
case class Tata(a: String, b: Int)
case class Toto(tatas: Seq[Tata])
object UnnestedSpark {

  def tataToX(seq: Seq[Tata]): String =
    seq.map(_.a).headOption.getOrElse("")

  /** Wrap a function over Seq[A] so that every element is first cleaned from
    * its Row representation. Externalizer keeps the captured values
    * serializable so the closure can be shipped to executors.
    */
  def cleanF[A: CleanFromRow, B](f: Seq[A] => B): Seq[A] => B = {
    val g: Externalizer[A => A] = Externalizer(
      implicitly[CleanFromRow[A]].clean _
    )
    val mf: Externalizer[Seq[A] => B] = Externalizer(f)
    a =>
      if (a == null) {
        null.asInstanceOf[B]
      } else {
        val f0: Seq[A] => B = mf.get
        val g0: A => A = g.get
        f0(a.map(g0))
      }
  }

  def main(args: Array[String]): Unit = {
    val ss = SparkSession
      .builder()
      .master("local[1]")
      .getOrCreate()
    import ss.implicits._

    ss.udf.register("tataToX", cleanF(tataToX))

    val df = ss.createDataset(Seq(Toto(Seq(Tata("a", 1))))).toDF()
    df.show()
    df.select(expr("tataToX(tatas)")).show(false)

    // The same function as an unregistered UDF, as an alternative to the
    // SQL registration above.
    val udfTataToX = udf[String, Seq[Tata]](cleanF(tataToX))

    val mapping: UserDefinedFunction = map[Tata, String](t => "Hello: " + t.a)
    df.select(mapping($"tatas")).show()
  }

  def map[A: CleanFromRow: TypeTag, B: TypeTag](g: A => B): UserDefinedFunction = {
    def cleanedMap: Seq[A] => Seq[B] = cleanF[A, Seq[B]](as => as.map(g))
    udf[Seq[B], Seq[A]](cleanedMap)
  }
}
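
// A note on Externalizer: com.twitter.chill.Externalizer wraps a value so it
// falls back to Kryo when plain Java serialization fails, which is what lets
// the derived CleanFromRow instance travel inside the UDF closure. A minimal
// sketch of the pattern (standalone, not part of the pipeline above):
//
//   val boxed = Externalizer(implicitly[CleanFromRow[Tata]].clean _)
//   val clean: Any => Tata = boxed.get  // usable inside an executor task
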
case class Person(name: String, age: Int)

object Example2 {
  import Implicits2._

  def main(args: Array[String]): Unit = {
    val ss = SparkSession
      .builder()
      .master("local[1]")
      .getOrCreate()
    import ss.implicits._

    val df = ss
      .createDataset(
        Seq(
          ("1", Seq(Person("John", 32), Person("Mary", 31))),
          ("2", Seq(Person("Fred", 42), Person("Elsa", 40), Person("Daryll", 10)))
        )
      )
      .toDF("id", "family")

    df.show(false)
    df.printSchema()

    // map now works on an array of case classes: each Row element is cleaned
    // back into a Person before the function runs.
    df.select($"id", $"family".map((p: Person) => p.name).as("family"))
      .show(false)
  }
}
object Implicits2 {

  // Same wrapper as in UnnestedSpark: clean every element before applying f,
  // with Externalizer keeping the closure serializable.
  private def cleanF[A: CleanFromRow, B](f: Seq[A] => B): Seq[A] => B = {
    val g: Externalizer[A => A] = Externalizer(
      implicitly[CleanFromRow[A]].clean _
    )
    val mf: Externalizer[Seq[A] => B] = Externalizer(f)
    a =>
      if (a == null) {
        null.asInstanceOf[B]
      } else {
        val f0: Seq[A] => B = mf.get
        val g0: A => A = g.get
        f0(a.map(g0))
      }
  }

  implicit class RicherColumn2(column: Column) {

    /** Like Implicits.map, but safe for array columns whose elements are case
      * classes.
      */
    def map[A: CleanFromRow: TypeTag, B: TypeTag](g: A => B): Column = {
      val f = udf[Seq[B], Seq[A]](cleanF(s => s.map(g)))
      f(column)
    }
  }
}
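
// Only map is lifted in RicherColumn2; flatMap and foldLeft from Implicits
// could be made case-class safe the same way. A sketch (assumed, following
// the same pattern):
//
//   def flatMap[A: CleanFromRow: TypeTag, B: TypeTag](g: A => Seq[B]): Column = {
//     val f = udf[Seq[B], Seq[A]](cleanF(s => s.flatMap(g)))
//     f(column)
//   }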