Skip to content

Instantly share code, notes, and snippets.

@ceteri
Last active September 9, 2021 13:38
  • Star 12 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save ceteri/c2a692b5161b23d92ed1 to your computer and use it in GitHub Desktop.
Spark GraphX demo
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
case class Peep(name: String, age: Int)
val vertexArray = Array(
(1L, Peep("Kim", 23)),
(2L, Peep("Pat", 31)),
(3L, Peep("Chris", 52)),
(4L, Peep("Kelly", 39)),
(5L, Peep("Leslie", 45))
)
val edgeArray = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 5L, 3),
Edge(4L, 1L, 1),
Edge(5L, 3L, 9)
)
val vertexRDD: RDD[(Long, Peep)] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val g: Graph[Peep, Int] = Graph(vertexRDD, edgeRDD)
val results = g.triplets.filter(t => t.attr > 7)
for (triplet <- results.collect) {
println(s"${triplet.srcAttr.name} loves ${triplet.dstAttr.name}")
}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val word = sqlContext.parquetFile("word.parquet")
word.registerTempTable("word")
val edge = sqlContext.parquetFile("edge.parquet")
edge.registerTempTable("edge")
sql("SELECT * FROM word").take(5)
sql("SELECT * FROM edge").take(5)
val n = sql("SELECT id, stem FROM word").distinct()
val nodes: RDD[(Long, String)] = n.map(p => (p(0).asInstanceOf[Long], p(1).asInstanceOf[String]))
val e = sql("SELECT * FROM edge")
val edges: RDD[Edge[Int]] = e.map(p => Edge(p(0).asInstanceOf[Long], p(1).asInstanceOf[Long], 0))
val g: Graph[String, Int] = Graph(nodes, edges)
val ranks = g.pageRank(0.0001).vertices
ranks.join(nodes).sortBy(_._2._1, ascending=false).foreach(println)
// SSSP impl in Graphx using Pregel
// https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm
// http://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api
// http://stackoverflow.com/questions/23700124/how-to-get-sssp-actual-path-by-apache-spark-graphx
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
val graph = GraphGenerators.logNormalGraph(sc, numVertices = 5, numEParts = sc.defaultParallelism, mu = 4.0, sigma = 1.3).mapEdges(e => e.attr.toDouble)
graph.edges.foreach(println)
// initialize all vertices except the root to have distance infinity
val sourceId: VertexId = 0
val initialGraph : Graph[(Double, List[VertexId]), Double] = graph.mapVertices((id, _) => if (id == sourceId) (0.0, List[VertexId](sourceId)) else (Double.PositiveInfinity, List[VertexId]()))
val sssp = initialGraph.pregel((Double.PositiveInfinity, List[VertexId]()), Int.MaxValue, EdgeDirection.Out)(
// vertex program
(id, dist, newDist) => if (dist._1 < newDist._1) dist else newDist,
// send message
triplet => {
if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr ) {
Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr , triplet.srcAttr._2 :+ triplet.dstId)))
} else {
Iterator.empty
}
},
// merge message
(a, b) => if (a._1 < b._1) a else b)
println(sssp.vertices.collect.mkString("\n")
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment