Skip to content

Instantly share code, notes, and snippets.

@Arnold1
Forked from samklr/TweetStreams.scala
Created November 11, 2017 03:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Arnold1/cc363dc32cf6ba34cafefa35443ccf57 to your computer and use it in GitHub Desktop.
Save Arnold1/cc363dc32cf6ba34cafefa35443ccf57 to your computer and use it in GitHub Desktop.
TweetStream with Spark
import Utils
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark Streaming job that reads the live Twitter stream, echoes a sample of
 * tweet texts and prints the most frequent hashtags over sliding windows of
 * 60 and 10 seconds.
 */
object TweetStreams {

  /**
   * Entry point: configures Twitter credentials, builds the streaming context
   * (5 second batches), wires up the outputs and blocks until the streaming
   * context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // oAuth
    Utils.configureTwitterCredentials("src/main/twitter.txt")
    Utils.setStreamingLogLevels

    // Initialize Spark Context
    println("Initiallizing Spark Context")
    // Build the context from the SparkConf (previously the conf was created
    // but ignored). "local[2]" is only a fallback, so a master supplied from
    // outside (e.g. via spark-submit) is honoured.
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // TwitterUtils can accept a filter argument for filtering tweet content
    // NOTE(review): this passes a single empty keyword rather than an empty
    // array -- confirm that this is the intended "no filter" value.
    val filters = Array("")
    val stream = TwitterUtils.createStream(ssc, Utils.getAuth, filters)

    // Echo up to 10 tweet texts per batch.
    val status = stream.map(_.getText)
    status.foreachRDD(rdd => rdd.take(10).foreach(println))

    // Space-separated tokens that start with '#'.
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Counts hashtags over the last `windowSeconds` seconds, orders them by
    // descending count and prints the top 10 on every batch.
    def reportTopHashTags(windowSeconds: Int): Unit = {
      val ranked = hashTags.map((_, 1))
        .reduceByKeyAndWindow(_ + _, Seconds(windowSeconds))
        .map { case (topic, count) => (count, topic) }
        .transform(_.sortByKey(false))
      ranked.foreachRDD(rdd => {
        val topList = rdd.take(10)
        println("\nPopular topics in last %s seconds (%s total):".format(windowSeconds, rdd.count()))
        topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
      })
    }

    // Popular hashtags
    reportTopHashTags(60)
    reportTopHashTags(10)

    ssc.start()
    ssc.awaitTermination()
  }
}
/**
 * Helper object for authenticating to the Twitter API
 *
 */
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
object Utils {

  /**
   * OAuth Credentials for accessing Twitter.
   *
   * Reads the file at `path`, ignoring blank lines, and expects every other
   * line to have the form `key=value`. Each pair is published as the system
   * property `twitter4j.oauth.<key>` with the trimmed value.
   *
   * All lines are parsed before any property is set, so a malformed file
   * leaves the system properties untouched.
   *
   * @param path location of the credentials file
   * @throws Exception if the file does not exist or a line is not `key=value`
   */
  def configureTwitterCredentials(path: String): Unit = {
    val file = new File(path)
    if (!file.exists) {
      throw new Exception("Where is the damn Config file " + file)
    }

    // Read eagerly (toList) so the underlying file handle can be closed
    // before the lines are used.
    val source = Source.fromFile(file.toString)
    val lines =
      try source.getLines().filter(_.trim.nonEmpty).toList
      finally source.close()

    val pairs = lines.map { line =>
      line.split("=") match {
        case Array(key, value) => (key.trim, value.trim)
        case _ =>
          throw new Exception("Error parsing configuration file - incorrectly formatted line [" + line + "]")
      }
    }

    pairs.foreach { case (key, value) =>
      System.setProperty("twitter4j.oauth." + key, value)
    }
  }

  /**
   * Quietens logging for the streaming job: if log4j has no appenders
   * configured on the root logger, the root level is set to WARN. An existing
   * log4j configuration is left alone.
   *
   * Called by `TweetStreams.main`.
   */
  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }

  /**
   * Builds an OAuth authorization from a default twitter4j configuration.
   * Presumably this picks up the `twitter4j.oauth.*` system properties set by
   * `configureTwitterCredentials` -- verify against the twitter4j docs.
   *
   * @return the authorization wrapped in `Some`
   */
  def getAuth = {
    Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment