@vinothchandar
Created July 26, 2015 02:36
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

import scala.collection.mutable
import scala.util.Random

/**
 * InputDStream that emits a random number (1 to MAX_EVENTS) of monotonically
 * increasing integers per batch, counting up from startCount.
 */
class CountingDirectDStream(@transient ssc_ : StreamingContext, startCount: Int)
  extends InputDStream[Int](ssc_) with Logging {

  val MAX_EVENTS = 4
  var currentCount = startCount
  val rand = new Random()

  // There is no external receiver or connection to manage, so start/stop are no-ops.
  override def start(): Unit = {}

  override def stop(): Unit = {}

  // Called on the driver once per batch interval to produce that batch's RDD.
  override def compute(validTime: Time): Option[RDD[Int]] = {
    println(s"Start of batch at time ${validTime}")
    val nextInts = new mutable.MutableList[Int]()
    val numEvents = rand.nextInt(MAX_EVENTS) + 1
    (0 until numEvents).foreach { _ =>
      nextInts += currentCount
      currentCount += 1
    }
    // Materialize this batch's events as a single-partition RDD.
    val countRdd = ssc_.sparkContext.parallelize(nextInts, 1)
    Some(countRdd)
  }
}
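For context, here is a minimal driver sketch that wires this DStream into a StreamingContext and prints each batch. The master, app name, batch interval, and starting count are illustrative choices, not part of the original gist; it assumes the Spark 1.x streaming APIs the imports above target.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CountingDStreamExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local configuration; any master/app name would do.
    val conf = new SparkConf().setMaster("local[2]").setAppName("CountingDirectDStream")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Start counting from 0; each batch emits 1 to MAX_EVENTS consecutive integers.
    val counts = new CountingDirectDStream(ssc, 0)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Because compute runs on the driver rather than on a receiver, the mutable currentCount survives across batches, which is what makes the emitted sequence monotonically increasing.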