Last active
December 14, 2016 15:19
-
-
Save aray/a7de1f3801a810f8b1fa00c271a1fefd to your computer and use it in GitHub Desktop.
GraphX PageRank initial-value convergence benchmark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.reflect.ClassTag | |
import org.apache.spark.SparkContext | |
import org.apache.spark.graphx._ | |
import org.apache.spark.graphx.util.GraphGenerators | |
/**
 * Benchmark comparing how quickly two variants of fixed-iteration GraphX PageRank converge
 * towards a set of reference ranks.
 *
 * After every iteration each variant records the sum of squared differences between its
 * current ranks and the reference ranks (see [[compareRanks]]). The two variants differ in:
 *  - [[newVerson]]: every vertex starts at rank 1.0, and a vertex that receives no messages in
 *    an iteration is reset to the reset term;
 *  - [[oldVerson]]: every vertex starts at `resetProb`, and a vertex that receives no messages
 *    keeps its previous rank.
 */
object Bench {

  /**
   * Runs both variants for 100 iterations on the 4-vertex graph 1 -> 2 -> 3 -> 4 -> 2,
   * in which vertex 1 has no incoming edges, and prints the per-iteration errors.
   *
   * @return (errors of [[newVerson]], errors of [[oldVerson]]), one entry per iteration
   */
  def runLoopWithSource(sc: SparkContext): (Array[Double], Array[Double]) = {
    val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
    val g = Graph.fromEdgeTuples(edges, 1)
    // Reference ranks (presumably computed with igraph, per the name) sum to 1; they are
    // scaled by the vertex count, 4.
    val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
    val ranks = VertexRDD(sc.parallelize((1L to 4L).zip(igraphPR)))
    val a = newVerson(ranks)(g, 100)
    val b = oldVerson(ranks)(g, 100)
    println(s"new: ${a.toList}")
    println(s"old: ${b.toList}")
    (a, b)
  }

  /**
   * Runs both variants for 100 iterations on a generated log-normal graph with 10000
   * vertices (seed 1). The reference ranks come from GraphX's tolerance-based
   * `pageRank(0.00001)` on the same graph.
   *
   * @return (errors of [[newVerson]], errors of [[oldVerson]]), one entry per iteration
   */
  def runLogNormal(sc: SparkContext): (Array[Double], Array[Double]) = {
    val g = GraphGenerators.logNormalGraph(sc, 10000, seed = 1).cache()
    val ranks = g.pageRank(0.00001).vertices.cache()
    val a = newVerson(ranks)(g, 100)
    val b = oldVerson(ranks)(g, 100)
    println(s"new: ${a.toList}")
    println(s"old: ${b.toList}")
    // Both results are plain arrays, so the cached inputs are no longer needed.
    ranks.unpersist(false)
    g.unpersist(false)
    (a, b)
  }

  /**
   * Sum of squared rank differences over the vertices of `a`.
   *
   * A vertex of `a` that is missing from `b` is compared against 0.0. Vertices present
   * only in `b` do not contribute.
   */
  def compareRanks(a: VertexRDD[Double], b: VertexRDD[Double]): Double =
    a.leftJoin(b) { (_, rankA, rankBOpt) =>
      val diff = rankA - rankBOpt.getOrElse(0.0)
      diff * diff
    }.map(_._2).sum()

  /**
   * Fixed-iteration PageRank that starts every vertex at 1.0 and resets vertices that
   * receive no messages to the reset term. Records the error against `answer` after
   * every iteration.
   *
   * @param answer reference ranks compared against after each iteration
   * @param graph input graph; only its structure is used
   * @param numIter number of iterations; must be > 0
   * @param resetProb random reset probability; must lie in [0, 1]
   * @param srcId source vertex for personalized PageRank, or None for standard PageRank
   * @return array of length `numIter`; entry i is the error after iteration i + 1
   * @throws IllegalArgumentException if `numIter` or `resetProb` is out of range
   */
  def newVerson[VD: ClassTag, ED: ClassTag](answer: VertexRDD[Double])(
      graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Array[Double] =
    runStaticPageRank(answer, graph, numIter, resetProb, srcId,
      initialRank = 1.0, keepUnmessagedRank = false)

  /**
   * Fixed-iteration PageRank that starts every vertex at `resetProb` and lets vertices that
   * receive no messages keep their previous rank. Records the error against `answer` after
   * every iteration.
   *
   * @param answer reference ranks compared against after each iteration
   * @param graph input graph; only its structure is used
   * @param numIter number of iterations; must be > 0
   * @param resetProb random reset probability; must lie in [0, 1]
   * @param srcId source vertex for personalized PageRank, or None for standard PageRank
   * @return array of length `numIter`; entry i is the error after iteration i + 1
   * @throws IllegalArgumentException if `numIter` or `resetProb` is out of range
   */
  def oldVerson[VD: ClassTag, ED: ClassTag](answer: VertexRDD[Double])(
      graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Array[Double] =
    runStaticPageRank(answer, graph, numIter, resetProb, srcId,
      initialRank = resetProb, keepUnmessagedRank = true)

  /**
   * Shared PageRank loop behind [[newVerson]] and [[oldVerson]].
   *
   * @param initialRank rank every vertex starts with. When personalized, only the source
   *                    vertex gets it and all other vertices start at 0.0.
   * @param keepUnmessagedRank if true, a vertex that receives no messages in an iteration
   *                           keeps its previous rank (the `joinVertices` behaviour); if
   *                           false, its rank becomes the reset term
   * @return per-iteration `compareRanks(currentRanks, answer)`
   */
  private def runStaticPageRank[VD: ClassTag, ED: ClassTag](
      answer: VertexRDD[Double],
      graph: Graph[VD, ED],
      numIter: Int,
      resetProb: Double,
      srcId: Option[VertexId],
      initialRank: Double,
      keepUnmessagedRank: Boolean): Array[Double] = {
    require(numIter > 0, s"Number of iterations must be greater than 0," +
      s" but got ${numIter}")
    require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
      s" to [0, 1], but got ${resetProb}")
    val personalized = srcId.isDefined
    val src: VertexId = srcId.getOrElse(-1L)

    // Each edge weight is 1 / out-degree of its source vertex. Each vertex starts at
    // initialRank; when personalized, only the source vertex does and the rest start at 0.0.
    var rankGraph: Graph[Double, Double] = graph
      .outerJoinVertices(graph.outDegrees) { (_, _, deg) => deg.getOrElse(0) }
      .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
      .mapVertices { (id, _) => if (!personalized || id == src) initialRank else 0.0 }

    // Reset term for a vertex. Standard PageRank uses resetProb everywhere. Personalized
    // PageRank uses resetProb at the source and 0.0 elsewhere. It does not change between
    // iterations, so it is built once.
    val resetTerm: VertexId => Double = if (personalized) {
      id => if (id == src) resetProb else 0.0
    } else {
      _ => resetProb
    }

    val errors = Array.ofDim[Double](numIter)
    var iteration = 0
    while (iteration < numIter) {
      rankGraph.cache()
      // Each vertex sends rank * edge weight along its out-edges; messages are summed at the
      // destination vertex.
      val rankUpdates = rankGraph.aggregateMessages[Double](
        ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
      val prevRankGraph = rankGraph
      rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { (id, oldRank, msgSumOpt) =>
        msgSumOpt match {
          case Some(msgSum) => resetTerm(id) + (1.0 - resetProb) * msgSum
          case None => if (keepUnmessagedRank) oldRank else resetTerm(id)
        }
      }.cache()
      rankGraph.edges.foreachPartition(_ => ()) // also materializes rankGraph.vertices
      prevRankGraph.vertices.unpersist(false)
      prevRankGraph.edges.unpersist(false)
      errors(iteration) = compareRanks(rankGraph.vertices, answer)
      iteration += 1
    }
    // The final ranks are not returned, so release them instead of leaving them cached.
    rankGraph.vertices.unpersist(false)
    rankGraph.edges.unpersist(false)
    errors
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment