Created March 31, 2016 15:11
-
-
Save rsds143/773b059c32b7e1c46664abf348b9a11d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.{SparkConf, SparkContext} | |
import org.apache.spark._ | |
import org.apache.spark.storage._ | |
import org.apache.spark.streaming._ | |
/**
 * Spark Streaming demo of `mapWithState`: keeps a running count per word and
 * prints the records emitted for the key "dummy", which is seeded through the
 * StateSpec's initial state.
 *
 * Usage: `MapWithState [masterUrl]` — the master URL defaults to
 * `spark://127.0.0.1:7077` when no argument is given.
 */
object MapWithState {
  // Not referenced inside this object; kept because it is part of the public interface.
  val batchCountKey = "batchCount"

  /** Master URL used when no command-line argument is supplied. */
  private val DefaultMaster = "spark://127.0.0.1:7077"

  /**
   * Entry point: builds the streaming pipeline and blocks until it terminates.
   *
   * @param args optional; `args(0)` overrides the Spark master URL
   */
  def main(args: Array[String]): Unit = {
    val master = args.headOption.getOrElse(DefaultMaster)
    // SparkContext refuses to start without an application name. Only supply one
    // when the launcher (e.g. spark-submit) has not already set it.
    val conf = new SparkConf()
      .setMaster(master)
      .setIfMissing("spark.app.name", "MapWithState")
    val sc = new SparkContext(conf)

    // Seed state: the key "dummy" starts with a count of 0.
    val initialRDD = sc.parallelize(List(("dummy", 0L)))
    val stateSpec = StateSpec.function(trackStateFunc _)
      .initialState(initialRDD)
      .numPartitions(2)
      // Idle keys time out after 60 seconds. trackStateFunc is then invoked with
      // state.isTimingOut() == true and must not update the state.
      .timeout(Seconds(60))

    val ssc = new StreamingContext(sc, Seconds(1))
    // Checkpoint directory for the stateful mapWithState stream.
    ssc.checkpoint("file:///tmp/checkpointdir")

    // NOTE(review): DummyReceiver is defined elsewhere; the argument 10 presumably
    // controls how much text it produces — confirm its semantics.
    val stream = ssc.receiverStream(new DummyReceiver(10))

    // Split each line into words and pair every word with a count of 1.
    val wordStream = stream.flatMap(_.split(" ")).map(word => (word, 1))

    // Records emitted by trackStateFunc: one per input (word, 1) pair carrying the
    // updated running total, plus one for each key whose state times out.
    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    // Print only the records for the seeded key.
    wordCountStateStream.filter { case (word, _) => word == "dummy" }.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Mapping function for `mapWithState`: adds the incoming value for `key` to the
   * stored running total and emits `(key, total)`.
   *
   * @param batchTime time of the batch being processed (unused)
   * @param key       the word
   * @param value     new count for `key`; may be `None` (e.g. when Spark invokes
   *                  the function for a timing-out key), treated as 0
   * @param state     running total for `key`
   * @return `Some((key, total))` on every invocation
   */
  def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    // Spark rejects update() on a state that is timing out. Only persist the new
    // total for live keys; a timing-out key still emits its final total.
    if (!state.isTimingOut()) {
      state.update(sum)
    }
    Some((key, sum))
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment