-
-
Save Arnold1/cc363dc32cf6ba34cafefa35443ccf57 to your computer and use it in GitHub Desktop.
TweetStream with Spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Utils | |
import org.apache.spark.streaming.StreamingContext._ | |
import org.apache.spark.streaming.{Seconds, StreamingContext} | |
import StreamingContext._ | |
import org.apache.spark.SparkContext._ | |
import org.apache.spark.streaming.twitter._ | |
import org.apache.spark.SparkConf | |
import org.apache.spark.{SparkConf, SparkContext} | |
/**
 * Spark Streaming job that reads the live Twitter stream, echoes a sample of tweet
 * texts, and periodically prints the most frequent hashtags over a 60-second and a
 * 10-second window.
 */
object TweetStreams {
  import org.apache.spark.streaming.dstream.DStream

  /** Maximum number of tweets / hashtags printed per batch. */
  private val TopN = 10

  /**
   * Entry point: configures credentials and logging, builds the streaming pipeline,
   * then blocks until the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // oAuth
    Utils.configureTwitterCredentials("src/main/twitter.txt")
    Utils.setStreamingLogLevels

    // Initialize Spark Context
    println("Initiallizing Spark Context")
    // Master and app name go through the single SparkConf instead of building an
    // unused conf next to a separately-configured context.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // TwitterUtils can accept a filter argument for filtering tweet content.
    // NOTE(review): a single empty-string filter is kept from the original; confirm
    // whether an empty array (unfiltered sample stream) was intended instead.
    val filters = Array("")
    val stream = TwitterUtils.createStream(ssc, Utils.getAuth, filters)

    // Echo up to TopN tweet texts from every batch.
    val tweetTexts = stream.map(_.getText)
    tweetTexts.foreachRDD(rdd => rdd.take(TopN).foreach(println))

    // Every space-separated token that starts with '#' counts as a hashtag.
    val hashTags = stream.flatMap(tweet => tweet.getText.split(" ").filter(_.startsWith("#")))

    val topCounts60 = topCounts(hashTags, 60)
    val topCounts10 = topCounts(hashTags, 10)

    // Popular hashtags
    printPopular(topCounts60, 60)
    printPopular(topCounts10, 10)

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Counts hashtag occurrences over a sliding window and orders them by descending count.
   *
   * @param hashTags      stream of individual hashtags
   * @param windowSeconds window length in seconds
   * @return stream of `(count, hashtag)` pairs, most frequent first
   */
  private def topCounts(hashTags: DStream[String], windowSeconds: Int): DStream[(Int, String)] =
    hashTags
      .map(tag => (tag, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(windowSeconds))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(ascending = false))

  /**
   * Prints the TopN most frequent hashtags of every windowed batch, preceded by the
   * total number of distinct hashtags in that window.
   *
   * @param counts        stream of `(count, hashtag)` pairs sorted by descending count
   * @param windowSeconds window length in seconds, used only in the printed header
   */
  private def printPopular(counts: DStream[(Int, String)], windowSeconds: Int): Unit =
    counts.foreachRDD { rdd =>
      val topList = rdd.take(TopN)
      println("\nPopular topics in last %s seconds (%s total):".format(windowSeconds, rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Helper object for authenticating to the Twitter API.
 */
import org.apache.spark.streaming._ | |
import org.apache.spark.storage.StorageLevel | |
import scala.io.Source | |
import scala.collection.mutable.HashMap | |
import java.io.File | |
import org.apache.log4j.Logger | |
import org.apache.log4j.Level | |
import sys.process.stringSeqToProcess | |
import twitter4j.auth.OAuthAuthorization | |
import twitter4j.conf.ConfigurationBuilder | |
import org.apache.spark.Logging | |
import org.apache.log4j.{Level, Logger} | |
/** Helpers for Twitter OAuth configuration and log-level setup used by the streaming job. */
object Utils {

  /**
   * Loads OAuth credentials for accessing Twitter from a `key=value` file and publishes
   * each pair as the system property `twitter4j.oauth.<key>`.
   *
   * Blank lines are ignored. Every remaining line must split on `=` into exactly two
   * parts; key and value are trimmed. All lines are validated before any property is
   * set, so a malformed file leaves the system properties untouched.
   *
   * @param path path of the credentials file
   * @throws Exception if the file does not exist or a line is not of the form `key=value`
   */
  def configureTwitterCredentials(path: String): Unit = {
    val file = new File(path)
    if (!file.exists) {
      throw new Exception("Where is the damn Config file " + file)
    }

    // Read everything eagerly and always close the file handle; the original left the
    // Source open.
    val source = Source.fromFile(file.toString)
    val lines =
      try source.getLines().filter(_.trim.nonEmpty).toList
      finally source.close()

    val pairs = lines.map { line =>
      val splits = line.split("=")
      if (splits.size != 2) {
        throw new Exception("Error parsing configuration file - incorrectly formatted line [" + line + "]")
      }
      (splits(0).trim(), splits(1).trim())
    }

    pairs.foreach { case (key, value) =>
      System.setProperty("twitter4j.oauth." + key, value)
    }
  }

  /**
   * Lowers the root log4j level to WARN when log4j has no appenders configured yet.
   * An existing log4j configuration is left untouched.
   */
  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }

  /**
   * Builds an OAuth authorization from a default twitter4j `ConfigurationBuilder`.
   * Presumably this picks up the `twitter4j.oauth.*` system properties set by
   * [[configureTwitterCredentials]] - confirm against the twitter4j documentation.
   *
   * @return the authorization wrapped in `Some`
   */
  def getAuth = {
    Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment