Skip to content

Instantly share code, notes, and snippets.

@kbastani
Last active December 3, 2015 09:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kbastani/40a3fdf0b7bf5ee18d76 to your computer and use it in GitHub Desktop.
Save kbastani/40a3fdf0b7bf5ee18d76 to your computer and use it in GitHub Desktop.
PregelProgram Abstraction for Spark GraphX
package org.mazerunner.core.programs
import org.apache.spark.graphx.{Graph, EdgeTriplet, VertexId}
import org.mazerunner.core.abstractions.PregelProgram
/**
* @author kbastani
* The [[MaximumValueProgram]] is an example graph algorithm implemented on the [[PregelProgram]]
* abstraction.
*/
class MaximumValueProgram(@transient val graph : Graph[Int, Int])
extends PregelProgram[Int, Int, Int] with Serializable {
protected def this() = this(null)
/**
* Return the larger of the two vertex attribute values
* @param id is the [[VertexId]] that this program will perform a state operation for
* @param state is the current state of this [[VertexId]]
* @param message is the state received from another vertex in the graph
* @return an [[Int]] resulting from a comparison between current state and incoming state
*/
override def vertexProgram(id: VertexId, state: Int, message: Int): Int = {
if (message > state) {
message
} else {
state
}
}
/**
* Return the larger of the two vertex state results
* @param a A first [[Int]] representing a partial state of a vertex.
* @param b A second [[Int]] representing a different partial state of a vertex
* @return a merged [[Int]] representation from the two [[Int]] parameters
*/
override def combiner(a: Int, b: Int): Int = {
math.max(a, b)
}
/**
* If the dstVertex's value is less than the srcVertex's value, send a message to the dstVertex to update
* its state
* @param triplet An edge triplet is an object containing a pair of connected vertex objects and edge object.
* For example (v1)-[r]->(v2)
* @return The message broker returns a key value list, each containing a VertexId and a new message
*/
override def messageBroker(triplet: EdgeTriplet[Int, Int]): Iterator[(VertexId, Int)] = {
// If the srcAttr is greater than the dstAttr then notify the dstVertex to update its state
if (triplet.srcAttr > triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr))
} else {
Iterator.empty
}
}
/**
* This method wraps Spark's Pregel API entry point from the [[org.apache.spark.graphx.GraphOps]] class. This provides
* a simple way to write a suite of graph algorithms by extending the [[PregelProgram]] abstract
* class and implementing vertexProgram, messageBroker, and combiner methods.
* @param initialMsg is the initial message received for all vertices in the graph
*/
def run(initialMsg: Int): Graph[Int, Int] = {
graph.pregel(initialMsg)(this.vertexProgram, this.messageBroker, this.combiner)
}
}
package org.mazerunner.core.abstractions
import org.apache.spark.graphx._
import scala.reflect.ClassTag
/**
* @author kbastani
* The [[PregelProgram]] abstraction wraps Spark's Pregel API implementation from the [[GraphOps]]
* class into a model that is easier to write graph algorithms.
* @tparam VertexState is the generic type representing the state of a vertex
*/
abstract class PregelProgram[VertexState: ClassTag, VD: ClassTag, ED: ClassTag] protected () extends Serializable {
@transient val graph: Graph[VD, ED]
/**
* The vertex program receives a state update and acts to update its state
* @param id is the [[VertexId]] that this program will perform a state operation for
* @param state is the current state of this [[VertexId]]
* @param message is the state received from another vertex in the graph
* @return a [[VertexState]] resulting from a comparison between current state and incoming state
*/
def vertexProgram(id : VertexId, state : VertexState, message : VertexState) : VertexState
/**
* The message broker sends and receives messages. It will initially receive one message for
* each vertex in the graph.
* @param triplet An edge triplet is an object containing a pair of connected vertex objects and edge object.
* For example (v1)-[r]->(v2)
* @return The message broker returns a key value list, each containing a VertexId and a new message
*/
def messageBroker(triplet :EdgeTriplet[VertexState, ED]) : Iterator[(VertexId, VertexState)]
/**
* This method is used to reduce or combine the set of all state outcomes produced by a vertexProgram
* for each vertex in each superstep iteration. Each vertex has a list of state updates received from
* other vertices in the graph via the messageBroker method. This method is used to reduce the list
* of state updates into a single state for the next superstep iteration.
* @param a A first [[VertexState]] representing a partial state of a vertex.
* @param b A second [[VertexState]] representing a different partial state of a vertex
* @return a merged [[VertexState]] representation from the two [[VertexState]] parameters
*/
def combiner(a: VertexState, b: VertexState) : VertexState
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment