Skip to content

Instantly share code, notes, and snippets.

@microamp
Created April 24, 2016 03:33
Show Gist options
  • Save microamp/b67c136f6878fefeb47ea8462bfd2b96 to your computer and use it in GitHub Desktop.
Same as StatefulNetworkWordCount but with `mapWithState`
import org.apache.spark._
import org.apache.spark.streaming._
object StatefulNetworkWordCount2 {

  /** Mapping function for `mapWithState`.
    *
    * Adds this batch's count for `word` (if any) to the running total kept in
    * `state`, persists the new total, and returns the `(word, total)` pair
    * that is emitted downstream.
    *
    * @param word  the key being updated
    * @param one   this batch's partial count for `word` (`None` when the key
    *              appears only via a state timeout, not in the batch)
    * @param state running total for `word`, maintained by Spark across batches
    * @return the word paired with its updated running total
    */
  def updateFunction2(word: String, one: Option[Int], state: State[Int]): (String, Int) = {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }

  def main(args: Array[String]): Unit = {
    // Local master with 8 cores; at least 2 are needed so the socket receiver
    // does not starve the processing tasks. Batch interval is 5 seconds.
    val conf = new SparkConf().setMaster("local[8]").setAppName("StatefulNetworkWordCount2")
    val ssc = new StreamingContext(conf, Seconds(5))

    // mapWithState requires a checkpoint directory; use the current directory.
    ssc.checkpoint(".")

    // Create a DStream that connects to hostname:port (e.g. feed it with `nc -lk 9999`).
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words and count each word once per batch.
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    // Initial state RDD: seeds the state so "hello" starts with a count of 0.
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 0)))

    // Maintain per-word running totals across batches via `mapWithState`.
    val stateDStream = pairs.mapWithState(
      StateSpec.function(updateFunction2 _).initialState(initialRDD)
    )

    // Print the first ten elements of each RDD generated in this DStream to the console.
    stateDStream.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment