Skip to content

Instantly share code, notes, and snippets.

@cbuntain
Created January 15, 2015 20:50
Show Gist options
  • Save cbuntain/f16359906f4a6c52544c to your computer and use it in GitHub Desktop.
Save cbuntain/f16359906f4a6c52544c to your computer and use it in GitHub Desktop.
Tweet Frequency in Spark
import scala.collection.JavaConverters._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Date
import java.util.Locale
import java.lang.reflect.Type
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
/**
 * Spark job that counts tweets per one-minute time slice.
 *
 * Reads a text file with one JSON document per line, keeps the documents that
 * carry both a `text` and a `created_at` field, truncates each `created_at`
 * timestamp to the minute and writes `(slice, count)` pairs sorted by slice.
 * Minutes between the earliest and latest observed slice that contain no
 * tweets are emitted with a count of 0.
 */
object App {

  /** `SimpleDateFormat` pattern used to parse the `created_at` field. */
  val CREATED_AT_FORMAT = "EEE MMM d HH:mm:ss z yyyy"

  // SimpleDateFormat is not thread-safe, and the closures passed to Spark may
  // run on several task threads of the same JVM, so every thread gets its own
  // formatter instead of sharing a single instance.
  private val threadLocalFormat = new ThreadLocal[SimpleDateFormat] {
    override def initialValue(): SimpleDateFormat =
      new SimpleDateFormat(CREATED_AT_FORMAT, Locale.US)
  }

  /** Formatter for [[CREATED_AT_FORMAT]] that is private to the calling thread. */
  def sdf: SimpleDateFormat = threadLocalFormat.get()

  val gson = new Gson()

  /** Target type handed to Gson so each line is decoded into a string-keyed map. */
  val mapType : Type = new TypeToken[java.util.HashMap[String,Object]]() {}.getType()

  // Width of one time slice in milliseconds.
  private val SliceMillis = 60L * 1000L

  /**
   * Runs the job.
   *
   * @param args the command line arguments: `args(0)` is the input path,
   *             `args(1)` is the output path
   * @throws IllegalArgumentException if fewer than two arguments are given
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: App <input path> <output path>")

    val conf = new SparkConf().setAppName("TwitterLearner")
    val sc = new SparkContext(conf)
    try {
      val tweets = sc.textFile(args(0))
        .map(line => jsonToMap(line))
        .filter(jsonMap => jsonMap.contains("text") && jsonMap.contains("created_at"))

      // Only the number of messages per slice is needed, so count with
      // reduceByKey rather than collecting every tweet with groupByKey.
      // The result has one entry per observed minute and is cached because
      // it is traversed again for the min/max lookups below.
      val observedCounts = tweets
        .map(m => (mapToPair(m)._1, 1))
        .reduceByKey(_ + _)
        .cache()

      // min/max fail on an empty RDD; with no usable tweets there is no
      // range to fill and the output is simply empty.
      val fullKeyList =
        if (observedCounts.take(1).isEmpty) {
          List[Date]()
        } else {
          val observedKeys = observedCounts.keys
          constructDateList(observedKeys.min, observedKeys.max)
        }

      // A zero entry for every minute in range, so slices without tweets
      // still show up in the output.
      val zeroCounts = sc.parallelize(fullKeyList).map(key => (key, 0))

      // Tells us the number of messages per slice
      val sliceCounts = observedCounts.union(zeroCounts).reduceByKey(_ + _).sortByKey()
      sliceCounts.saveAsTextFile(args(1))
    } finally {
      sc.stop()
    }
  }

  /**
   * Decodes one line of JSON into an immutable Scala map.
   *
   * @param line a JSON object serialized as text
   * @return the decoded fields; an empty map when Gson yields `null`
   *         (which it does for empty input)
   */
  def jsonToMap(line : String) : Map[String, Object] = {
    val parsed : java.util.HashMap[String,Object] = gson.fromJson(line, mapType)
    Option(parsed).map(_.asScala.toMap).getOrElse(Map.empty[String, Object])
  }

  /**
   * Pairs a tweet with the minute slice its `created_at` field falls into.
   *
   * @param m decoded tweet; must contain a string-valued `created_at` entry
   * @return `(slice, m)`
   * @throws NoSuchElementException if `created_at` is missing
   * @throws java.text.ParseException if `created_at` does not match
   *                                  [[CREATED_AT_FORMAT]]
   */
  def mapToPair(m : Map[String, Object]) : Tuple2[Date,Map[String,Object]] = {
    val createdStr = m("created_at").asInstanceOf[String]
    (convertTimeToSlice(sdf.parse(createdStr)), m)
  }

  /**
   * Truncates a timestamp to the start of its minute.
   *
   * @param time any point in time
   * @return `time` with the seconds and milliseconds fields set to zero
   */
  def convertTimeToSlice(time : Date) : Date = {
    val cal = Calendar.getInstance
    cal.setTime(time)
    cal.set(Calendar.SECOND, 0)
    // Clear the sub-second part as well, otherwise two timestamps in the
    // same minute could yield different keys.
    cal.set(Calendar.MILLISECOND, 0)
    cal.getTime
  }

  /**
   * Lists the dates from `startDate` up to `endDate` in one-minute steps.
   *
   * @param startDate first date of the list
   * @param endDate   last date of the list; always included as the final
   *                  element
   * @return every `startDate + k minutes` that lies strictly before
   *         `endDate`, followed by `endDate` itself
   */
  def constructDateList(startDate : Date, endDate : Date) : List[Date] = {
    val endMillis = endDate.getTime
    val steps = Iterator.iterate(startDate.getTime)(_ + SliceMillis)
      .takeWhile(_ < endMillis)
      .map(millis => new Date(millis))
    (steps ++ Iterator(endDate)).toList
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment