Skip to content

Instantly share code, notes, and snippets.

@ankurdave
Last active August 29, 2015 14:10
Show Gist options
  • Save ankurdave/cb89391101e4e87497ae to your computer and use it in GitHub Desktop.
Save ankurdave/cb89391101e4e87497ae to your computer and use it in GitHub Desktop.
Interface between GraphX and SampleClean
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import scala.reflect.ClassTag
/**
 * Build a GraphX property graph from vertex rows and an edge list.
 *
 * Each `(src, dst)` pair in `edges` becomes one directed edge whose attribute is the unit
 * value `()`; edges carry no data of their own.
 *
 * @param vertices `(vertexId, row)` pairs supplying each vertex's attribute.
 * @param edges    `(srcId, dstId)` pairs; one edge is created per pair (duplicates are kept).
 * @return a graph over `vertices` connected by `edges`.
 */
def a(vertices: RDD[(Long, Row)], edges: RDD[(Long, Long)]): Graph[Row, Unit] = {
  // Use the unit value `()`: the bare identifier `Unit` denotes the `Unit` companion object
  // (type `Unit.type`), not the value of type `Unit`.
  val unitEdges: RDD[Edge[Unit]] = edges.map { case (src, dst) => Edge(src, dst, ()) }
  Graph(vertices, unitEdges)
}
/**
 * Run connected components on `graph`, ordering vertex attributes with `compare`.
 *
 * This is a thin wrapper around `CustomConnectedComponents.run`; each returned pair is a vertex
 * id together with the attribute that propagation left on that vertex.
 *
 * @param graph   graph whose vertex attributes are the rows to group.
 * @param compare three-way comparison on rows (negative, zero, or positive).
 * @return `(vertexId, row)` pairs produced by the component computation.
 */
def b(graph: Graph[Row, Unit], compare: (Row, Row) => Int): RDD[(Long, Row)] = {
  val labelled: RDD[(Long, Row)] =
    CustomConnectedComponents.run[Row, Unit](graph = graph, compare = compare)
  labelled
}
/**
 * Return a new graph consisting of `graph` plus the edges in `newEdges`.
 *
 * The vertices of `graph` are reused unchanged. The graph is rebuilt from scratch rather than
 * updated in place. Existing and new edges are combined with `RDD.union`, which does not
 * deduplicate, so repeated pairs produce parallel edges.
 *
 * @param graph    the graph to extend.
 * @param newEdges `(srcId, dstId)` pairs to add. They must refer to vertices already present
 *                 in `graph`.
 * @return a graph with the original vertices and the old edges followed by the new ones.
 */
def c(graph: Graph[Row, Unit], newEdges: RDD[(Long, Long)]): Graph[Row, Unit] = {
  // TODO: Incrementally update the routing tables
  // Use the unit value `()`: the bare identifier `Unit` is the companion object (`Unit.type`).
  val added: RDD[Edge[Unit]] = newEdges.map { case (src, dst) => Edge(src, dst, ()) }
  Graph(graph.vertices, graph.edges.union(added))
}
/**
 * Run connected components with a custom component comparison function.
 *
 * Instead of labelling components by the smallest vertex id, every vertex repeatedly adopts the
 * smaller (under `compare`) of its own attribute and the attributes pushed to it by neighbours.
 * Messages are sent along edges in whichever direction points from the smaller endpoint to the
 * larger, so edge direction does not restrict propagation.
 */
object CustomConnectedComponents {

  /**
   * Propagate the minimum vertex attribute, under `compare`, through `graph` with Pregel.
   *
   * Assuming `compare` is a consistent total order, each vertex should end up holding the minimum
   * attribute of its (weakly) connected component once Pregel stops sending messages.
   *
   * @param graph   input graph; edge attributes are ignored.
   * @param compare three-way comparison on vertex attributes: negative when the first argument
   *                is smaller, positive when it is larger, zero when they are equal.
   * @tparam VD vertex attribute type.
   * @tparam ED edge attribute type (unused).
   * @return the vertices of the graph produced by Pregel.
   */
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED], compare: (VD, VD) => Int): VertexRDD[VD] = {

    // Smaller of two attributes under `compare`; on a tie the second argument wins.
    def min(a: VD, b: VD): VD = if (compare(a, b) < 0) a else b

    // Push the smaller endpoint's attribute toward the larger endpoint. `compare` is evaluated
    // once per triplet because it may be expensive (e.g. on rows).
    def sendMessage(edge: EdgeTriplet[VD, ED]): Iterator[(Long, Option[VD])] = {
      val order = compare(edge.srcAttr, edge.dstAttr)
      if (order < 0) Iterator((edge.dstId, Some(edge.srcAttr)))
      else if (order > 0) Iterator((edge.srcId, Some(edge.dstAttr)))
      else Iterator.empty
    }

    // Combine two messages bound for the same vertex, keeping the smaller attribute.
    def mergeMessages(x: Option[VD], y: Option[VD]): Option[VD] = (x, y) match {
      case (Some(p), Some(q)) => Some(min(p, q))
      case _                  => x.orElse(y)
    }

    // "No message" (the initial superstep) is `None`, so it cannot be confused with a vertex
    // attribute that happens to be null.
    Pregel[VD, ED, Option[VD]](graph, None, activeDirection = EdgeDirection.Either)(
      vprog = (_, attr, msg) => msg.fold(attr)(m => min(attr, m)),
      sendMsg = sendMessage,
      mergeMsg = mergeMessages).vertices
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment