Created
April 24, 2016 03:31
-
-
Save microamp/c7cb8ac61a48335f921a39aa8d9cacc3 to your computer and use it in GitHub Desktop.
The simplest Spark Streaming example (state updated using `updateStateByKey`)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark._
import org.apache.spark.streaming._
object StatefulNetworkWordCount {

  /** Merges the counts seen for a key in the current micro-batch into the
    * running total carried over from previous batches.
    *
    * @param newValues    per-key counts observed in the current batch
    * @param runningCount accumulated count so far; None on first occurrence of the key
    * @return the updated running count (always Some, so state is never discarded)
    */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }

  /** Entry point: stateful word count over a socket text stream. */
  def main(args: Array[String]): Unit = {
    // Local master with 8 threads. A receiver-based source needs at least 2 cores
    // (one for the receiver, one for processing) to avoid a starvation scenario.
    val conf = new SparkConf().setMaster("local[8]").setAppName("StatefulNetworkWordCount")
    // Micro-batch interval of 5 seconds.
    val ssc = new StreamingContext(conf, Seconds(5))

    // `updateStateByKey` requires a checkpoint directory to truncate the state
    // RDD lineage; "." checkpoints into the working directory.
    ssc.checkpoint(".")

    // DStream of raw text lines from hostname:port (feed with e.g. `nc -lk 9999`).
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words.
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch.
    val pairs = words.map(word => (word, 1))

    // Fold each batch's per-key counts into the running state.
    val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

    // Print the first ten elements of each RDD generated in this DStream to the console.
    runningCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment