Skip to content

Instantly share code, notes, and snippets.

@Attsun1031
Created February 15, 2015 14:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Attsun1031/f3ad87894dc76f89e168 to your computer and use it in GitHub Desktop.
Save Attsun1031/f3ad87894dc76f89e168 to your computer and use it in GitHub Desktop.
Spark Summit 2014: Spark Streaming hands-on exercise
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
/**
 * Spark Streaming hands-on exercise: prints the ten most frequent Twitter
 * hashtags observed over a sliding five-minute window, refreshed every second.
 */
object Tutorial {

  /**
   * Entry point. Hands the Twitter credentials to `TutorialHelper`, builds the
   * streaming pipeline, starts it, and blocks until the context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Checkpoint directory
    val checkpointDir = TutorialHelper.getCheckpointDirectory()

    // Configure Twitter credentials (placeholders; replace with real values)
    val apiKey = "xxx"
    val apiSecret = "xxx"
    val accessToken = "xxx"
    val accessTokenSecret = "xxx"
    TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

    // One-second batches.
    val ssc = new StreamingContext(new SparkConf(), Seconds(1))
    // The incremental (inverse-reduce) window below carries state across
    // batches, which requires a checkpoint directory to be set before start().
    ssc.checkpoint(checkpointDir)

    val tweets = TwitterUtils.createStream(ssc, None)
    val statuses = tweets.map(status => status.getText())

    // hashtags: split on any whitespace run so that tags preceded by a
    // newline or tab (common in tweets) are not glued to the previous word.
    val words = statuses.flatMap(status => status.split("\\s+"))
    val hashtags = words.filter(word => word.startsWith("#"))

    // 5 min window hashtags, recomputed every second. Counts are maintained
    // incrementally: batches entering the window are added, batches leaving
    // it are subtracted.
    val add: (Int, Int) => Int = _ + _
    val subtract: (Int, Int) => Int = _ - _
    // Without this filter, a tag whose count has dropped to 0 would stay in
    // the window state forever (unbounded growth) and could show up in the
    // printed ranking.
    val stillInWindow: ((String, Int)) => Boolean = { case (_, count) => count > 0 }
    val counts = hashtags
      .map(tag => (tag, 1))
      .reduceByKeyAndWindow(add, subtract, Seconds(60 * 5), Seconds(1), filterFunc = stillInWindow)

    // top 10 hashtags in 5 min: key by count so each batch can be sorted
    // in descending order of frequency.
    val sortedCounts = counts
      .map { case (tag, count) => (count, tag) }
      .transform(rdd => rdd.sortByKey(ascending = false))

    // foreachRDD replaces the deprecated DStream.foreach.
    sortedCounts.foreachRDD { rdd =>
      println(rdd.take(10).mkString("\nTop 10 hashtags:\n", "\n", ""))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment