Skip to content

Instantly share code, notes, and snippets.

@chanwit
Created August 8, 2015 08:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chanwit/6a942182b0b7a7adf32d to your computer and use it in GitHub Desktop.
package spark.example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object N13_KafkaStreaming {

  /**
   * Streaming word count over a Kafka topic.
   *
   * Connects to Kafka via the ZooKeeper quorum, consumes messages from the
   * "test" topic in 1-second micro-batches, splits each message on spaces,
   * and prints per-batch word counts to the console. Runs until interrupted.
   */
  def main(args: Array[String]): Unit = {
    // Silence the noisy framework loggers so the printed word counts are readable.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // local[2]: one thread for the Kafka receiver, one for batch processing.
    // (A single local thread would starve processing entirely.)
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(1))

    val zkQuorum = "128.199.228.30"
    // NOTE(review): the original passed the ZooKeeper IP again here, but this
    // argument is the Kafka *consumer group id*, not an address — use a
    // logical group name so offsets are tracked under a meaningful key.
    val groupId = "kafka-wordcount"
    // createStream yields (messageKey, messageValue) pairs; keep the payload only.
    // Map("test" -> 1): consume topic "test" with one receiver thread.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, Map("test" -> 1)).map(_._2)

    val words      = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination() // block the driver until the streaming job is stopped
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment