Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ankurdave/f5d4df4b521ac83b9c7d to your computer and use it in GitHub Desktop.
Save ankurdave/f5d4df4b521ac83b9c7d to your computer and use it in GitHub Desktop.
import scala.language.higherKinds
// Common interface of RDD and DStream. Note the Coll type parameter, which will either be RDD or DStream.
/**
 * Interface shared by the collection types in this example.
 *
 * @tparam A    the element type held by the collection
 * @tparam Coll the concrete collection type constructor; `map` returns this type
 *              rather than the interface, so callers keep the concrete type
 */
trait DistributedCollection[A, Coll[_]] {

  /**
   * Applies `f` to the contents of the collection.
   *
   * @tparam B the element type of the result
   * @param f  the function to apply
   * @return a `Coll` holding the transformed contents
   */
  def map[B](f: A => B): Coll[B]
}
/**
 * Example collection holding a single value, exposed as `x`.
 *
 * @tparam A the type of the held value
 * @param x  the held value
 */
class RDD[A](val x: A) extends DistributedCollection[A, RDD] {

  /**
   * Applies `f` to the held value and wraps the result in a new `RDD`.
   *
   * @tparam B the type produced by `f`
   * @param f  the function to apply to `x`
   * @return a new `RDD` holding `f(x)`
   */
  def map[B](f: A => B): RDD[B] = {
    val mapped: B = f(x)
    new RDD[B](mapped)
  }
}
/**
 * Example collection holding a single value, exposed as `y`.
 *
 * @tparam A the type of the held value
 * @param y  the held value
 */
class DStream[A](val y: A) extends DistributedCollection[A, DStream] {

  /**
   * Applies `f` to the held value and wraps the result in a new `DStream`.
   *
   * @tparam B the type produced by `f`
   * @param f  the function to apply to `y`
   * @return a new `DStream` holding `f(y)`
   */
  def map[B](f: A => B): DStream[B] = {
    val mapped: B = f(y)
    new DStream[B](mapped)
  }
}
// An example method that operates on either RDD or DStream. The Coll type parameter can either be
// RDD or DStream, and this constraint is expressed using the "<:" type bound.
/**
 * Increments every element and then renders it as a string, for any collection type
 * that implements `DistributedCollection` with itself as the `Coll` parameter.
 *
 * @tparam Coll the concrete collection type constructor
 * @param coll  the input collection of integers
 * @return a collection of the same concrete type holding the string form of each
 *         incremented element
 */
def transform[Coll[X] <: DistributedCollection[X, Coll]](coll: Coll[Int]): Coll[String] = {
  // The bound guarantees map returns Coll again, so the second map can be chained directly.
  val incremented: Coll[Int] = coll.map((n: Int) => n + 1)
  incremented.map((n: Int) => n.toString)
}
// Example of the method's usage
// Build one collection of each kind, each holding a single Int.
val rdd = new RDD[Int](1)
val dstream = new DStream[Int](2)
// transform returns the same concrete collection type it was given, so the RDD-only
// field x and the DStream-only field y are still accessible on the results.
// With the inputs above, x is "2" and y is "3".
transform(rdd).x
transform(dstream).y
// Toy definitions of RDD and DStream
/**
 * Toy definition of an RDD: holds a single value, exposed as `x`.
 *
 * @tparam A the type of the held value
 * @param x  the held value
 */
class RDD[A](val x: A) {

  /**
   * Applies `f` to the held value and wraps the result in a new `RDD`.
   *
   * @tparam B the type produced by `f`
   * @param f  the function to apply to `x`
   * @return a new `RDD` holding `f(x)`
   */
  def map[B](f: A => B): RDD[B] = {
    val mapped: B = f(x)
    new RDD[B](mapped)
  }
}
/**
 * Toy definition of a DStream: holds a single value, exposed as `y`.
 *
 * @tparam A the type of the held value
 * @param y  the held value
 */
class DStream[A](val y: A) {

  /**
   * Applies `f` to the held value and wraps the result in a new `DStream`.
   *
   * @tparam B the type produced by `f`
   * @param f  the function to apply to `y`
   * @return a new `DStream` holding `f(y)`
   */
  def map[B](f: A => B): DStream[B] = {
    val mapped: B = f(y)
    new DStream[B](mapped)
  }
}
import scala.language.higherKinds
import scala.language.implicitConversions
// The common interface for RDD and DStream
/**
 * The common interface for the collection types in this example.
 *
 * @tparam A    the element type held by the collection
 * @tparam Coll the underlying collection type constructor; `map` returns this type,
 *              not another `DistributedCollection`
 */
trait DistributedCollection[A, Coll[_]] {

  /**
   * Applies `f` to the contents of the collection.
   *
   * @tparam B the element type of the result
   * @param f  the function to apply
   * @return a `Coll` holding the transformed contents
   */
  def map[B](f: A => B): Coll[B]
}
// The typeclass that allows converting RDD and DStream to DistributedCollection
/**
 * Type class describing how to view a `Coll` as a `DistributedCollection`.
 *
 * @tparam Coll the collection type constructor being adapted
 */
trait ToDistributedCollection[Coll[_]] {

  /**
   * Wraps a collection in the `DistributedCollection` interface.
   *
   * @tparam A the element type of the collection
   * @param c  the collection to adapt
   * @return a `DistributedCollection` view over `c` whose operations return `Coll`
   */
  def apply[A](c: Coll[A]): DistributedCollection[A, Coll]
}
// Instances of the typeclass for RDD and DStream
/** Type class instance that adapts an `RDD` to the `DistributedCollection` interface. */
implicit object RddToDistributedCollection extends ToDistributedCollection[RDD] {

  /**
   * Wraps `rdd` in a `DistributedCollection` that delegates to the RDD's own `map`.
   *
   * @tparam A  the element type of the RDD
   * @param rdd the RDD to adapt
   * @return a `DistributedCollection` view whose `map` returns an `RDD`
   */
  // The result type is declared explicitly so the public signature does not depend on
  // what the compiler infers from the anonymous class.
  def apply[A](rdd: RDD[A]): DistributedCollection[A, RDD] =
    new DistributedCollection[A, RDD] {
      def map[B](f: A => B): RDD[B] = rdd.map(f)
    }
}
/** Type class instance that adapts a `DStream` to the `DistributedCollection` interface. */
implicit object DstreamToDistributedCollection extends ToDistributedCollection[DStream] {

  /**
   * Wraps `dstream` in a `DistributedCollection` that delegates to the DStream's own `map`.
   *
   * @tparam A      the element type of the DStream
   * @param dstream the DStream to adapt
   * @return a `DistributedCollection` view whose `map` returns a `DStream`
   */
  // The result type is declared explicitly so the public signature does not depend on
  // what the compiler infers from the anonymous class.
  def apply[A](dstream: DStream[A]): DistributedCollection[A, DStream] =
    new DistributedCollection[A, DStream] {
      def map[B](f: A => B): DStream[B] = dstream.map(f)
    }
}
// An example method that operates on either RDD or DStream. The implicit parameter converts
// a Coll to a DistributedCollection so that we can operate on it. Because map returns a plain
// Coll rather than a DistributedCollection, chaining two transformations requires applying
// the conversion twice: once per map call.
/**
 * Increments every element and then renders it as a string, for any collection type
 * that has a `ToDistributedCollection` instance in implicit scope.
 *
 * @tparam Coll the collection type constructor
 * @param coll  the input collection of integers
 * @param dc    the conversion from `Coll` to `DistributedCollection`
 * @return a collection of the same type holding the string form of each incremented element
 */
def transform[Coll[_]](coll: Coll[Int])(implicit dc: ToDistributedCollection[Coll]): Coll[String] = {
  // map hands back a plain Coll, so the intermediate result has to be wrapped again
  // before the second map can be called.
  val incremented: Coll[Int] = dc(coll).map((n: Int) => n + 1)
  dc(incremented).map((n: Int) => n.toString)
}
// Example of the method's usage
// Build one collection of each kind, each holding a single Int.
val rdd = new RDD[Int](1)
val dstream = new DStream[Int](2)
// The implicit ToDistributedCollection instance is picked from the argument's type, and
// transform returns that same collection type, so the RDD-only field x and the
// DStream-only field y are still accessible. With the inputs above, x is "2" and y is "3".
transform(rdd).x
transform(dstream).y
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment