Created
March 4, 2019 11:08
-
-
Save fsarradin/da35fb5c36d444a85305e4ed4e3d42b4 to your computer and use it in GitHub Desktop.
Spark and case class deserialization (cleaning Spark `Row` values back into case classes inside UDFs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.chill.Externalizer | |
import org.apache.spark.sql._ | |
import org.apache.spark.sql.expressions.UserDefinedFunction | |
import org.apache.spark.sql.functions._ | |
import scala.reflect.runtime.universe._ | |
object TestMain {

  /** Demo: the same three array transformations expressed once with the
    * Column-level combinators from [[Implicits]] and once with Spark 2.4+
    * higher-order SQL functions (transform / flatten / aggregate).
    */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession
        .builder()
        .master("local[1]")
        .getOrCreate()

    import spark.implicits._
    import Implicits._

    val df =
      Seq(
        ("abc", Seq("Hello world"), Seq(0L, 2L)),
        ("def", Seq("Wild Horses", "Paint It Black"), Seq(1L, 2L))
      ).toDF("id", "text", "timestamps")

    df.show(false)
    df.createOrReplaceTempView("data")

    // Column-DSL route (UDF-backed combinators).
    val result =
      df.select(
        $"id",
        $"timestamps".map((t: Long) => t * 2).as("map"),
        $"text".flatMap((t: String) => t.split(" ").toSeq).as("flatMap"),
        $"timestamps".foldLeft[Long, Long](0)(_ + _).as("foldLeft")
      )
    result.show(false)

    // SQL route with native higher-order functions.
    // FIX: the original called .stripMargin on a string containing no '|'
    // margin markers, making it a misleading no-op; margins are now real.
    spark.sql(
      """SELECT
        |  id,
        |  transform(timestamps, t -> t * 2) as transform,
        |  flatten(transform(text, t -> split(t, " "))) as flatten_transform,
        |  aggregate(timestamps, bigint(0), (s, v) -> s + v) as aggregate
        |FROM data
        |""".stripMargin).show(false)
  }
}
object Implicits {

  /** Array-column combinators mirroring the Scala collections API, each
    * implemented as a UDF applied to the wrapped column.
    */
  implicit class RicherColumn(column: Column) {

    /** Transform each element of an array by applying the function g. */
    def map[A: TypeTag, B: TypeTag](g: A => B): Column = {
      val mapUdf = udf[Seq[B], Seq[A]](xs => xs.map(g))
      mapUdf(column)
    }

    /** Transform each element of an array by applying the function g and
      * concatenating the resulting arrays.
      */
    def flatMap[A: TypeTag, B: TypeTag](g: A => Seq[B]): Column = {
      val flatMapUdf = udf[Seq[B], Seq[A]](xs => xs.flatMap(g))
      flatMapUdf(column)
    }

    /** Aggregate an array to a single value, folding left to right from
      * the initial value `init`.
      */
    def foldLeft[A: TypeTag, B: TypeTag](init: B)(g: (B, A) => B): Column = {
      val foldUdf = udf[B, Seq[A]](xs => xs.foldLeft(init)(g))
      foldUdf(column)
    }
  }
}
/** Typeclass that converts an untyped value (as handed to a Spark UDF,
  * e.g. a `Row` for a struct field — see `CleanFromRow.combine`) back into
  * a properly typed `A`. Extends Serializable so instances can travel
  * inside Spark task closures.
  */
trait CleanFromRow[A] extends Serializable {
  def clean(a: Any): A
}
object CleanFromRow {
  import scala.reflect.ClassTag
  import magnolia._
  import language.experimental.macros

  type Typeclass[T] = CleanFromRow[T]

  /** Identity instance for values Spark already delivers with the right
    * runtime type (primitives and String).
    */
  private def instance[A]: Typeclass[A] =
    new Typeclass[A] {
      override def clean(a: Any): A = a.asInstanceOf[A]
    }

  implicit val double: Typeclass[Double] = instance
  implicit val boolean: Typeclass[Boolean] = instance
  implicit val strCFR: Typeclass[String] = instance
  implicit val intCFR: Typeclass[Int] = instance
  implicit val longCFR: Typeclass[Long] = instance

  implicit def opt[T: Typeclass: Manifest]: Typeclass[Option[T]] =
    new Typeclass[Option[T]] {
      private val rc = implicitly[Manifest[T]].runtimeClass

      override def clean(a: Any): Option[T] =
        a match {
          // Already an Option whose content has the expected runtime class.
          case ox: Option[_]
              if ox.forall(x => rc.isAssignableFrom(x.getClass)) =>
            ox.asInstanceOf[Option[T]]
          case null => None
          case x    => Option(implicitly[Typeclass[T]].clean(x))
        }
    }

  implicit def seq[T: Typeclass: Manifest]: Typeclass[Seq[T]] =
    new Typeclass[Seq[T]] {
      private val rc = implicitly[Manifest[T]].runtimeClass

      override def clean(a: Any): Seq[T] =
        a match {
          // FIX: the original had no null case (unlike `opt`) and threw a
          // MatchError on a null array value; treat null as empty.
          case null => Nil
          case Nil  => Nil
          case xs: Seq[_] if xs.forall(x => rc.isAssignableFrom(x.getClass)) =>
            xs.asInstanceOf[Seq[T]]
          case xs: Seq[_] => xs.map(implicitly[Typeclass[T]].clean)
          // FIX: fail with a descriptive error instead of a bare MatchError.
          case other =>
            throw new IllegalArgumentException(
              s"Cannot clean non-sequence value of class ${other.getClass.getName}"
            )
        }
    }

