Created
February 5, 2018 12:27
-
-
Save billmetangmo/e9b01963454acab4b6fb05d25f54369c to your computer and use it in GitHub Desktop.
Simple PageRank code for spark-shell using Amazon Movies & TV reviews as data source
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File
import java.io.PrintWriter
import java.time._
import java.util.Calendar

import scala.util.MurmurHash
import scala.util.hashing.MurmurHash3

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// NOTE: multi-line expressions below are wrapped in parentheses/braces so the
// script can still be fed to spark-shell line by line.

// SQL entry point built on `sc`, the SparkContext that spark-shell provides.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Download the dataset here: http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Movies_and_TV_5.json.gz
val movies_ratings = sqlContext.read.json("/in/reviews_Movies_and_TV_5.json")

// Maps an ASIN string to a 64-bit GraphX vertex id.
// Two differently-seeded 32-bit MurmurHash3 values are packed into one Long:
// a single 32-bit hash leaves most of the VertexId range unused and makes it
// far more likely that two distinct products collapse into the same vertex.
def asinToVertexId(asin: String): VertexId = {
  val high = MurmurHash3.stringHash(asin, 0x5bd1e995).toLong
  val low = MurmurHash3.stringHash(asin, 0x1b873593).toLong & 0xFFFFFFFFL
  (high << 32) | low
}

// One edge per pair of products reviewed by the same reviewer.
// The collected set is read directly as a Seq instead of being joined into a
// comma-separated string and split again, and the aggregate column gets an
// explicit alias rather than relying on its auto-generated name.
// NOTE(review): each pair yields a single directed edge whose direction follows
// the (unspecified) order of collect_set — confirm whether edges in both
// directions were intended for a co-review graph.
val edges = (
  movies_ratings
    .select($"reviewerID", $"asin")
    .groupBy($"reviewerID")
    .agg(collect_set($"asin").as("asins"))
    .select($"asins")
    .rdd
    .flatMap { row =>
      row.getSeq[String](0).combinations(2).map { case Seq(first, second) => (first, second) }
    }
)

val edgesRDD: RDD[(VertexId, VertexId)] = edges.map { case (src, dst) =>
  (asinToVertexId(src), asinToVertexId(dst))
}

// Every vertex gets the default attribute 1; only the edge structure matters here.
val graph = Graph.fromEdgeTuples(edgesRDD, 1)
graph.cache()

println("<<<<<<<< Start Page Rank >>>>>>>>>")
println(Calendar.getInstance.getTime)
// Run PageRank with a convergence tolerance of 0.0001.
val ranks = graph.pageRank(0.0001).vertices
ranks.cache()
println("<<<<<<<< Stop Page Rank >>>>>>>>>")
// Log the end time as well, so the duration can be read off next to the start time.
println(Calendar.getInstance.getTime)

println("<<<<<<<< Start Rank >>>>>>>>>")
// Lookup table (vertex id -> ASIN), keyed with the same hash used for the edges.
val asins = (
  movies_ratings
    .select($"asin")
    .distinct()
    .rdd
    .map(_.getString(0))
    .map(asin => (asinToVertexId(asin), asin))
)
asins.cache()

// `top` keeps only the 10 best entries per partition instead of sorting the
// whole joined RDD; the result is returned in descending rank order.
(
  ranks.join(asins)
    .top(10)(Ordering.by[(VertexId, (Double, String)), Double] { case (_, (rank, _)) => rank })
    .foreach { case (_, (_, asin)) => println(asin) }
)
println("<<<<<<<< Stop Rank >>>>>>>>>")
// Opens a PrintWriter on `f`, hands it to `op`, and always closes it afterwards.
//   f  - destination file, opened for writing by java.io.PrintWriter
//   op - callback that performs the actual writes
// The writer is closed in `finally`, so it is released even if `op` throws.
// Output is encoded as UTF-8 explicitly so the file does not depend on the
// platform default charset.
def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit): Unit = {
  val writer = new java.io.PrintWriter(f, "UTF-8")
  try {
    op(writer)
  } finally {
    writer.close()
  }
}
// Product page URLs of the 10 highest-ranked ASINs, best first.
// `top` selects the 10 largest ranks without sorting the whole joined RDD.
// (Parenthesised so the multi-line chain can be fed to spark-shell line by line.)
val top10 = (
  ranks.join(asins)
    .top(10)(Ordering.by[(VertexId, (Double, String)), Double] { case (_, (rank, _)) => rank })
    .map { case (_, (_, asin)) => "https://www.amazon.com/dp/" + asin }
)

// Write one URL per line to a timestamped file; ':' is replaced with '-' in the
// timestamp before it is used as part of the file name.
printToFile(new File("/out/output-" + LocalDateTime.now().toString.replaceAll(":", "-"))) { p =>
  top10.foreach(p.println)
}
println("<<<<<<<<< Program executed >>>>>>>>>")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment