@juanrh · gist dffd060e3a371676b83c · created August 24, 2015
package es.ucm.fdi.sscheck.testing
import org.junit.runner.RunWith
import org.specs2.runner.JUnitRunner
import org.specs2.execute.{AsResult,Result}
import org.specs2.matcher.Matcher
import org.specs2.matcher.MatchersImplicits._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext,Duration}
import es.ucm.fdi.sscheck.spark.streaming.{SharedStreamingContextBeforeAfterEach,StreamingContextUtils}
import scala.collection.mutable.Queue
object RDDMatchers {
  /** Matcher that checks that each record of the RDD fulfils the predicate */
  def foreachRecord[T](predicate : T => Boolean) : Matcher[RDD[T]] = { (rdd : RDD[T]) =>
    val failingRecords = rdd.filter(! predicate(_))
    (
      failingRecords.isEmpty,
      "each record fulfils the predicate",
      s"predicate failed for records ${failingRecords.take(4).mkString(", ")} ..."
    )
  }

  /** Variant of foreachRecord that takes the values the predicate needs as an explicit
    * context argument, so the closure sent to Spark captures only predicateContext */
  def foreachRecord[T,C](predicateContext : C)(toPredicate : C => (T => Boolean)) : Matcher[RDD[T]] = {
    val predicate = toPredicate(predicateContext)
    foreachRecord(predicate)
  }
}
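
/* A minimal standalone sketch (not part of the original gist) of the same "hygienic closure"
 * idea outside specs2: the value the predicate needs is taken as an explicit parameter, so the
 * closure shipped to the Spark executors captures only that value, never the enclosing object.
 * The object and method names below are made up for illustration.
 */
object HygienicClosureSketch {
  /** Counts the records of rdd equal to value; the filter closure only captures the
    * method parameter value */
  def countEqualTo[T](value : T)(rdd : RDD[T]) : Long =
    rdd.filter(_ == value).count()
}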
@RunWith(classOf[JUnitRunner])
class SerialProblemTest
  extends org.specs2.Specification
    //with org.specs2.matcher.MustThrownExpectations
    with org.specs2.specification.BeforeAfterAll
    with Serializable {

  import RDDMatchers._

  // SparkContext is not serializable, so keep it out of any serialized copy of this spec
  @transient var sc : SparkContext = _

  override def beforeAll : Unit = {
    val sparkMaster = "local[4]"
    val conf = new SparkConf().setMaster(sparkMaster).setAppName("SerialProblem")
    println("Starting Spark context")
    sc = new SparkContext(conf)
  }

  override def afterAll : Unit = {
    println("Stopping Spark context")
    sc.stop()
  }

  def is = sequential ^ s2"""
  SerialProblemTest
    - simpleBatchTest $simpleBatchTest
    - simpleStreamingTest $simpleStreamingTest
  """

  def simpleBatchTest = {
    val rdd = sc.parallelize(1 to 100, 3)
    rdd should foreachRecord(_ > 0)
  }
  def simpleStreamingTest : Result = {
    val ssc = new StreamingContext(sc, Duration(300))
    val record = "hola"
    val batches = Seq.fill(5)(Seq.fill(10)(record))
    val queue = new Queue[RDD[String]]
    queue ++= batches.map(batch => sc.parallelize(batch, numSlices = 2))
    val inputDStream = ssc.queueStream(queue, oneAtATime = true)
    var result : Result = ok
    inputDStream.foreachRDD { rdd =>
      /* AsResult { rdd should foreachRecord(_ == record) } fails because the closure argument
       * of foreachRecord needs to access the local variable record, and to do that it tries to
       * serialize the whole enclosing context, including result, which fails with:
       *
       *   Driver stacktrace:,org.apache.spark.SparkException: Job aborted due to stage failure:
       *   Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):
       *   java.io.InvalidClassException: org.specs2.execute.Success; no valid constructor
       */
      //val r = AsResult { rdd should foreachRecord(_ == record) }

      /* ok because the closure argument of foreachRecord doesn't need any value from the context */
      //val r = AsResult { rdd should foreachRecord(_ == "hola") }

      /* ok because we explicitly specify which values of the context are captured by the closure,
       * following
       * http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ */
      val r = AsResult { rdd should foreachRecord(record)(r => { _ == r }) }
      println(s"r: ${r}")

      /* ok because in a direct call to filter, Spark's closure cleaner is smart enough to avoid
       * shipping too much data with the closure */
      val r2 = AsResult { rdd.filter(_ != record).count === 0 }
      println(s"r2: ${r2}")

      result = result and r
    }
    ssc.start()
    StreamingContextUtils.awaitForNBatchesCompleted(batches.length)(ssc)
    ssc.stop(stopSparkContext=false, stopGracefully=false)
    println(s"result : ${result}")
    result
  }
}