  /** Magnolia combinator: derive an instance for a case class by cleaning
    * each field of the incoming Row and rebuilding via the constructor.
    */
  def combine[T: ClassTag](ctx: CaseClass[CleanFromRow, T]): Typeclass[T] =
    new Typeclass[T] {
      override def clean(a: Any): T =
        a match {
          case a: T => a
          case r: Row =>
            val values: Seq[Any] =
              r.toSeq
                .zip(ctx.parameters)
                .map {
                  case (rowValue, param) => param.typeclass.clean(rowValue)
                }
            ctx.rawConstruct(values)
          // FIX: descriptive failure instead of a bare MatchError.
          case other =>
            throw new IllegalArgumentException(
              s"Cannot clean value ${if (other == null) "null" else other.getClass.getName} into ${ctx.typeName.full}"
            )
        }
    }

  implicit def gen[T]: CleanFromRow[T] = macro Magnolia.gen[T]
}
/** Sample struct element: a string field `a` and an int field `b`.
  * Marked final — case classes should not be extended.
  */
final case class Tata(a: String, b: Int)

/** Sample root record holding an array-of-struct column of [[Tata]]. */
final case class Toto(tatas: Seq[Tata])
object UnnestedSpark {

  /** Example UDF body: the `a` field of the first element, or "" if empty. */
  def tataToX(seq: Seq[Tata]): String =
    seq.map(_.a).headOption.getOrElse("")

  /** Wrap `f` so that (1) it survives Spark closure serialization, via
    * chill's Externalizer, and (2) every incoming element is passed through
    * CleanFromRow first, turning the Rows Spark delivers for struct
    * elements back into real case-class instances. A null input array maps
    * to null, matching Spark SQL's null propagation for UDFs.
    */
  def cleanF[A: CleanFromRow, B](f: Seq[A] => B): Seq[A] => B = {
    val cleanerExt: Externalizer[A => A] = Externalizer(
      implicitly[CleanFromRow[A]].clean _
    )
    val fnExt: Externalizer[Seq[A] => B] = Externalizer(f)
    as =>
      if (as == null) {
        null.asInstanceOf[B]
      } else {
        val f0: Seq[A] => B = fnExt.get
        val g0: A => A = cleanerExt.get
        f0(as.map(g0))
      }
  }

  def main(args: Array[String]): Unit = {
    val ss = SparkSession
      .builder()
      .master("local[1]")
      .getOrCreate()
    import ss.implicits._

    ss.udf.register("tataToX", cleanF(tataToX))

    val df = ss.createDataset(Seq(Toto(Seq(Tata("a", 1))))).toDF()
    df.show()

    // SQL-expression route, through the registered UDF.
    df.select(expr("tataToX(tatas)")).show(false)

    // Column-DSL route, through the `map` combinator below.
    // FIX: removed the original's unused local
    //   `val udfTataToX = udf[String, Seq[Tata]](cleanF(tataToX))`
    // — it was never applied to any column.
    val mapping: UserDefinedFunction = map[Tata, String](t => "Hello: " + t.a)
    df.select(mapping($"tatas")).show()
  }

  /** Build a UDF mapping `g` over an array column of A, cleaning each
    * element with CleanFromRow before `g` is applied.
    */
  def map[A: CleanFromRow: TypeTag, B: TypeTag](g: A => B): UserDefinedFunction = {
    def cleanedMap: Seq[A] => Seq[B] = cleanF[A, Seq[B]](as => as.map(g))
    udf[Seq[B], Seq[A]](cleanedMap)
  }
}
/** Sample struct type for Example2: a person with a name and an age.
  * Marked final — case classes should not be extended.
  */
final case class Person(name: String, age: Int)
object Example2 {
  import Implicits2._

  /** Demo: map over an array-of-struct column ("family") using the
    * cleaning `map` combinator from [[Implicits2]].
    */
  def main(args: Array[String]): Unit = {
    val ss = SparkSession
      .builder()
      .master("local[1]")
      .getOrCreate()
    import ss.implicits._

    // Two families keyed by id.
    val families = Seq(
      ("1", Seq(Person("John", 32), Person("Mary", 31))),
      (
        "2",
        Seq(Person("Fred", 42), Person("Elsa", 40), Person("Daryll", 10))
      )
    )
    val df = ss.createDataset(families).toDF("id", "family")

    df.show(false)
    df.printSchema()

    // Replace each Person with just their name.
    df.select($"id", $"family".map((p: Person) => p.name).as("family"))
      .show(false)
  }
}
object Implicits2 {

  /** Same wrapper idea as UnnestedSpark.cleanF: serialize the function and
    * the cleaner via Externalizer, and run every element through
    * CleanFromRow before applying `f`. A null array yields null.
    */
  private def cleanF[A: CleanFromRow, B](f: Seq[A] => B): Seq[A] => B = {
    val cleanerExt: Externalizer[A => A] =
      Externalizer(implicitly[CleanFromRow[A]].clean _)
    val fnExt: Externalizer[Seq[A] => B] = Externalizer(f)

    { xs =>
      if (xs == null) null.asInstanceOf[B]
      else {
        val apply0: Seq[A] => B = fnExt.get
        val clean0: A => A = cleanerExt.get
        apply0(xs.map(clean0))
      }
    }
  }

  /** Column syntax whose `map` cleans struct elements before mapping. */
  implicit class RicherColumn2(column: Column) {
    def map[A: CleanFromRow: TypeTag, B: TypeTag](g: A => B): Column = {
      val mapUdf = udf[Seq[B], Seq[A]](cleanF((xs: Seq[A]) => xs.map(g)))
      mapUdf(column)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment