Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save suhailshergill/5d95e0cd368c71772b23 to your computer and use it in GitHub Desktop.
Save suhailshergill/5d95e0cd368c71772b23 to your computer and use it in GitHub Desktop.
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/**
* I am trying to narrow down an exception thrown by Spark when using "factories".
* The factories have parameters that are used in the classes' functions.
*
* To run this code: paste this whole file into a spark-shell session, then execute Test.theMain(sc).
*
*/
object Test {
  import scala.util.control.NonFatal

  /**
   * Common shape of the objects produced by the factories below: each one
   * exposes a single RDD-to-RDD transformation.
   *
   * NOTE(review): `A` does not extend `Serializable`. Whether a given factory
   * variant ends up pulling its enclosing `A` instance into the closure handed
   * to `rdd.map` is presumably what decides if Spark can ship the task --
   * confirm by running `theMain`.
   */
  abstract class A {
    def aFunction: RDD[Int] => RDD[Int]
  }

  /**
   * Variant 1: the closure passed to `map` reads the factory parameter `i`
   * directly.
   */
  def factory1(i: Int) = new A {
    def aFunction = {
      rdd: RDD[Int] => rdd.map(_ + i)
    }
  }

  /**
   * Variant 2: the parameter is first copied into `j`, a `val` member of the
   * returned instance, and the closure passed to `map` reads `j`.
   */
  def factory2(i: Int) = new A {
    val j = i
    def aFunction = {
      rdd: RDD[Int] => rdd.map(_ + j)
    }
  }

  /**
   * Variant 3: the parameter is copied into a local `val j` inside the
   * RDD-to-RDD function, but outside the closure passed to `map`; that
   * closure reads only the local `j`.
   */
  def factory3(i: Int) = new A {
    def aFunction = {
      rdd: RDD[Int] => { val j = i; rdd.map(_ + j) }
    }
  }

  /**
   * Variant 4: the parameter is copied into a local `val j` inside the closure
   * passed to `map`, so that closure itself still reads `i`.
   */
  def factory4(i: Int) = new A {
    def aFunction = {
      rdd: RDD[Int] => rdd.map { anInt => val j = i; anInt + j }
    }
  }

  /**
   * Runs every factory variant against the same small RDD (the integers
   * 1 to 9), using 2 as the factory parameter, and prints each input next to
   * its transformed value.
   *
   * A non-fatal failure in one variant is reported by printing its message and
   * does not stop the remaining variants from running.
   *
   * @param sc the Spark context used to build the test RDD
   */
  def theMain(sc: SparkContext): Unit = {
    val anRDD = sc.parallelize(1 to 9)
    val runningOptions = List(
      ("Factory1", factory1 _),
      ("Factory2", factory2 _),
      ("Factory3", factory3 _),
      ("Factory4", factory4 _)
    )

    runningOptions.foreach { case (theName, theFactory) =>
      println(s"RUNNING TEST WITH [$theName]")
      try {
        val rddPlus2 = theFactory(2)
        val anRDDPlus2 = rddPlus2.aFunction(anRDD)
        // NOTE(review): this println runs inside RDD.foreach; on a non-local
        // master the output presumably ends up in executor logs -- confirm.
        anRDD.zip(anRDDPlus2).foreach { case (i, iPlus2) =>
          // The original format string ended with a stray '}' that was printed verbatim.
          println(s"[$theName] i = $i, iPlus2 = $iPlus2")
        }
      } catch {
        // NonFatal lets fatal JVM errors and interrupts propagate instead of being swallowed.
        case NonFatal(e) => println(s"$theName ERROR ===> ${e.getMessage}")
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment