// Code for Michael Malak's Spark Summit 2016 presentation
// "Finding Graph Isomorphisms in GraphX and GraphFrames
// Additional code can be downloaded from
// https://www.manning.com/books/spark-graphx-in-action
// * readRdf() for GraphX
// * EXAMPLE: Missing <exports> for Canada (using GraphX SVD++)
// * EXAMPLE: Missing <influences> for well-known philosophers (using GraphFrames)
// Shell command to launch Spark Shell with GraphFrames jar
// ~/Downloads/spark-1.6.0-bin-hadoop2.3/bin/spark-shell --driver-memory 13g --driver-cores 4 --jars ~/graphframes/target/scala-2.10/graphframes_2.10-0.0.1-SNAPSHOT.jar
// readRdfDf (for GraphFrames)
//============================
import org.graphframes._
import org.apache.spark.sql.Row
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
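// Note: sqlContext and the $"col" column syntax used throughout come from
// the spark-shell session, which auto-imports sqlContext.implicits._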
def readRdfDf(sc: org.apache.spark.SparkContext, filename: String) = {
  val r = sc.textFile(filename).map(_.split("\t"))
  // Vertices: the distinct subjects and objects, each assigned a Long id
  val v = r.map(_(1)).union(r.map(_(3))).distinct.zipWithIndex
           .map(x => Row(x._2, x._1)).cache
  val stv = StructType(StructField("id", LongType) ::
                       StructField("attr", StringType) :: Nil)
  val vdf = sqlContext.createDataFrame(v, stv).cache
  val str = StructType(StructField("rdfId", StringType) ::
                       StructField("subject", StringType) ::
                       StructField("predicate", StringType) ::
                       StructField("object", StringType) :: Nil)
  // Edges: join against the vertices twice to translate the subject and
  // object strings into their Long vertex ids
  val edf = sqlContext.createDataFrame(r.map(Row.fromSeq(_)), str)
    .join(vdf, $"subject" === $"attr")
    .selectExpr("id AS src", "predicate", "object")
    .join(vdf, $"object" === $"attr")
    .selectExpr("src", "id AS dst", "predicate AS attr")
  v.unpersist(false)
  GraphFrame(vdf, edf)
}
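// Example usage, mirroring the YAGO load below; the input is assumed to be a
// 4-column TSV where each line is: factId <TAB> subject <TAB> predicate <TAB> object
// val g = readRdfDf(sc, "s3n://yago3/yagoFacts.tsv")
// g.vertices.show(5)   // columns: id, attr
// g.edges.show(5)      // columns: src, dst, attr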
// mergeGraphs (for GraphFrames)
//==============================
def mergeGraphs(g1: GraphFrame, g2: GraphFrame) = {
  // Union the vertex attrs from both graphs and assign fresh Long ids,
  // since ids from the two input graphs may collide
  val vunion = g1.vertices.sqlContext.createDataFrame(
    g1.vertices.select($"attr").unionAll(g2.vertices.select($"attr"))
      .distinct.rdd.map(_(0)).zipWithIndex.map(t => Row.fromTuple(t.swap)),
    new StructType(Array(StructField("id", LongType),
                         StructField("attr", StringType)))).cache
  // Rewrite each graph's edges in terms of the new vertex ids
  def edgesWithNewVertexIds(gf: GraphFrame) = {
    gf.triplets
      .join(vunion, $"src.attr" === $"attr")
      .selectExpr("id AS src", "edge.attr AS edgeattr", "dst.attr AS dstattr")
      .join(vunion, $"dstattr" === $"attr")
      .selectExpr("src", "id AS dst", "edgeattr AS attr")
  }
  GraphFrame(vunion, edgesWithNewVertexIds(g1).unionAll(edgesWithNewVertexIds(g2)))
}
// Read YAGO
//==========
var yago = readRdfDf(sc, "s3n://yago3/yagoFacts.tsv")
def mergeIntoYago(filename: String): Unit = {
  val r = readRdfDf(sc, filename)
  r.vertices.cache
  r.edges.cache
  val g = mergeGraphs(yago, r)
  // repartition() returns a new DataFrame rather than mutating in place,
  // so build the merged graph from the repartitioned DataFrames
  g.vertices.sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
  val g2 = GraphFrame(g.vertices.repartition(1000),
                      g.edges.distinct.repartition(1000))
  g2.vertices.cache
  g2.vertices.count
  g2.edges.cache
  g2.edges.count
  // Release the inputs now that the merged graph is materialized
  yago.vertices.unpersist
  yago.edges.unpersist
  r.vertices.unpersist
  r.edges.unpersist
  yago = g2
}
mergeIntoYago("s3n://yago3/yagoLabels.tsv")
mergeIntoYago("s3n://yago3/yagoTaxonomy.tsv")
// removeSingletons()
// used by filterEdges()
//======================
def removeSingletons(g: GraphFrame) =
  GraphFrame(g.vertices.sqlContext.createDataFrame(
               g.triplets.select("src").map(_.getStruct(0))
                 .union(g.triplets.select("dst").map(_.getStruct(0)))
                 .distinct,
               g.vertices.schema),
             g.edges)
// filterEdges()
// for prefiltering prior to running GraphFrames query
//====================================================
def filterEdges(g: GraphFrame, condition: Column) =
  removeSingletons(GraphFrame(g.vertices, g.edges.where(condition)))
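// Example (hypothetical predicate): keep only <hasChild> edges, dropping any
// vertices left without edges, before running a find() query
// val childGraph = filterEdges(yago, $"attr" === "<hasChild>")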
// EXAMPLE: General 3-vertex rule mining
//======================================
yago.find("()-[e1]->(v2); (v2)-[e2]->(v3)")
.distinct
.selectExpr("e1.attr AS e1attr", "e2.attr AS e2attr", "v3.id AS v3id", "v3.attr AS v3attr")
.groupBy("e1attr", "e2attr", "v3id", "v3attr")
.count
.join(yago.edges.groupBy("attr")
.count
.selectExpr("attr", "count AS e1count"),
$"e1attr" === $"attr")
.selectExpr("e1attr", "e2attr", "v3attr", "count/e1count AS ratio")
.orderBy($"ratio".desc)
.show(40)
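// Roughly, ratio is the confidence of the mined rule: of all edges with
// predicate e1attr, the fraction whose destination also has an e2attr edge
// pointing at that particular vertex v3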
// EXAMPLE: General 4-vertex rule mining
// (incomplete, ran out of memory)
//======================================
val y1 = yago.find("(v1)-[e3]->(v4); (v1)-[e1]->(v2); (v2)-[e2]->(v3)")
             .distinct
             .selectExpr("e3.attr AS e3attr", "e1.attr AS e1attr", "e2.attr AS e2attr",
                         "v3.id AS v3id", "v3.attr AS v3attr")
             .groupBy("e3attr", "e1attr", "e2attr", "v3id", "v3attr")
             .count
             .cache
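// A plausible completion (not in the original gist), mirroring the 3-vertex
// ratio computation; all names below are illustrative assumptions:
// val base = yago.find("(v1)-[e3]->(v4); (v1)-[e1]->(v2); (v2)-[e2]->(v3)")
//   .distinct
//   .selectExpr("e3.attr AS a3", "e1.attr AS a1", "e2.attr AS a2")
//   .groupBy("a3", "a1", "a2").count
//   .selectExpr("a3", "a1", "a2", "count AS basecount")
// y1.join(base, $"e3attr" === $"a3" && $"e1attr" === $"a1" && $"e2attr" === $"a2")
//   .selectExpr("e3attr", "e1attr", "e2attr", "v3attr", "count/basecount AS ratio")
//   .orderBy($"ratio".desc)
//   .show(40)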
// EXAMPLE: Find missing <isMarriedTo>
// (first attempt, results in a lot of false positives due to name aliases)
//==========================================================================
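// NOTE: y2 is not defined in this gist; presumably it is a graph prefiltered
// with filterEdges() above. One plausible (assumed) definition, keeping just
// the predicates these two queries touch:
// val y2 = filterEdges(yago,
//   $"attr" === "<hasChild>" || $"attr" === "<isMarriedTo>" || $"attr" === "<hasGender>")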
val df = y2.find("(parent1)-[e1]->(child); (parent2)-[e2]->(child); !(parent1)-[]->(parent2)")
           .filter(($"parent1" !== $"parent2") && ($"e1.attr" === "<hasChild>") && ($"e2.attr" === "<hasChild>"))
           .selectExpr("parent1.attr AS parent1", "parent2.attr AS parent2", "child.attr AS child")
           .cache
df.show
// EXAMPLE: Find missing <isMarriedTo>
// (better, finds the Osbournes)
//==============================
val df = y2.find("(parent1)-[e1]->(child); (parent2)-[e2]->(child); (parent1)-[]->(p1gender); (parent2)-[]->(p2gender)")
           .filter(($"p1gender.attr" === "<male>") && ($"p2gender.attr" === "<female>") && ($"e1.attr" === "<hasChild>") && ($"e2.attr" === "<hasChild>"))
           .selectExpr("parent1.attr AS parent1", "parent2.attr AS parent2", "child.attr AS child")
           .cache
df.show