Skip to content

Instantly share code, notes, and snippets.

Last active December 14, 2016 15:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aray/a7de1f3801a810f8b1fa00c271a1fefd to your computer and use it in GitHub Desktop.
Save aray/a7de1f3801a810f8b1fa00c271a1fefd to your computer and use it in GitHub Desktop.
GraphX PageRank Initial value convergence benchmark
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
object Bench {
def runLoopWithSource(sc: SparkContext): (Array[Double], Array[Double]) = {
val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
val g = Graph.fromEdgeTuples(edges, 1)
val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
val a = newVerson(ranks)(g, 100)
val b = oldVerson(ranks)(g, 100)
println("new: " + a.toList)
println("old: " + b.toList)
(a, b)
def runLogNormal(sc: SparkContext): (Array[Double], Array[Double]) = {
val g = GraphGenerators.logNormalGraph(sc, 10000, seed = 1).cache()
val ranks = g.pageRank(0.00001).vertices.cache()
val a = newVerson(ranks)(g, 100)
val b = oldVerson(ranks)(g, 100)
println("new: " + a.toList)
println("old: " + b.toList)
(a, b)
def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double = {
a.leftJoin(b) { case (id, a, bOpt) => (a - bOpt.getOrElse(0.0)) * (a - bOpt.getOrElse(0.0)) }
.map { case (id, error) => error }.sum()
def newVerson[VD: ClassTag, ED: ClassTag](answer: VertexRDD[Double])(
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Array[Double] =
require(numIter > 0, s"Number of iterations must be greater than 0," +
s" but got ${numIter}")
require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
s" to [0, 1], but got ${resetProb}")
val personalized = srcId.isDefined
val src: VertexId = srcId.getOrElse(-1L)
// Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute resetProb.
// When running personalized pagerank, only the source vertex
// has an attribute resetProb. All others are set to 0.
var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set the vertex attributes to the initial pagerank values
.mapVertices { (id, attr) =>
if (!(id != src && personalized)) 1.0 else 0.0
def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
val res = Array.ofDim[Double](numIter)
var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
val rankUpdates = rankGraph.aggregateMessages[Double](
ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
val rPrb = if (personalized) {
(src: VertexId, id: VertexId) => resetProb * delta(src, id)
} else {
(src: VertexId, id: VertexId) => resetProb
rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
(id, oldRank, msgSumOpt) => rPrb(src, id) + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
// logInfo(s"PageRank finished iteration $iteration.")
res(iteration) = compareRanks(rankGraph.vertices, answer)
iteration += 1
def oldVerson[VD: ClassTag, ED: ClassTag](answer: VertexRDD[Double])(
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Array[Double] =
require(numIter > 0, s"Number of iterations must be greater than 0," +
s" but got ${numIter}")
require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
s" to [0, 1], but got ${resetProb}")
val personalized = srcId.isDefined
val src: VertexId = srcId.getOrElse(-1L)
// Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute resetProb.
// When running personalized pagerank, only the source vertex
// has an attribute resetProb. All others are set to 0.
var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
// Set the vertex attributes to the initial pagerank values
.mapVertices { (id, attr) =>
if (!(id != src && personalized)) resetProb else 0.0
def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
val res = Array.ofDim[Double](numIter)
var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
val rankUpdates = rankGraph.aggregateMessages[Double](
ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
val rPrb = if (personalized) {
(src: VertexId, id: VertexId) => resetProb * delta(src, id)
} else {
(src: VertexId, id: VertexId) => resetProb
rankGraph = rankGraph.joinVertices(rankUpdates) {
(id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
// logInfo(s"PageRank finished iteration $iteration.")
res(iteration) = compareRanks(rankGraph.vertices, answer)
iteration += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment