@ldacosta
Last active August 29, 2015 14:07
I am trying to narrow down an Exception thrown by Spark when using "factories" to create classes. See example below ====> only factory2 works!
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
 * I am trying to narrow down an Exception thrown by Spark when using "factories".
 * The factories have parameters that are used in the classes' functions.
 *
 * To run this code: copy-paste this whole content into a Spark shell, then execute Test.theMain(sc)
 */
object Test {

  abstract class A {
    def aFunction: RDD[Int] => RDD[Int]
  }

  /**
   * Creates a class that uses the parameter of the factory directly
   */
  def factory1(i: Int) = new A {
    def aFunction = {
      rdd: RDD[Int] => rdd.map(_ + i)
    }
  }

  /**
   * Creates a class that copies the parameter into an instance-local reference (val)
   */
  def factory2(i: Int) = new A {
    val j = i
    def aFunction = {
      rdd: RDD[Int] => rdd.map(_ + j)
    }
  }

  /**
   * Creates a class whose function holds a local reference (val) to the parameter
   */
  def factory3(i: Int) = new A {
    def aFunction = {
      rdd: RDD[Int] => { val j = i; rdd.map(_ + j) }
    }
  }

  /**
   * Local reference (val) to the parameter inside the closure passed to map
   */
  def factory4(i: Int) = new A {
    def aFunction = {
      rdd: RDD[Int] => rdd.map { anInt => val j = i; anInt + j }
    }
  }

  def theMain(sc: SparkContext) = {
    val anRDD = sc.parallelize(1 to 9)
    val runningOptions =
      List(("Factory1", factory1 _), ("Factory2", factory2 _), ("Factory3", factory3 _), ("Factory4", factory4 _))
    runningOptions foreach { case (theName, theFactory) =>
      println(s"RUNNING TEST WITH [${theName}]")
      try {
        val rddPlus2 = theFactory(2)
        val anRDDPlus2 = rddPlus2.aFunction(anRDD)
        (anRDD zip anRDDPlus2).foreach { case (i, iPlus2) => println(s"[${theName}] i = ${i}, iPlus2 = ${iPlus2}") }
      }
      catch {
        case e: Exception => println(s"${theName} ERROR ===> ${e.getMessage}")
      }
    }
  }
}
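A Spark-free way to probe which of these closures drag in their non-serializable enclosing instance is to run them through plain Java serialization, which is roughly what Spark does before shipping a task (hence "Task not serializable"). This is a sketch, not part of the gist: `SerializationProbe`, `NotSer`, and the factory names are mine, and the behavior assumes Scala 2.12+, where function literals are serializable by default.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationProbe {

  abstract class NotSer { def fn: Int => Int } // like A: does not extend Serializable

  // Mirrors factory1: the lambda reads `i` through the enclosing anonymous
  // instance, so serializing it tries to serialize that NotSer instance too.
  def capturesThis(i: Int) = new NotSer {
    def fn = x => x + i
  }

  // Mirrors factory6 below: `i` is first copied into a val local to fn,
  // so the lambda captures only that Int and not the instance.
  def capturesLocalOnly(i: Int) = new NotSer {
    def fn = { val j = i; x => x + j }
  }

  // Attempts Java serialization, mirroring what Spark does with a task closure.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With this, `SerializationProbe.isSerializable(SerializationProbe.capturesThis(2).fn)` should come out false while the `capturesLocalOnly` variant comes out true, matching the pattern in the reported errors.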
@suhailshergill

@ldacosta could you try the following as well:

def factory5(i: Int) = {
  val j = i
  new A {
    def aFunction = {
      rdd: RDD[Int] => rdd.map(_ + j)
    }
  }
}

def factory6(i: Int) = {
  new A {
    def aFunction = {
      val j = i
      rdd: RDD[Int] => rdd.map(_ + j)
    }
  }
}

@suhailshergill

As per our discussion today, another thing to try is to access, inside aFunction, variables that are fields of an object, e.g. make aFunction an RDD transformer that increments values by both i and MyObject.j. Then, for MyObject.j, try different styles of import (wildcard, specific, with/without renamings), place them in different spots, and see which factories work.
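A standalone sanity check of the object-field part of that suggestion (MyObject, B, and the factory names below are hypothetical, and Spark is stripped out so it runs anywhere): a closure that reads only MyObject.j goes through the module's static-style accessor and captures no enclosing instance, while one that also reads the factory parameter still drags in the non-serializable anonymous instance:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ObjectFieldProbe {

  object MyObject { val j: Int = 10 } // the object field the comment proposes

  abstract class B { def fn: Int => Int } // like A, minus the Spark dependency

  // Reads only MyObject.j: module access compiles to a static-style lookup,
  // so the lambda should capture no enclosing instance at all.
  def factoryObjOnly() = new B {
    def fn = x => x + MyObject.j
  }

  // Reads MyObject.j AND the factory parameter: `i` is still reached through
  // the enclosing anonymous B instance, which is not serializable.
  def factoryBoth(i: Int) = new B {
    def fn = x => x + i + MyObject.j
  }

  // Attempts Java serialization, as Spark does with a task closure.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

If that holds, a factory7 whose closure touches only MyObject.j should serialize regardless of import style, and any factory that also touches i should fail the same way factory1 does.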

@MasseGuillaume

RUNNING TEST WITH [Factory1]
Factory1 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory2]
Factory2 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory3]
RUNNING TEST WITH [Factory4]
Factory4 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory5]
Factory5 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory6]
defined class A
factory1: (i: Int)A
factory2: (i: Int)A{val j: Int}
factory3: (i: Int)A
factory4: (i: Int)A
factory5: (i: Int)A
factory6: (i: Int)A
anRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:73
