Skip to content

Instantly share code, notes, and snippets.

@chanwit
Created August 8, 2015 06:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chanwit/0260345539199c3c7d93 to your computer and use it in GitHub Desktop.
package spark.example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming word count over a plain-text TCP socket.
 *
 * Connects to `localhost:9999`, splits each received line on single spaces,
 * and prints the per-word counts of every 1-second batch to stdout.
 */
object N12_Streaming {

  /**
   * Entry point: builds the streaming pipeline, starts it, and blocks until the
   * StreamingContext terminates.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    // Silence framework logging so the per-batch word counts printed below are readable.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // A receiver-based input (socketTextStream) permanently occupies one thread.
    // With master "local" (a single thread) nothing is left to process batches,
    // so local mode needs at least 2 threads: one for the receiver, one for the work.
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    // split(" ") yields "" for consecutive or leading spaces; drop those so they are
    // not counted as a word.
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment