Skip to content

Instantly share code, notes, and snippets.

Last active May 10, 2017 08:52
Show Gist options
  • Save elyast/2e9d614e9f73331cd6c2 to your computer and use it in GitHub Desktop.
Save elyast/2e9d614e9f73331cd6c2 to your computer and use it in GitHub Desktop.
Shapeless usage in spark pipelines
// This allows a single entity (from a given RDD or DStream) to be transformed in several ways
// at once, using an HList of mapper functions, and binds their results into a given output type.
// Potentially useful for feature extraction from a single entity, or for basic ETL.
// Usage:
import twitter4j.Status
import shapeless._
val user: Status => String = ... // a function that extracts the author of a tweet
val tags: Status => List[String] = ... // a function that gets the tags from a tweet
// Target type: its fields line up, in order, with the result types of `user` and `tags`.
case class UserTags(user: String, tags: List[String])
val streamOfTweets: DStream[Status] = ... { value =>
// NOTE(review): `to` is declared in the implementation part as `to[B](value: A)`, so `value`
// presumably has to be passed here, i.e. `.to[UserTags](value)`. This snippet also looks
// truncated (the lambda is never closed) -- confirm against the original usage.
from(user :: tags :: HNil).to[UserTags]
// If the mapper functions do not match the fields of the case class, we get a compilation error.
// And now the implementation part:
import shapeless._
import scala.collection.immutable.{:: => Cons}
/**
 * Entry point of the DSL: wraps an HList of mapper functions that all take the same input type.
 *
 * The lower-case name is deliberate so that call sites read as `from(mappers).to[Target](value)`.
 *
 * @param mappers HList whose elements are functions of the shape `A => H`
 * @param tr      evidence that every element of `mappers` can be applied to an `A`; its `Out`
 *                type fixes `M`, the HList of the individual mapper results
 * @tparam A type of the entity fed to every mapper
 * @tparam L type of the `mappers` HList
 * @tparam M HList type of the mapper results, in the same order as `L`
 */
case class from[A, L <: HList, M <: HList](mappers: L)(implicit tr: Applicator.Aux[A, L, M]) {

  /**
   * Applies every mapper to `value` and assembles the results into a `B`.
   *
   * @param value the entity passed to each mapper
   * @param gen   shapeless `Generic` for `B` whose representation is exactly `M`; when the
   *              mapper result types do not line up with `B`, this implicit cannot be resolved
   *              and the call is rejected at compile time
   * @tparam B target type, typically a case class
   * @return a `B` built from the mapper results
   */
  def to[B](value: A)(implicit gen: Generic.Aux[B, M]): B = {
    // `tr` yields the HList of results (type M); `gen.from` turns that HList into a B.
    gen.from(tr(value, mappers))
  }
}
/**
 * Type class describing how a single value of type `A` is applied to an HList `L` of
 * functions, yielding an HList of their results.
 *
 * As a `DepFn2[A, L]` it exposes `apply(a, l): Out`, where `Out` depends on the instance.
 * It mixes in `Serializable` -- presumably so that instances can be captured by closures
 * shipped to Spark executors (confirm against how the pipeline uses it).
 *
 * @tparam A type of the element fed to every function in the list
 * @tparam L HList of functions to apply
 */
trait Applicator[A, L <: HList] extends DepFn2[A, L] with Serializable {
  /** Result type of the application, constrained here to be an HList. */
  type Out <: HList
}
/**
 * Companion of [[Applicator]]: the summoner, the `Aux` alias and the two implicit instances
 * that derive an `Applicator` by induction over the structure of the HList.
 */
object Applicator {

  /** Summons the implicit instance for `A` and `L` while keeping its precise `Out` type. */
  def apply[A, L <: HList](implicit tr: Applicator[A, L]): Aux[A, L, tr.Out] = tr

  /** Aux pattern: lifts the `Out` type member into a type parameter so it can be constrained. */
  type Aux[A, L <: HList, Out0 <: HList] = Applicator[A, L] {type Out = Out0}

  /** Base case: nothing to apply, so the empty list is returned unchanged. */
  implicit def hnilApplicator[A]: Aux[A, HNil, HNil] =
    new Applicator[A, HNil] {
      type Out = HNil

      def apply(elem: A, l: HNil): Out = l
    }

  /**
   * Inductive case: the head of the list is a function `A => H`. The head is applied to the
   * element and its result is prepended to the results obtained from the tail.
   *
   * @param st instance for the tail `T`
   */
  implicit def hlistApplicator[A, H, T <: HList]
  (implicit st: Applicator[A, T]): Aux[A, (A => H) :: T, H :: st.Out] =
    new Applicator[A, (A => H) :: T] {
      type Out = H :: st.Out

      // Explicit result type, matching the base case above.
      def apply(elem: A, l: (A => H) :: T): Out = l.head(elem) :: st(elem, l.tail)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment