Created
February 15, 2015 14:53
-
-
Save Attsun1031/f3ad87894dc76f89e168 to your computer and use it in GitHub Desktop.
spark-summit 2014 spark streaming hands on
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark._ | |
import org.apache.spark.SparkContext._ | |
import org.apache.spark.streaming._ | |
import org.apache.spark.streaming.twitter._ | |
import org.apache.spark.streaming.StreamingContext._ | |
import TutorialHelper._ | |
/**
 * Spark Streaming hands-on exercise: prints the ten most frequent Twitter
 * hashtags seen during the last five minutes, refreshed every second.
 */
object Tutorial {

  /**
   * Builds the streaming pipeline, starts it, and blocks until the streaming
   * context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Checkpoint directory (supplied by the project's TutorialHelper).
    val checkpointDir = TutorialHelper.getCheckpointDirectory()

    // Configure Twitter credentials. The "xxx" values are placeholders and
    // have to be replaced with real credentials before running.
    val apiKey = "xxx"
    val apiSecret = "xxx"
    val accessToken = "xxx"
    val accessTokenSecret = "xxx"
    TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

    // One-second batches.
    val ssc = new StreamingContext(new SparkConf(), Seconds(1))
    // Windowed reduction with an inverse function needs checkpointing, so
    // register the directory up front, well before start().
    ssc.checkpoint(checkpointDir)

    val tweets = TwitterUtils.createStream(ssc, None)
    val statuses = tweets.map(status => status.getText())

    // Hashtags: split on any whitespace run so that tags preceded by a
    // newline or tab are found and repeated spaces yield no empty tokens.
    val words = statuses.flatMap(status => status.split("\\s+"))
    val hashtags = words.filter(word => word.startsWith("#"))

    // Hashtag counts over a 5 minute window sliding every second. Counts are
    // maintained incrementally: new batches are added, expired batches are
    // subtracted. The filter drops tags whose count has fallen to zero so
    // they do not linger in the window state.
    val counts = hashtags.map(tag => (tag, 1)).reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(60 * 5),
      Seconds(1),
      filterFunc = (tagCount: (String, Int)) => tagCount._2 > 0
    )

    // Key by count so each batch can be sorted by frequency, highest first.
    val sortedCounts = counts
      .map { case (tag, count) => (count, tag) }
      .transform(rdd => rdd.sortByKey(ascending = false))

    // foreachRDD is the supported output operation (DStream.foreach is
    // deprecated); print the top ten (count, tag) pairs of every batch.
    sortedCounts.foreachRDD { rdd =>
      val topTen = rdd.take(10).mkString("\n")
      println("\nTop 10 hashtags:\n" + topTen)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment