Created
March 4, 2016 01:42
-
-
Save atty303/18e64e718f0cf3261c0e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.spark.streaming | |
import org.apache.spark.util.ManualClock | |
/**
 * Proxy class that makes the streaming scheduler's `ManualClock` usable from outside
 * the Spark packages (`ManualClock` itself is `private[spark]`).
 *
 * A `ManualClock` is a `Clock` whose time can be manually set and modified. Its reported
 * time does not change as time elapses, but only as its time is modified by callers.
 * This is mainly useful for testing.
 *
 * @param ssc streaming context whose scheduler clock is wrapped; the clock must be a
 *            `ManualClock`
 * @throws ClassCastException if the scheduler clock of `ssc` is not a `ManualClock`
 */
class ClockWrapper(ssc: StreamingContext) {

  // Pattern match rather than a blind asInstanceOf so that a context created without the
  // manual clock fails with a message naming the actual clock class. The exception type
  // is kept as ClassCastException, which is what the plain cast raised.
  private val manualClock: ManualClock = ssc.scheduler.clock match {
    case manual: ManualClock => manual
    case other =>
      val found = if (other == null) "null" else other.getClass.getName
      throw new ClassCastException(
        s"StreamingContext scheduler clock is $found but a ManualClock is required; " +
          "check the spark.streaming.clock setting of the SparkConf")
  }

  /** Returns the value of `getTimeMillis()` on the wrapped clock. */
  def getTimeMillis(): Long = manualClock.getTimeMillis()

  /** Forwards `timeToSet` to `setTime` on the wrapped clock. */
  def setTime(timeToSet: Long): Unit = manualClock.setTime(timeToSet)

  /** Forwards `timeToAdd` to `advance` on the wrapped clock. */
  def advance(timeToAdd: Long): Unit = manualClock.advance(timeToAdd)

  /**
   * Forwards `targetTime` to `waitTillTime` on the wrapped clock and returns its result.
   * NOTE(review): presumably blocks until the clock reaches `targetTime` and returns the
   * clock time at that point; confirm against the Spark version in use.
   */
  def waitTillTime(targetTime: Long): Long = manualClock.waitTillTime(targetTime)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.adtdp.tenma.shared.spark | |
import org.apache.spark.{SparkConf, SparkContext} | |
import org.specs2.main.ArgumentsShortcuts | |
import org.specs2.specification.BeforeAfterAll | |
import org.specs2.specification.dsl.mutable.ArgumentsCreation | |
/**
 * Mixin for specs2 specifications that need a `SparkContext`.
 *
 * One context is created in `beforeAll` from `sparkConf` and stopped in `afterAll`.
 * Outside that window the `sc` accessor fails its assertion.
 */
trait SparkSpec extends BeforeAfterAll with ArgumentsShortcuts { this: ArgumentsCreation =>

  // specs2 argument. NOTE(review): presumably here so that examples sharing the single
  // SparkContext do not run concurrently; confirm against the specs2 documentation.
  sequential

  // Backing field for `sc`; null until beforeAll has run and again after afterAll.
  private var currentContext: SparkContext = _

  /** The active `SparkContext`; asserts that `beforeAll` has already created it. */
  def sc: SparkContext = {
    assert(
      currentContext != null,
      "SparkContext is not initialized yet. Please use it in fragment only.")
    currentContext
  }

  /** Master URL passed to `sparkConf`. */
  val master = "local[4]"

  /** Application name passed to `sparkConf`: the simple name of the concrete class. */
  val appName = getClass.getSimpleName

  /**
   * Configuration used to create the context. It is lazy, so it is built on first
   * access, and any `set` calls made on it before `beforeAll` end up in the context.
   */
  lazy val sparkConf: SparkConf = {
    val conf = new SparkConf()
    conf.setMaster(master)
    conf.setAppName(appName)
    conf.set("spark.sql.shuffle.partitions", "4")
    conf
  }

  /** Creates the `SparkContext` from `sparkConf`. */
  override def beforeAll(): Unit = {
    currentContext = new SparkContext(sparkConf)
  }

  /** Stops the `SparkContext` if one was created and clears the backing field. */
  override def afterAll(): Unit =
    Option(currentContext).foreach { active =>
      active.stop()
      currentContext = null
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.adtdp.tenma.shared.spark | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.streaming.dstream.DStream | |
import org.apache.spark.streaming.{ClockWrapper, Duration, StreamingContext} | |
import org.specs2.specification.BeforeAfterEach | |
import org.specs2.specification.dsl.mutable.ArgumentsCreation | |
import scala.collection.mutable | |
import scala.reflect.ClassTag | |
/**
 * Mixin for specs2 specifications that exercise Spark Streaming code.
 *
 * A fresh `StreamingContext` (on top of the `SparkContext` from `SparkSpec`) and a
 * `ClockWrapper` around its clock are created in `before` and discarded in `after`.
 * The streaming clock is configured through `spark.streaming.clock`, and `advance`
 * moves it forward by whole batches.
 */
trait SparkStreamingSpec extends SparkSpec with BeforeAfterEach { this: ArgumentsCreation =>

  // Backing field for `ssc`; null outside the before/after window.
  private var _ssc: StreamingContext = _

  /**
   * The `StreamingContext` of the running example. Asserts that `before` has created it,
   * in the same way `sc` does, instead of handing out null.
   */
  def ssc: StreamingContext = {
    assert(
      _ssc != null,
      "StreamingContext is not initialized yet. Please use it in fragment only.")
    _ssc
  }

  // Backing field for `clock`; null outside the before/after window.
  private var _clock: ClockWrapper = _

  /**
   * The clock wrapper of the running example. Asserts that `before` has created it
   * instead of handing out null.
   */
  def clock: ClockWrapper = {
    assert(
      _clock != null,
      "ClockWrapper is not initialized yet. Please use it in fragment only.")
    _clock
  }

  /** Batch interval of the streaming context; supplied by the concrete specification. */
  val batchDuration: Duration

  // Executed during trait initialization, which forces the lazy `sparkConf`, so the
  // setting is in place before the SparkContext is created from it.
  // NOTE(review): the class name below uses the `org.apache.spark.streaming.util`
  // package while ClockWrapper imports ManualClock from `org.apache.spark.util`;
  // confirm the targeted Spark version resolves this value.
  sparkConf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")

  /** Creates the streaming context and the wrapper around its clock. */
  override def before: Unit = {
    _ssc = new StreamingContext(sc, batchDuration)
    _clock = new ClockWrapper(_ssc)
  }

  /**
   * Stops the streaming context without stopping the `SparkContext`, and drops both the
   * context and its clock wrapper so neither can be used once the context is stopped.
   */
  override def after: Unit = {
    // The wrapper belongs to the context being stopped; do not keep a stale reference.
    _clock = null
    if (_ssc != null) {
      // Clear the field even if stop() throws, so a failed stop is not retried later.
      try _ssc.stop(stopSparkContext = false, stopGracefully = false)
      finally _ssc = null
    }
  }

  /**
   * Advances the clock by `n` batch durations.
   *
   * @param n number of batches to advance by (default 1)
   */
  def advance(n: Int = 1): Unit = clock.advance(batchDuration.milliseconds * n)

  /**
   * Wires a queue-backed input stream through `fn`, registers an output operation that
   * collects every resulting RDD, and starts the streaming context.
   *
   * NOTE(review): the result queue is appended to from inside `foreachRDD`; if Spark
   * runs that on a different thread than the test, reads of the queue are not
   * synchronized with those writes. Confirm before relying on immediate visibility.
   *
   * @param fn transformation under test, applied to the input stream
   * @tparam IN  element type of the input stream
   * @tparam OUT element type of the transformed stream
   * @return the queue to push input RDDs into, and the queue that receives one
   *         `IndexedSeq` of collected elements per RDD of the output stream
   */
  def startQueueStream[IN : ClassTag, OUT : ClassTag](
      fn: (DStream[IN]) => DStream[OUT]): (mutable.Queue[RDD[IN]], mutable.Queue[IndexedSeq[OUT]]) = {
    val sourceQueue = mutable.Queue.empty[RDD[IN]]
    val resultQueue = mutable.Queue.empty[IndexedSeq[OUT]]

    fn(ssc.queueStream(sourceQueue)).foreachRDD { rdd =>
      resultQueue += rdd.collect().toIndexedSeq
    }

    ssc.start()
    (sourceQueue, resultQueue)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment