Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
having monad instances for RDD like things
object RDDasMonadPlus {
import org.apache.spark.{ SparkContext }
import org.apache.spark.rdd.RDD
import scalaz._
import Scalaz._
import scala.reflect.ClassTag
// RDDMPlus is the type for which we will define the Monad instance. it can be
// constructed from an RDD using the RDDClassTag constructor. this
// implementation is based on insights from
// <http://okmij.org/ftp/Haskell/set-monad.html#PE>
sealed trait RDDMPlus[A]
// wherever ClassTag[A] is available use this constructor. specifically,
// wherever you are generating RDD, you should be generating RDDClassTag
// instead since the fact you are able to generate an RDD implies the
// pertinent ClassTag is already in scope.
case class RDDClassTag[A: ClassTag](rdd: RDD[A]) extends RDDMPlus[A] {
def ct: ClassTag[A] = implicitly[ClassTag[A]]
}
// use RDDAny only when no ClassTag[A] is available.
case class RDDAny[A](list: List[A]) extends RDDMPlus[A]
// the below is not needed for MonadPlus instance, but is useful
implicit class RDDMPlusOps[A](val self: RDDMPlus[A]) extends AnyVal {
def toRDD(sc: SparkContext)(implicit ev: ClassTag[A]): RDD[A] = self match {
case RDDAny(xs) => sc.parallelize(xs)
case RDDClassTag(rdd) => rdd
}
}
implicit def MonadPlusRDDInstance(implicit sc: SparkContext) = new MonadPlus[RDDMPlus] {
// Members declared in scalaz.PlusEmpty
def empty[A]: RDDMPlus[A] = RDDAny(Nil)
// Members declared in scalaz.Plus
def plus[A](x: RDDMPlus[A], y: => RDDMPlus[A]): RDDMPlus[A] = x match {
// the goal here is to end up with RDDClassTag wherever ClassTag[A] is
// accessible, thereby propagating that information
case RDDAny(xs) => y match {
case RDDAny(ys) => RDDAny(xs ++ ys)
case yct @ RDDClassTag(ys) => {
implicit val classTag = yct.ct
RDDClassTag(sc.parallelize(xs) ++ ys)
}
}
case xct @ RDDClassTag(xs) => {
implicit val classTag = xct.ct
y match {
case RDDAny(ys) => RDDClassTag(xs ++ sc.parallelize(ys))
case RDDClassTag(ys) => RDDClassTag(xs ++ ys)
}
}
}
// Members declared in scalaz.Applicative
def point[A](a: => A): RDDMPlus[A] = RDDAny(List(a))
// Members declared in scalaz.Bind
def bind[A, B](fa: RDDMPlus[A])(f: A => RDDMPlus[B]): RDDMPlus[B] = fa match {
case RDDClassTag(r) => r.map(f).fold(empty)(plus(_, _))
case RDDAny(l) => l.map(f).fold(empty)(plus(_, _))
}
}
}
class RDDMonadPlusUsage(sc: org.apache.spark.SparkContext) {
import org.apache.spark.{ SparkContext }
import org.apache.spark.rdd.RDD
import scalaz._
import Scalaz._
// some generic function which requires Monad instance
def foo[R[_]: Monad, A, B](ma: R[A])(f: A => R[B]): R[B] = {
implicitly[Monad[R]].bind[A, B](ma)(f)
}
import RDDasMonadPlus._ // bring the MonadPlusRDDInstance implicit in scope
implicit val impSc = sc // allow the implicit to be triggered
// lets try and use foo
def bar[A, B](ra: RDDMPlus[A])(f: A => RDDMPlus[B]): RDDMPlus[B] = {
// at this point, the implicit can kick in and the call to foo succeeds
// below
foo(ra)(f)
}
// let's say that we had something in the form of RDDs can we still use
// implicit? the answer is yes, so long we either have concrete types (instead
// of type variables) or the ClassTag is available. of course, if neither of
// those conditions are met you can always convert to List and use RDDAny, but
// that's not something you should be doing
def baz(ra: RDD[Int])(f: Int => RDD[Double]): RDD[Double] = {
// first we need to transform inputs to RDDMPlus
val ma: RDDMPlus[Int] = RDDClassTag(ra) // this works because we have a
// known specific type RDD[Int] for
// ra
val g: Int => RDDMPlus[Double] = f andThen RDDClassTag.apply
bar(ma)(g). // now we can invoke bar
toRDD(sc) // and then finally convert back to RDD
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.