Skip to content

Instantly share code, notes, and snippets.

@juanpampliega
Last active August 29, 2015 14:21
Show Gist options
  • Save juanpampliega/991f3f09b65b836a22d9 to your computer and use it in GitHub Desktop.
Save juanpampliega/991f3f09b65b836a22d9 to your computer and use it in GitHub Desktop.
Code for running Twitter sentiment analysis with Spark Streaming in spark-shell
import com.google.gson.Gson
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/**
 * Configures the OAuth credentials used to access Twitter.
 *
 * Each value is trimmed and stored in a `twitter4j.oauth.*` JVM system property. The
 * `api` part of a key name is rewritten to `consumer`, so `apiKey` is stored as
 * `twitter4j.oauth.consumerKey` and `apiSecret` as `twitter4j.oauth.consumerSecret`.
 *
 * All four values are validated before any property is written, so a failed call leaves
 * the system properties untouched. Credential values are never echoed to the console.
 *
 * @param apiKey            value stored under `twitter4j.oauth.consumerKey`
 * @param apiSecret         value stored under `twitter4j.oauth.consumerSecret`
 * @param accessToken       value stored under `twitter4j.oauth.accessToken`
 * @param accessTokenSecret value stored under `twitter4j.oauth.accessTokenSecret`
 * @throws IllegalArgumentException if any value is null, empty or whitespace-only
 */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String): Unit = {
  // An ordered, immutable list keeps validation and output order deterministic.
  val credentials = Seq(
    "apiKey" -> apiKey,
    "apiSecret" -> apiSecret,
    "accessToken" -> accessToken,
    "accessTokenSecret" -> accessTokenSecret
  ).map { case (key, value) => key -> Option(value).map(_.trim).getOrElse("") }

  println("Configuring Twitter OAuth")

  // Validate everything up front so a bad value cannot leave half of the properties set.
  credentials.find { case (_, value) => value.isEmpty }.foreach { case (key, _) =>
    throw new IllegalArgumentException("Error setting authentication - value for " + key + " not set")
  }

  credentials.foreach { case (key, value) =>
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value)
    // Deliberately report only the property name: the value is a secret.
    println("\tProperty " + fullKey + " set")
  }
  println()
}
// Configure Twitter credentials.
// Replace the placeholder strings below with real values before running; a blank value
// makes configureTwitterCredentials throw.
val apiKey = "xxx"
val apiSecret = "xxx"
val accessToken = "xxxx"
val accessTokenSecret = "xxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
// The StreamingContext and the Twitter stream are created once, further down, right
// before the pipeline that consumes them. A second, never-started pair defined here was
// removed because it was immediately redefined and left an unused context behind.
/**
 * Classifies a piece of text as "positive", "negative" or "neutral" using word lists.
 *
 * The text is lower-cased and split on every run of non-letter characters, so matching
 * ignores case and surrounding punctuation ("Love!" and "#love" both count as "love").
 * Every token found in the positive list adds 1 to the score and every token found in
 * the negative list subtracts 1; the sign of the final score selects the label.
 *
 * @param s text to classify; `null` is treated as empty text
 * @return "positive" when the score is above zero, "negative" when it is below zero,
 *         "neutral" otherwise
 */
def sentiment(s:String) : String = {
  // NOTE(review): "the", "one", "that" and "is" are not sentiment words; they look like
  // demo padding to make more tweets score as non-neutral. Kept unchanged - confirm intent.
  val positive = Set("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
  val negative = Set("hate", "bad", "stupid", "is")

  // Locale.ROOT keeps lower-casing independent of the JVM default locale.
  val words = Option(s).getOrElse("")
    .toLowerCase(java.util.Locale.ROOT)
    .split("[^\\p{L}]+")

  val st = words.count(positive.contains) - words.count(negative.contains)

  if (st > 0) "positive"
  else if (st < 0) "negative"
  else "neutral"
}
// Expose the word-list classifier to Spark SQL so queries can call sentiment(text).
sqlContext.udf.register("sentiment", sentiment _)

// Streaming context with a 20-second batch interval on top of the shell's SparkContext.
val ssc = new StreamingContext(sc, Seconds(20))
// No explicit authorization object is passed (None).
// NOTE(review): presumably the twitter4j.oauth.* system properties set earlier are picked up - confirm.
val stream = TwitterUtils.createStream(ssc, None)

/** Minimal tweet record: creation time in seconds since the epoch, plus the tweet text. */
case class Tweet(createdAt:Long, text:String)

// 60-second window over the stream; getTime() is in milliseconds, hence the division by 1000.
val twts = stream.window(Seconds(60)).map(status =>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
)

// For every windowed RDD: publish it as the "tweets" temp table and print per-sentiment counts.
twts.foreachRDD(rdd => {
  rdd.toDF().registerTempTable("tweets")
  println("\nSentiments in the last 60 seconds (%s tweets total):".format(rdd.count()))
  val sentimentsCount = sqlContext.sql("select sentiment(text), count(1) from tweets group by sentiment(text)")
  // Collect first: a bare foreach would run println on the executors, so on a cluster the
  // counts would never reach the driver console. The grouped result has at most three rows.
  sentimentsCount.collect().foreach{t => println("%s --- %s".format(t(0), t(1)))}
})

ssc.start()
// Blocks the shell until the streaming context is stopped or fails.
ssc.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment