Skip to content

Instantly share code, notes, and snippets.

@billmetangmo
Created February 5, 2018 12:27
Show Gist options
  • Save billmetangmo/e9b01963454acab4b6fb05d25f54369c to your computer and use it in GitHub Desktop.
Save billmetangmo/e9b01963454acab4b6fb05d25f54369c to your computer and use it in GitHub Desktop.
Simple PageRank code for spark-shell using Amazon Movies & TV reviews as data source
import org.apache.spark._
import org.apache.spark.graphx._
import java.io.File
import java.time._
import java.util.Calendar
import scala.util.MurmurHash
import java.io.PrintWriter
import org.apache.spark.rdd.RDD
// SQL entry point used to load the JSON review dump (`sc` is the SparkContext provided by spark-shell).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Download the dataset here: http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Movies_and_TV_5.json.gz
// The script reads it from /in/reviews_Movies_and_TV_5.json.
val movies_ratings = sqlContext.read.json("/in/reviews_Movies_and_TV_5.json")
// Co-review edges: for each reviewer, every unordered pair of distinct products (asin)
// reviewed by that reviewer yields one (asin, asin) tuple.
val edges = {
  // One row per reviewer, holding the set of asins that reviewer has reviewed.
  // The aggregated column is aliased so the code does not depend on Spark's generated column name.
  val asinsPerReviewer = movies_ratings
    .select($"reviewerID", $"asin")
    .groupBy($"reviewerID")
    .agg(collect_set($"asin").as("asins"))
    .select($"asins")
  // Read the array column directly instead of joining it into a comma-separated
  // string and splitting it again.
  asinsPerReviewer.rdd
    .map(row => row.getSeq[String](0).map(_.trim).toList)
    .flatMap(reviewed => reviewed.combinations(2).map(pair => (pair(0), pair(1))))
}
// GraphX vertex ids are Longs, so each asin is replaced by its 32-bit MurmurHash widened to Long.
// The same hash is applied later in the script to map vertex ids back to asins.
// NOTE(review): two different asins can hash to the same value and would then share one vertex.
val edgesRDD: RDD[(VertexId, VertexId)] = edges.map { case (a, b) =>
  (MurmurHash.stringHash(a).toLong, MurmurHash.stringHash(b).toLong)
}
// Build the graph from the edge tuples; 1 is the default attribute given to every vertex.
val graph = Graph.fromEdgeTuples(edgesRDD, 1)
graph.cache()
// PageRank phase: print a banner and the current wall-clock time, then compute the scores.
println("<<<<<<<< Start Page Rank >>>>>>>>>")
println(Calendar.getInstance().getTime())
// Per-vertex PageRank scores of `graph`.
val ranks = {
  // Tolerance passed to GraphX's pageRank.
  val convergenceTolerance = 0.0001
  graph.pageRank(convergenceTolerance).vertices
}
// Cached because the scores are joined against the asin table more than once below.
ranks.cache()
println("<<<<<<<< Stop Page Rank >>>>>>>>>")
println("<<<<<<<< Start Rank >>>>>>>>>")
// Lookup table from hashed vertex id back to the asin string it was derived from.
// The hash must stay identical to the one used when the edge list was built.
val asins = movies_ratings.select($"asin").distinct().rdd.map { row =>
  val asin = row.getString(0)
  (MurmurHash.stringHash(asin).toLong, asin)
}
asins.cache()
// Print the asins of the ten highest-scoring vertices, best first.
// `top` keeps only the 10 best elements per partition instead of sorting the whole joined RDD;
// the order among vertices with exactly equal scores is unspecified.
ranks.join(asins)
  .top(10)(Ordering.by[(VertexId, (Double, String)), Double](_._2._1))
  .foreach { case (_, (_, asin)) => println(asin) }
println("<<<<<<<< Stop Rank >>>>>>>>>")
/** Opens a `PrintWriter` on `f`, hands it to `op`, and always closes it afterwards.
  *
  * The writer is closed in a `finally` clause, so it is released (and its buffered
  * output flushed) even when `op` throws; the exception from `op` then propagates
  * to the caller.
  *
  * @param f  file to write to; it is created, or truncated if it already exists
  * @param op callback that writes the content through the supplied writer
  */
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit): Unit = {
  val p = new java.io.PrintWriter(f)
  try {
    op(p)
  } finally {
    p.close()
  }
}
// Amazon product URLs of the ten highest-scoring vertices, best first.
// `top` avoids sorting the whole joined RDD just to keep ten rows; the order among
// vertices with exactly equal scores is unspecified.
val top10 = ranks.join(asins)
  .top(10)(Ordering.by[(VertexId, (Double, String)), Double](_._2._1))
  .map { case (_, (_, asin)) => "https://www.amazon.com/dp/" + asin }
// Write one URL per line to a timestamped file under /out.
// ':' in the ISO timestamp is replaced by '-' (presumably to keep the file name portable).
printToFile(new File(s"/out/output-${LocalDateTime.now().toString.replace(':', '-')}")) { p =>
  top10.foreach(p.println)
}
println("<<<<<<<<< Program executed >>>>>>>>>")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment