@JSantosP
Created April 5, 2016 14:26
New Boot for the twitter-stream project. With this snippet we can measure, in near real time (NRT), how many tweets mention Android and how many mention iOS.
package scalera.twitter

import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

object Boot extends Analytics {

  // Set checkpoint dir (updateStateByKey requires checkpointing)
  ssc.checkpoint("/tmp")

  // Add filters ...
  val Android = "android"
  val IOS = "ios"

  filter(
    Android,
    IOS
  )

  // Tag each tweet with every keyword its text contains.
  // Note that String.contains is case-sensitive.
  val groupedTweets = stream.flatMap { content =>
    List(Android, IOS).flatMap(key =>
      if (content.getText.contains(key)) Option(key -> content)
      else None)
  }

  // Apply the state aggregation function: add the number of tweets
  // in this batch to the running total kept for each keyword
  val aggregatedTweets: DStream[(String, Long)] =
    groupedTweets.updateStateByKey {
      (newTweets, previousState) =>
        val newTweetsAmount = newTweets.size.toLong
        previousState.fold(Some(newTweetsAmount))(previousSize =>
          Some(previousSize + newTweetsAmount))
    }

  // And add actions to perform (like printing the aggregatedTweets) ...
  aggregatedTweets.foreachRDD { results =>
    results.foreach {
      case (team, amount) => logger.info(s">>>>>>>>>>>>>>>> $team : $amount")
    }
  }

  // ... and begin listening
  listen()

}
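
To see what the grouping and the state function do without starting a Spark context, here is a minimal, Spark-free sketch of the same two steps applied to plain strings. The names `CountSketch`, `tagByKeyword`, `updateCount` and `step`, as well as the sample batches, are hypothetical and not part of the twitter-stream project; `step` only imitates the way `updateStateByKey` calls the update function once per known key on every batch.

```scala
// Spark-free sketch (hypothetical names) of the two steps in Boot:
// tagging each text with the keywords it contains, then folding
// per-batch counts into a running total per keyword.
object CountSketch {
  val Keywords = List("android", "ios")

  // Same shape as the flatMap in Boot: one (keyword, text) pair per
  // keyword found. String.contains is case-sensitive.
  def tagByKeyword(text: String): List[(String, String)] =
    Keywords.filter(key => text.contains(key)).map(key => key -> text)

  // Equivalent to the function passed to updateStateByKey:
  // previous total (0 if absent) plus the size of the new batch.
  def updateCount(newItems: Seq[String], previous: Option[Long]): Option[Long] =
    Some(previous.getOrElse(0L) + newItems.size.toLong)

  // Imitates one micro-batch: every key seen so far is updated,
  // even when the batch holds no new items for it.
  def step(state: Map[String, Long], batch: Seq[String]): Map[String, Long] = {
    val grouped: Map[String, Seq[String]] =
      batch.flatMap(tagByKeyword).groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }
    (state.keySet ++ grouped.keySet).flatMap { key =>
      updateCount(grouped.getOrElse(key, Seq.empty), state.get(key)).map(key -> _)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Two assumed batches of tweet texts
    val batches = Seq(
      Seq("android rocks", "ios and android", "nothing here"),
      Seq("ios update")
    )
    val totals = batches.foldLeft(Map.empty[String, Long])(step)
    totals.foreach { case (team, amount) => println(s"$team : $amount") }
  }
}
```

With these two sample batches the running totals end at 2 for android and 2 for ios: the second batch adds one ios mention and leaves the android count untouched, which is the behaviour the state function in Boot relies on.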