@juanpampliega
Created May 16, 2015 06:18
Twitter Top Hashtags with Spark Streaming in spark-shell
import com.google.gson.Gson
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach { case (key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    // twitter4j expects system properties named twitter4j.oauth.consumerKey, etc.
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}
// Configure Twitter credentials
val apiKey = "xxx"
val apiSecret = "xxx"
val accessToken = "xxx"
val accessTokenSecret = "xxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
val ssc = new StreamingContext(sc, Seconds(15))
val stream = TwitterUtils.createStream(ssc, None)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
val topCounts120 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(120))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))
val topCounts30 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))
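The windowed pipeline above can be illustrated on plain Scala collections: the same extract/count/sort steps, applied to a hypothetical batch of tweet texts instead of a DStream (the input strings here are made up for the example):

```scala
// Hypothetical batch of tweet texts standing in for one window of the stream
val tweets = Seq("love #spark and #scala", "more #spark", "just text")

val counts = tweets
  .flatMap(_.split(" ").filter(_.startsWith("#"))) // extract hashtags, as in the DStream version
  .groupBy(identity)                               // group identical hashtags together
  .toSeq
  .map { case (tag, occurrences) => (occurrences.size, tag) } // swap to (count, tag)
  .sortBy { case (count, _) => -count }            // descending by count, like sortByKey(false)

// counts == Seq((2, "#spark"), (1, "#scala"))
```

On the real stream, `reduceByKeyAndWindow` performs the same per-key summation, but incrementally over the tuples that arrived in the last 120 (or 30) seconds.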
// Print popular hashtags
topCounts120.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 120 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
topCounts30.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 30 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
ssc.start()
ssc.awaitTermination()
@guruprasad2509

Hi, thank you so much for the code. I entered my Twitter API details and tried to execute the above code in spark-shell, but I cannot see any tweets in the output console; I only see the "Popular topics in last 120 seconds" and "Popular topics in last 30 seconds" headers. Can you please help me? Thank you very much.

@singhalroopal

I am not able to run the command 'import org.apache.spark.streaming.twitter.TwitterUtils' in spark-shell; it gives the error 'error: object twitter is not a member of package org.apache.spark.streaming'. How can I solve this error? Is there any requirement like sbt or Maven?

@vipulchandraker

vipulchandraker commented Jul 14, 2019

> I am not able to run the command 'import org.apache.spark.streaming.twitter.TwitterUtils' in spark-shell; it gives the error 'error: object twitter is not a member of package org.apache.spark.streaming'. How can I solve this error? Is there any requirement like sbt or Maven?

You need to add all the twitter streaming jars while launching spark-shell.

spark-shell --jars spark-streaming-twitter_2.11-2.3.3.jar,spark-tags_2.11-2.3.3.jar,twitter4j-core-4.0.6.jar,twitter4j-stream-4.0.6.jar
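Alternatively (an assumption on my part, not from the original reply), spark-shell can resolve these dependencies from Maven Central with `--packages` instead of downloading jars by hand; the `spark-streaming-twitter_2.11-2.3.3.jar` named above is the Apache Bahir artifact for Spark 2.x / Scala 2.11:

```shell
# Resolve the Twitter connector and its twitter4j dependencies transitively,
# instead of passing each jar to --jars by hand
spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.3.3
```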

You can download those dependencies from here

@Marcela1396

Marcela1396 commented Aug 13, 2020

Hi, can you help me? I want to run the script on my locally installed Apache Spark instance, but I am having problems. Could you help me with the commands I need to run to make it work?
What I do is start spark-shell with the .jar files and paste the code there, but I get the following:

Exception in thread "streaming-start" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
I am working on Linux. Thanks.
