Skip to content

Instantly share code, notes, and snippets.

@cotdp
Created May 24, 2014 10:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cotdp/b471cfff183b59d65ae1 to your computer and use it in GitHub Desktop.
Save cotdp/b471cfff183b59d65ae1 to your computer and use it in GitHub Desktop.
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonAST._
object UserInterestsExample {

  /** The shape of the data we want to run SQL against: one row per user.
    *
    * `likes` holds the user's interests, lower-cased and joined with
    * `LikesSeparator`, because ARRAY_CONTAINS is not available to query an
    * array column directly.
    *
    * Declared in the object rather than inside `main`: Spark SQL infers the
    * table schema from an implicit `TypeTag` for this class, and the compiler
    * cannot materialize a `TypeTag` for a class that is local to a method.
    */
  case class UserInterest(name: String, location_x: Double, location_y: Double, likes: String)

  /** Separator used to flatten the JSON `likes` array into one string column. */
  private val LikesSeparator = ";"

  /** Builds a SQL predicate that is true when `column` (a `LikesSeparator`-joined
    * list) contains `interest` as a whole element.
    *
    * A plain `LIKE '%baking%'` also matches any longer interest that merely
    * contains the word, so the element is matched in each possible position:
    * the only element, the first, the last, or somewhere in the middle.
    *
    * @param column   name of the column to test
    * @param interest the interest to look for; lower-cased to match stored data
    * @return a parenthesised boolean SQL expression
    * @throws IllegalArgumentException if `interest` is empty or contains anything
    *         other than letters, digits, spaces or hyphens (quotes, LIKE
    *         wildcards and the separator cannot be embedded safely)
    */
  private def containsInterest(column: String, interest: String): String = {
    require(
      interest.nonEmpty && interest.forall(c => c.isLetterOrDigit || c == ' ' || c == '-'),
      s"unsupported interest for a SQL LIKE predicate: '$interest'")
    val term = interest.toLowerCase(java.util.Locale.ROOT)
    val sep = LikesSeparator
    s"($column = '$term'" +
      s" OR $column LIKE '$term$sep%'" +
      s" OR $column LIKE '%$sep$term'" +
      s" OR $column LIKE '%$sep$term$sep%')"
  }

  /** Loads user interests from a JSON-lines file, registers them as the
    * `user_interest` table and runs two example queries against it.
    *
    * @param args `args(0)`, if given, is the input path; defaults to `likes.json`.
    *             Each input line is expected to be a JSON object with `name`,
    *             `location.x`, `location.y` and a `likes` array of strings.
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse("likes.json")
    val sc = new SparkContext("local[8]", "UserInterestsExample")
    // Stop the context even if parsing or a query fails.
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext._

      // Produces a RDD[String], one JSON document per line
      val lines = sc.textFile(inputPath)
      val userInterest = lines
        .map(line => parse(line)) // Parse the JSON, giving RDD[JValue]
        .map { json =>
          // Extract the values we need to populate the UserInterest class.
          // The formats are created inside the closure so nothing extra is
          // captured from the driver.
          implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
          val name = (json \ "name").extract[String]
          val locationX = (json \ "location" \ "x").extract[Double]
          val locationY = (json \ "location" \ "y").extract[Double]
          // Locale.ROOT keeps lower-casing independent of the JVM default locale.
          val likes = (json \ "likes").extract[Seq[String]]
            .map(_.toLowerCase(java.util.Locale.ROOT))
            .mkString(LikesSeparator)
          UserInterest(name, locationX, locationY, likes)
        }

      // Now we register the RDD[UserInterest] as a Table
      userInterest.registerAsTable("user_interest")

      // Print out what the table contains
      sql("SELECT * FROM user_interest").collect().foreach(println)
      /* Output:
      [Blythe,34.94953,-85.75692,sports;gardening;decorating]
      [Leslie,34.98342,-86.61666,mystery;vitamin supplements;wildlife]
      [Alexandria,30.37382,-96.97776,fishing;sweepstakes;home study courses]
      [Harmony,34.41231,-85.51302,astrology;records;nascar]
      ...
      */

      // Ideally we would use something like the Hive UDF ARRAY_CONTAINS, but it is not supported yet
      // sql("SELECT name FROM user_interest WHERE ARRAY_CONTAINS(likes,'baking')").collect().foreach(println)
      // Instead we match 'baking' as a whole element of the ';'-joined string,
      // so longer interests that merely contain the word are not returned.
      sql(s"SELECT * FROM user_interest WHERE ${containsInterest("likes", "baking")}")
        .collect().foreach(println)
      /* Output:
      [Vickie,34.5862,-86.99362,history;health;baking]
      [Jovita,33.05568,-87.44778,baking;baseball;casino gambling]
      [Calandra,39.80144,-90.30762,environment;baking;medicare coverage]
      */
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment