Skip to content

Instantly share code, notes, and snippets.

@cotdp
Last active February 25, 2016 09:01
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save cotdp/b5b8155bb85e254d2a3c to your computer and use it in GitHub Desktop.
Save cotdp/b5b8155bb85e254d2a3c to your computer and use it in GitHub Desktop.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonAST._
import org.json4s.DefaultFormats
/**
 * Spark SQL demo: loads newline-delimited Facebook interaction JSON records,
 * keeps only 'Candy Crush Saga' app activity, and prints game-level statistics
 * grouped by gender and by language.
 */
object CandyCrushSQL {

  /** Input file used when no path is given as the first command-line argument. */
  private val DefaultInputPath = "facebook-2014-05-19.json"

  /** Value of `facebook.application` identifying the records we keep. */
  private val CandyCrushApp = "Candy Crush Saga"

  // We want the level number from e.g. "Yay, I completed level 576 in Candy Crush Saga!".
  // The surrounding text changes with the user's language, but the LAST number is the level.
  private val LevelPattern = """(\d+)""".r

  /**
   * The shape of the data we run SQL against.
   *
   * Declared at object level rather than inside `main`: Spark SQL derives the
   * table schema by Scala reflection (a `TypeTag`), which cannot be
   * materialized for a case class that is local to a method.
   */
  case class CandyCrushInteraction(id: String, user: String, level: Int, gender: String, language: String)

  /**
   * Runs the job on a local 8-thread Spark context.
   *
   * @param args optional; `args(0)` overrides the input file path
   *             (defaults to "facebook-2014-05-19.json").
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val sc = new SparkContext("local[8]", "CandyCrushSQL")
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      // Brings `sql` into scope, plus the implicit conversion that lets an
      // RDD of case classes be registered as a table.
      import sqlContext._

      // One record per line: drop malformed JSON, keep only Candy Crush activity,
      // then project to rows.
      val interactions = sc.textFile(inputPath)
        .flatMap(line => parseJson(line).toList)
        .filter(isCandyCrush)
        .map(toInteraction)

      // Now we register the RDD[CandyCrushInteraction] as a Table
      interactions.registerAsTable("candy_crush_interaction")

      // Game level by Gender
      sql("SELECT gender, COUNT(level), MAX(level), MIN(level), AVG(level) FROM candy_crush_interaction WHERE level > 0 GROUP BY gender").collect().foreach(println)
      /* Sample output (facebook-2014-05-19.json):
      ["male",14727,590,1,104.71705031574659]
      ["female",15422,590,1,114.17202697445208]
      ["mostly_male",2824,590,1,97.08852691218131]
      ["mostly_female",1934,590,1,99.0517063081696]
      ["unisex",2674,590,1,113.42071802543006]
      [,11023,590,1,93.45677220357435]
      */

      // Game level by Language
      sql("SELECT language, COUNT(level), MAX(level), MIN(level), AVG(level) FROM candy_crush_interaction WHERE level > 0 GROUP BY language").collect().foreach(println)
      /* Sample output (facebook-2014-05-19.json):
      [,214,590,1,117.78971962616822]
      ["de",819,590,1,137.8229548229548]
      ["en",24256,590,1,91.48932222955145]
      ["es",6833,590,1,109.4412410361481]
      ["fr",5188,590,1,137.47764070932922]
      */
    } finally {
      // Always release the context, even when a job or query fails.
      sc.stop()
    }
  }

  /**
   * Parses one input line as JSON.
   *
   * @return `None` for a line that fails to parse, so a single bad record
   *         cannot abort the whole job.
   */
  private def parseJson(line: String): Option[JValue] =
    scala.util.Try(parse(line)).toOption

  /**
   * True when `facebook.application` is a JSON string equal to "Candy Crush Saga".
   * Records missing the field (or holding a non-string) are rejected instead of
   * throwing, as `extract[String]` would.
   */
  private def isCandyCrush(json: JValue): Boolean =
    json \ "facebook" \ "application" match {
      case JString(app) => app == CandyCrushApp
      case _            => false
    }

  /**
   * Builds one table row from a parsed record.
   *
   * String columns are rendered with `compact` because the fields may be absent.
   * A missing field renders as an empty value (the blank group in the sample
   * output). A present string keeps its JSON quotes, e.g. "male" is stored
   * with its surrounding quote characters.
   */
  private def toInteraction(json: JValue): CandyCrushInteraction = {
    // Use the raw title text when it is a string, so JSON escapes such as
    // \u0007 produced by `compact` cannot contribute spurious digits.
    val title = json \ "interaction" \ "title" match {
      case JString(text) => text
      case other         => compact(other)
    }
    CandyCrushInteraction(
      id = compact(json \ "facebook" \ "id"),
      user = compact(json \ "facebook" \ "author" \ "hash"),
      level = levelFrom(title),
      gender = compact(json \ "demographic" \ "gender"),
      language = compact(json \ "language" \ "tag"))
  }

  /**
   * The level is the last run of digits in `title`.
   *
   * @return 0 when the title has no digits, or when the last run does not fit
   *         in an Int (instead of throwing NumberFormatException).
   */
  private def levelFrom(title: String): Int =
    LevelPattern.findAllIn(title).toList.lastOption
      .flatMap(digits => scala.util.Try(digits.toInt).toOption)
      .getOrElse(0)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment