package es.ucm.fdi.sscheck.testing

import org.junit.runner.RunWith
import org.specs2.runner.JUnitRunner
import org.specs2.execute.{AsResult, Result}
import org.specs2.matcher.Matcher
import org.specs2.matcher.MatchersImplicits._

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Duration}

import es.ucm.fdi.sscheck.spark.streaming.{SharedStreamingContextBeforeAfterEach, StreamingContextUtils}

import scala.collection.mutable.Queue
/** specs2 matchers for Spark RDDs. */
object RDDMatchers {
  /** Matcher that succeeds iff each record of the RDD fulfils the predicate. */
  def foreachRecord[T](predicate: T => Boolean): Matcher[RDD[T]] = { (rdd: RDD[T]) =>
    val failingRecords = rdd.filter(x => !predicate(x))
    (
      failingRecords.isEmpty,
      "each record fulfils the predicate",
      s"predicate failed for records ${failingRecords.take(4).mkString(", ")} ..."
    )
  }

  /** Variant of foreachRecord that makes explicit which values the predicate closure
   *  captures: only predicateContext travels with the closure, not the enclosing scope. */
  def foreachRecord[T, C](predicateContext: C)(toPredicate: C => (T => Boolean)): Matcher[RDD[T]] = {
    val predicate = toPredicate(predicateContext)
    foreachRecord(predicate)
  }
}
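
/* Usage sketch for the curried overload (minValue is a hypothetical value): writing
 *
 *   rdd should foreachRecord(minValue)(min => { _ > min })
 *
 * makes the predicate capture only minValue, instead of implicitly closing over the
 * whole enclosing scope as rdd should foreachRecord(_ > minValue) would.
 */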
@RunWith(classOf[JUnitRunner])
class SerialProblemTest
  extends org.specs2.Specification
  // with org.specs2.matcher.MustThrownExpectations
  with org.specs2.specification.BeforeAfterAll
  with Serializable {

  import RDDMatchers._

  // @transient so the SparkContext is not dragged along when the spec instance is serialized
  @transient var sc: SparkContext = _

  override def beforeAll: Unit = {
    val sparkMaster = "local[4]"
    val conf = new SparkConf().setMaster(sparkMaster).setAppName("SerialProblem")
    println("Starting Spark context")
    sc = new SparkContext(conf)
  }

  override def afterAll: Unit = {
    println("Stopping Spark context")
    sc.stop()
  }
  def is = sequential ^ s2"""
    SerialProblemTest
      - simpleBatchTest $simpleBatchTest
      - simpleStreamingTest $simpleStreamingTest
    """
  def simpleBatchTest = {
    val rdd = sc.parallelize(1 to 100, 3)
    rdd should foreachRecord(_ > 0)
  }
  def simpleStreamingTest: Result = {
    val ssc = new StreamingContext(sc, Duration(300))
    val record = "hola"
    val batches = Seq.fill(5)(Seq.fill(10)(record))
    val queue = new Queue[RDD[String]]
    queue ++= batches.map(batch => sc.parallelize(batch, numSlices = 2))
    val inputDStream = ssc.queueStream(queue, oneAtATime = true)
    var result: Result = ok
    inputDStream.foreachRDD { rdd =>
      /* AsResult { rdd should foreachRecord(_ == record) } fails because the closure argument
       * of foreachRecord needs to access the local variable record, and for that Spark tries to
       * serialize the whole enclosing context, including result, which fails with:
       *
       *   Driver stacktrace:,org.apache.spark.SparkException: Job aborted due to stage failure:
       *   Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):
       *   java.io.InvalidClassException: org.specs2.execute.Success; no valid constructor
       */
      // val r = AsResult { rdd should foreachRecord(_ == record) }

      /* ok because the closure argument of foreachRecord doesn't need any value from the context */
      // val r = AsResult { rdd should foreachRecord(_ == "hola") }

      /* ok because we explicitly specify which values from the context are included in the closure, following
       * http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ */
      val r = AsResult { rdd should foreachRecord(record)(r => { _ == r }) }
      println(s"r: ${r}")

      /* ok because in a direct call to filter, Spark's closure cleaner is smart enough to avoid
       * capturing too much data in the closure */
      val r2 = AsResult { rdd.filter(_ != record).count === 0 }
      println(s"r2: ${r2}")

      result = result and r
    }
    ssc.start()
    StreamingContextUtils.awaitForNBatchesCompleted(batches.length)(ssc)
    ssc.stop(stopSparkContext = false, stopGracefully = false)
    println(s"result : ${result}")
    result
  }
}
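
/* Minimal standalone sketch (not part of sscheck; names are hypothetical) of the hygienic
 * closure pattern used above: building a function through an explicit context parameter
 * guarantees that the serialized closure contains exactly that context and nothing else
 * from the enclosing scope.
 */
object HygienicClosureSketch {
  // toF is applied on the driver; the function it returns closes only over ctx
  def withContext[C, T, R](ctx: C)(toF: C => (T => R)): T => R = toF(ctx)

  // captures just the String "hola", never the caller's stack frame or enclosing object
  val isHola: String => Boolean = withContext("hola")(h => { _ == h })
}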