@atty303
Created March 4, 2016 01:42
package org.apache.spark.streaming

import org.apache.spark.util.ManualClock

/**
 * A `Clock` whose time can be manually set and modified. Its reported time does not change
 * as time elapses, but only as its time is modified by callers. This is mainly useful for
 * testing.
 *
 * A proxy class that exposes the `private[spark]` ManualClock to code outside the Spark
 * packages. It must live in the org.apache.spark.streaming package because `ssc.scheduler`
 * is `private[streaming]`.
 */
class ClockWrapper(ssc: StreamingContext) {
  private val manualClock = ssc.scheduler.clock.asInstanceOf[ManualClock]

  def getTimeMillis(): Long = manualClock.getTimeMillis()
  def setTime(timeToSet: Long): Unit = manualClock.setTime(timeToSet)
  def advance(timeToAdd: Long): Unit = manualClock.advance(timeToAdd)
  def waitTillTime(targetTime: Long): Long = manualClock.waitTillTime(targetTime)
}
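A quick sketch of driving the wrapper by hand; it assumes `ssc` is a StreamingContext created with spark.streaming.clock pointing at ManualClock, as SparkStreamingSpec below arranges:

// Hypothetical usage, outside the traits below:
val clock = new ClockWrapper(ssc)
val t0 = clock.getTimeMillis()   // reported time is frozen until we move it
clock.advance(1000L)             // jump one second forward, firing any due batch
assert(clock.getTimeMillis() == t0 + 1000L)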
package com.adtdp.tenma.shared.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.specs2.main.ArgumentsShortcuts
import org.specs2.specification.BeforeAfterAll
import org.specs2.specification.dsl.mutable.ArgumentsCreation

trait SparkSpec extends BeforeAfterAll with ArgumentsShortcuts { this: ArgumentsCreation =>
  sequential

  private var _sc: SparkContext = _

  def sc: SparkContext = {
    assert(_sc != null, "SparkContext is not initialized yet. Use it inside fragments only.")
    _sc
  }

  val master = "local[4]"
  val appName = this.getClass.getSimpleName

  lazy val sparkConf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)
    .set("spark.sql.shuffle.partitions", "4")

  override def beforeAll(): Unit = {
    _sc = new SparkContext(sparkConf)
  }

  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }
  }
}
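A minimal sketch of a batch-only spec built on this trait; it assumes specs2 3.x, where org.specs2.mutable.Specification already mixes in the ArgumentsCreation DSL required by the self-type (spec name and data are illustrative):

package com.adtdp.tenma.shared.spark

import org.specs2.mutable.Specification

class WordCountSpec extends Specification with SparkSpec {
  "Spark" should {
    "count words in an RDD" in {
      // `sc` is only valid inside fragments, after beforeAll has run
      val counts = sc.parallelize(Seq("a", "b", "a"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collectAsMap()
      counts("a") must_== 2
    }
  }
}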
package com.adtdp.tenma.shared.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{ClockWrapper, Duration, StreamingContext}
import org.specs2.specification.BeforeAfterEach
import org.specs2.specification.dsl.mutable.ArgumentsCreation

import scala.collection.mutable
import scala.reflect.ClassTag

trait SparkStreamingSpec extends SparkSpec with BeforeAfterEach { this: ArgumentsCreation =>
  private var _ssc: StreamingContext = _
  def ssc: StreamingContext = _ssc

  private var _clock: ClockWrapper = _
  def clock: ClockWrapper = _clock

  val batchDuration: Duration

  // Use the manual clock so the test controls batch boundaries. The class moved from
  // org.apache.spark.streaming.util to org.apache.spark.util in Spark 1.3; the scheduler
  // still accepts the old name, but the new one matches the import in ClockWrapper.
  sparkConf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")

  override def before: Unit = {
    _ssc = new StreamingContext(sc, batchDuration)
    _clock = new ClockWrapper(_ssc)
  }

  override def after: Unit = {
    if (_ssc != null) {
      _ssc.stop(stopSparkContext = false, stopGracefully = false)
      _ssc = null
    }
  }

  /** Advance the manual clock by `n` batch intervals. */
  def advance(n: Int = 1): Unit = clock.advance(batchDuration.milliseconds * n)

  /**
   * Build a DStream pipeline over a queue-backed input stream, start the context,
   * and collect each output batch into a result queue.
   */
  def startQueueStream[IN: ClassTag, OUT: ClassTag](
      fn: DStream[IN] => DStream[OUT]): (mutable.Queue[RDD[IN]], mutable.Queue[IndexedSeq[OUT]]) = {
    val sourceQueue = mutable.Queue.empty[RDD[IN]]
    val resultQueue = mutable.Queue.empty[IndexedSeq[OUT]]
    fn(ssc.queueStream(sourceQueue)).foreachRDD { rdd =>
      resultQueue += rdd.collect().toIndexedSeq
    }
    ssc.start()
    (sourceQueue, resultQueue)
  }
}
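And a sketch of a streaming spec tying it all together: enqueue an input RDD, advance the manual clock one batch, then check the collected output. Names and data are illustrative; `eventually` is specs2's retrying matcher, used here because foreachRDD completes asynchronously after the clock tick:

package com.adtdp.tenma.shared.spark

import org.apache.spark.streaming.{Duration, Seconds}
import org.specs2.mutable.Specification

class StreamingWordCountSpec extends Specification with SparkStreamingSpec {
  val batchDuration: Duration = Seconds(1)

  "a queue-backed stream" should {
    "emit one result batch per clock advance" in {
      val (source, results) = startQueueStream[String, (String, Int)] { lines =>
        lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
      }
      source += sc.makeRDD(Seq("a b a"))
      advance() // move the manual clock forward by one batchDuration
      eventually {
        results.flatten must contain(("a", 2))
      }
    }
  }
}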