// RDDasMonadPlus — scalaz MonadPlus instances for RDD-like things (Apache Spark).
object RDDasMonadPlus {
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD
  import scalaz._
  import Scalaz._
  import scala.reflect.ClassTag

  // RDDMPlus is the type for which we define the MonadPlus instance. It can be
  // constructed from an RDD using the RDDClassTag constructor. A direct
  // MonadPlus[RDD] is impossible because RDD's combinators all demand a
  // ClassTag, which the scalaz typeclass signatures cannot supply; RDDMPlus
  // smuggles the ClassTag alongside the RDD instead.
  sealed trait RDDMPlus[A]

  // Wherever ClassTag[A] is available use this constructor. Specifically,
  // wherever you are generating an RDD, you should be generating RDDClassTag
  // instead, since the fact you are able to generate an RDD implies the
  // pertinent ClassTag is already in scope.
  final case class RDDClassTag[A: ClassTag](rdd: RDD[A]) extends RDDMPlus[A] {
    // Re-expose the captured ClassTag so pattern-matching sites can put it
    // back into implicit scope (see `plus` below).
    def ct: ClassTag[A] = implicitly[ClassTag[A]]
  }

  // Use RDDAny only when no ClassTag[A] is available; values stay in a plain
  // List until a ClassTag shows up and lets us lift them into an RDD.
  final case class RDDAny[A](list: List[A]) extends RDDMPlus[A]

  // Not needed for the MonadPlus instance, but useful: converts back to a
  // plain RDD once the caller can supply a SparkContext and a ClassTag.
  implicit class RDDMPlusOps[A](val self: RDDMPlus[A]) extends AnyVal {
    def toRDD(sc: SparkContext)(implicit ev: ClassTag[A]): RDD[A] = self match {
      case RDDAny(xs)       => sc.parallelize(xs)
      case RDDClassTag(rdd) => rdd
    }
  }

  implicit def MonadPlusRDDInstance(implicit sc: SparkContext): MonadPlus[RDDMPlus] =
    new MonadPlus[RDDMPlus] {
      // Members declared in scalaz.PlusEmpty
      def empty[A]: RDDMPlus[A] = RDDAny(Nil)

      // Members declared in scalaz.Plus
      def plus[A](x: RDDMPlus[A], y: => RDDMPlus[A]): RDDMPlus[A] = x match {
        // The goal here is to end up with RDDClassTag wherever ClassTag[A] is
        // accessible, thereby propagating that information.
        case RDDAny(xs) =>
          y match {
            case RDDAny(ys) => RDDAny(xs ++ ys)
            case yct @ RDDClassTag(ys) =>
              // Recover the ClassTag captured by the other side so we can
              // parallelize our List and union the two RDDs.
              implicit val classTag: ClassTag[A] = yct.ct
              RDDClassTag(sc.parallelize(xs) ++ ys)
          }
        case xct @ RDDClassTag(xs) =>
          implicit val classTag: ClassTag[A] = xct.ct
          y match {
            case RDDAny(ys)      => RDDClassTag(xs ++ sc.parallelize(ys))
            case RDDClassTag(ys) => RDDClassTag(xs ++ ys)
          }
      }

      // Members declared in scalaz.Applicative
      def point[A](a: => A): RDDMPlus[A] = RDDAny(List(a))

      // Members declared in scalaz.Bind
      // NOTE(review): the original paste truncated these two bodies, leaving
      // only a `, _))` fragment. Reconstructed as map-then-fold with `plus`,
      // which matches the fragment and the surrounding instance — confirm
      // against the original gist. Collecting the RDD to the driver is
      // unavoidable here because f's result carries no ClassTag[B].
      def bind[A, B](fa: RDDMPlus[A])(f: A => RDDMPlus[B]): RDDMPlus[B] = fa match {
        case RDDClassTag(r) =>
          r.collect().toList.map(f).foldLeft(empty[B])(plus(_, _))
        case RDDAny(l) =>
          l.map(f).foldLeft(empty[B])(plus(_, _))
      }
    }
}
// Demonstrates how the MonadPlusRDDInstance implicit is brought into play
// for generic, Monad-constrained code.
class RDDMonadPlusUsage(sc: org.apache.spark.SparkContext) {
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD
  import scalaz._
  import Scalaz._

  // Some generic function which requires a Monad instance.
  def foo[R[_]: Monad, A, B](ma: R[A])(f: A => R[B]): R[B] =
    implicitly[Monad[R]].bind[A, B](ma)(f)

  import RDDasMonadPlus._ // bring the MonadPlusRDDInstance implicit into scope
  implicit val impSc: SparkContext = sc // allow the implicit to be triggered

  // Let's try and use foo.
  // NOTE(review): the body of this method was lost in the original paste;
  // reconstructed from the surrounding comments, which say the call to foo
  // succeeds here — confirm against the original gist.
  def bar[A, B](ra: RDDMPlus[A])(f: A => RDDMPlus[B]): RDDMPlus[B] =
    // At this point, the implicit can kick in and the call to foo succeeds.
    foo(ra)(f)

  // Let's say that we had something in the form of RDDs — can we still use the
  // implicit? The answer is yes, so long as we either have concrete types
  // (instead of type variables) or the ClassTag is available. Of course, if
  // neither of those conditions is met you can always convert to List and use
  // RDDAny, but that's not something you should be doing.
  def baz(ra: RDD[Int])(f: Int => RDD[Double]): RDD[Double] = {
    // First we need to transform the inputs to RDDMPlus. This works because
    // we have the known specific type RDD[Int] for ra.
    val ma: RDDMPlus[Int] = RDDClassTag(ra)
    val g: Int => RDDMPlus[Double] = f andThen RDDClassTag.apply
    bar(ma)(g).  // now we can invoke bar
      toRDD(sc)  // and then finally convert back to RDD
  }
}