Skip to content

Instantly share code, notes, and snippets.

@rsds143
Created March 31, 2016 15:11
Show Gist options
  • Save rsds143/773b059c32b7e1c46664abf348b9a11d to your computer and use it in GitHub Desktop.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
object MapWithState {
  val batchCountKey = "batchCount"

  /**
   * Entry point: wires up a 1-second micro-batch streaming job that keeps a
   * running per-word count via `mapWithState` and prints the "dummy" key's total.
   *
   * @param args optional single argument: the Spark master URL
   *             (defaults to spark://127.0.0.1:7077)
   */
  def main(args: Array[String]): Unit = {
    // Use the first CLI argument as the master URL, falling back to a local standalone cluster.
    val master = if (args.nonEmpty) args(0) else "spark://127.0.0.1:7077"

    // An application name is mandatory: without setAppName, SparkContext creation
    // fails at runtime with "An application name must be set in your configuration".
    val conf = new SparkConf().setMaster(master).setAppName("MapWithState")
    val sc = new SparkContext(conf)

    // Seed the state so the "dummy" key exists from the very first batch.
    val initialRDD = sc.parallelize(List(("dummy", 0L)))
    val stateSpec = StateSpec.function(trackStateFunc _)
      .initialState(initialRDD)
      .numPartitions(2)
      .timeout(Seconds(60)) // idle keys are evicted after 60s without new data

    val ssc = new StreamingContext(sc, Seconds(1))
    // mapWithState requires a checkpoint directory for fault tolerance of the state RDDs.
    ssc.checkpoint("file:///tmp/checkpointdir")

    // Create a stream that generates 10 lines per second (matches DummyReceiver's rate argument).
    val stream = ssc.receiverStream(new DummyReceiver(10))

    // Split the lines into words and create a paired (word, 1) dstream.
    val wordStream = stream.flatMap(_.split(" ")).map(word => (word, 1))

    // The stream emitted from trackStateFunc. Since every input record is emitted
    // with its updated value, this stream has the same number of records as the input.
    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    wordCountStateStream.filter(_._1 == "dummy").print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * State update function for `mapWithState`: accumulates a running Long total per key.
   *
   * @param batchTime time of the current micro-batch (unused here)
   * @param key       the word being counted
   * @param value     this batch's count for the key; None when the key is timing out
   * @param state     the persisted running total for this key
   * @return Some((key, updatedTotal)), or None when the key's state is expiring
   */
  def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
    // When the 60s timeout fires, Spark invokes this function one final time with
    // isTimingOut == true; calling state.update() then throws, so bail out early.
    if (state.isTimingOut()) {
      None
    } else {
      val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
      state.update(sum)
      Some((key, sum))
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment