Spark Streaming - Flume Avro Sink - PopularHashTags. More @ stdatalabs.blogspot.com
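
To build this example you need the spark-streaming-flume artifact on the classpath alongside Spark core and streaming. A minimal build.sbt sketch is below; the Spark and Scala versions (1.6.1 / 2.10.5, contemporary with this gist) are assumptions, so align them with your cluster. The streaming job itself follows.

// build.sbt (sketch; versions are assumptions, match your cluster)
name := "FlumeSparkPopularHashTags"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.1",
  "org.apache.spark" %% "spark-streaming"       % "1.6.1",
  "org.apache.spark" %% "spark-streaming-flume" % "1.6.1"
)
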
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.flume._
/**
 * A Spark Streaming - Flume integration to find popular hashtags from Twitter.
 * It receives events from a Flume agent that connects to Twitter and pushes
 * tweets to an Avro sink, which this job consumes as a DStream.
 *
 * More discussion at stdatalabs.blogspot.com
 *
 * @author Sachin Thirumala
 */
object FlumeSparkPopularHashTags {
  val conf = new SparkConf().setMaster("local[6]").setAppName("Spark Streaming - Flume Source - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {
    sc.setLogLevel("WARN")

    // Create a StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Create a push-based Flume stream, receiving Avro events from the
    // Flume agent's Avro sink at hostname "ubuntu", port 9988
    val stream = FlumeUtils.createStream(ssc, "ubuntu", 9988)

    // Decode each Flume event body into the raw tweet text
    val tweets = stream.map(e => new String(e.event.getBody.array))
    tweets.print()
    // Split each tweet on spaces and keep only the hashtag tokens
    val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#")))

    // Count hashtags over a sliding 60-second window and sort by count
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Count hashtags over a sliding 10-second window and sort by count
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))
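    // Note: with no slide duration supplied, reduceByKeyAndWindow slides by
    // the batch interval (5 seconds here), so every batch re-ranks hashtags
    // over the most recent 60 and 10 seconds respectively.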
    // Print the top 10 hashtags from each window
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    // Report how many Flume events arrived in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    // Start the streaming computation and block until it terminates
    ssc.start()
    ssc.awaitTermination()
  }
}
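
The createStream call above is push-based: the Flume agent's Avro sink pushes events into a Spark receiver. spark-streaming-flume also ships a pull-based approach that tolerates receiver restarts better, where the agent buffers events in a special Spark sink and the job pulls from it. A minimal sketch, assuming the agent is reconfigured to use org.apache.spark.streaming.flume.sink.SparkSink on the same host and port:

// Pull-based alternative (sketch): the Flume agent must use
// org.apache.spark.streaming.flume.sink.SparkSink (from the
// spark-streaming-flume-sink artifact) instead of a plain Avro sink.
val pollingStream = FlumeUtils.createPollingStream(ssc, "ubuntu", 9988)
val pulledTweets = pollingStream.map(e => new String(e.event.getBody.array))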