@deanwampler
Created March 4, 2016 13:03
Databricks' Python example for the new GraphFrames API, ported to Scala and Spark Notebook
{
"metadata" : {
"name" : "GraphFramesExample",
"user_save_timestamp" : "1970-01-01T01:00:00.000Z",
"auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
"language_info" : {
"name" : "scala",
"file_extension" : "scala",
"codemirror_mode" : "text/x-scala"
},
"trusted" : true,
"customLocalRepo" : null,
"customRepos" : null,
"customDeps" : null,
"customImports" : null,
"customArgs" : null,
"customSparkConf" : null
},
"cells" : [ {
"metadata" : { },
"cell_type" : "markdown",
"source" : "# GraphFrames Example\n\n> **Note** This notebook assumes Spark 1.6.0. The pre-built jars for `GraphFrame` use Scala 2.10.X for versions before Spark 2.0. Hence, you must use a Spark Notebook built for Scala 2.10 with this notebook, unless you use a custom build of the `GraphFrames` jar file or a Spark 2.0 build of Spark Notebook, etc.\n\nThis is a port of the [Databricks example notebook](http://go.databricks.com/hubfs/notebooks/3-GraphFrames-User-Guide-python.html) from Python to Scala and to [Spark Notebook](http://spark-notebook.io). (It leaves out some of the comment cells and adds a few more example queries, etc.)\n\nBefore starting, download the `GraphFrames` jar from [here](http://spark-packages.org/package/graphframes/graphframes). Here, we assume it is in `/tmp`. If you put it somewhere else, change the path in the next cell.\n\nFor more information:\n* [GraphFrames home page](http://graphframes.github.io/)\n* [User Guide](http://graphframes.github.io/user-guide.html)\n* [introductory blog post](https://databricks.com/blog/2016/03/03/introducing-graphframes.html)\n* [ScalaDocs](http://graphframes.github.io/api/scala/index.html#org.graphframes.package)"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false,
"presentation" : {
"tabs_state" : "{\n \"tab_id\": \"#tab2145781660-0\"\n}",
"pivot_chart_state" : "{\n \"hiddenAttributes\": [],\n \"menuLimit\": 200,\n \"cols\": [],\n \"rows\": [],\n \"vals\": [],\n \"exclusions\": {},\n \"inclusions\": {},\n \"unusedAttrsVertical\": 85,\n \"autoSortUnusedAttrs\": false,\n \"inclusionsInfo\": {},\n \"aggregatorName\": \"Count\",\n \"rendererName\": \"Table\"\n}"
}
},
"cell_type" : "code",
"source" : ":cp /tmp/graphframes-0.1.0-spark1.6.jar",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "import org.graphframes._",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "import org.apache.spark.sql.SQLContext\nval sqlContext = new SQLContext(sparkContext)\nimport sqlContext.implicits._ \nimport org.apache.spark.sql.functions._ // for min, max, etc. column operations\nimport sqlContext.sql // Makes it easier to write SQL queries.",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "A function to dump a `DataFrame`. Due to annoying bugs in Spark Notebook's `DataFrameWidget`, it sometimes fails to show any output!"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val vertices = sqlContext.createDataFrame(Seq(\n (\"a\", \"Alice\", 34),\n (\"b\", \"Bob\", 36),\n (\"c\", \"Charlie\", 30),\n (\"d\", \"David\", 29),\n (\"e\", \"Esther\", 32),\n (\"f\", \"Fanny\", 36),\n (\"g\", \"Gabby\", 60))).toDF(\"id\", \"name\", \"age\")",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val edges = sqlContext.createDataFrame(Seq(\n (\"a\", \"b\", \"friend\"),\n (\"b\", \"c\", \"follow\"),\n (\"c\", \"b\", \"follow\"),\n (\"f\", \"c\", \"follow\"),\n (\"e\", \"f\", \"follow\"),\n (\"e\", \"d\", \"friend\"),\n (\"d\", \"a\", \"friend\"),\n (\"a\", \"e\", \"friend\"))).toDF(\"src\", \"dst\", \"relationship\")",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val g = GraphFrame(vertices, edges)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "g",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "## Basic graph and DataFrame queries\n\n`GraphFrames` provide several simple graph queries, such as node degree.\n\nAlso, since `GraphFrames` represent graphs as pairs of vertex and edge `DataFrames`, it is easy to make powerful queries directly on the vertex and edge `DataFrames`. Those `DataFrames` are made available as vertices and edges fields in the `GraphFrame`.\n"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(g.vertices)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(g.edges)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "The incoming degrees of the vertices"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(g.inDegrees)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "Outgoing degrees of the vertices."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(g.outDegrees)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "The Degree of the vertices."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(g.degrees)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "Now run some queries over the vertices"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "g.vertices.groupBy().min(\"age\")",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val numberOfFollows = g.edges.filter(\"relationship = 'follow'\")\nnumberOfFollows.count()\ndisplay(numberOfFollows)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "## Motif finding\n\nMore complex relationships involving edges and vertices can be build using motifs. The following cell finds the pairs of vertices with edges in both directions between them. The result is a `DataFrame` in which the column names are given by the motif keys.\n\nSee the [User Guide](http://graphframes.github.io/user-guide.html) for more details on the API."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "// Search for pairs of vertices with edges in both directions between them.\nval bidirectional = g.find(\"(a)-[e]->(b); (b)-[e2]->(a)\")\ndisplay(bidirectional)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "Filter the results"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val bidirectional30 = bidirectional.filter(\"b.age > 30 or a.age > 30\")\ndisplay(bidirectional30)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "// DSL\nval bidirectional30b = bidirectional.filter($\"b.age\" > 30 || $\"a.age\" > 30)\ndisplay(bidirectional30b)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : true
},
"cell_type" : "markdown",
"source" : "From the blog post, to recommend whom to follow, we might search for triplets of users A,B,C where A follows B and B follows C, but A does not follow C."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val toRecommend1 = g.find(\"(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)\")\n// Filter out loops (with DataFrame operation)\nval toRecommend2 = toRecommend1.filter(\"A.id != C.id\")\n// Less \"stringly typed\":\nval toRecommend2b = toRecommend1.filter($\"A.id\" !== $\"C.id\")\n// Select recommendations for A to follow C\nval toRecommend = toRecommend2.select(\"A\", \"C\")",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(toRecommend)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "## Subgraphs\nSubgraphs are built by filtering a subset of edges and vertices. For example, the following subgraph only contains people who are friends and who are more than 30 years old."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val paths = g.find(\"(a)-[e]->(b)\")\n .filter(\"e.relationship = 'follow'\")\n .filter(\"a.age < b.age\")\n// The `paths` variable contains the vertex information, which we can extract:\nval e2 = paths.select(\"e.src\", \"e.dst\", \"e.relationship\")\n\n// In Spark 1.5+, the user may simplify the previous call to:\nval e2b = paths.select(\"e.*\")\n\n// Construct the subgraph\nval g2 = GraphFrame(g.vertices, e2)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(paths)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(e2)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(g2.vertices)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "By construction, this is the same as the original vertices, but the edges are now different:"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(g.edges)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "## Standard graph algorithms\n`GraphFrames` comes with a number of standard graph algorithms built in:\n* Breadth-first search (BFS)\n* Connected components\n* Strongly connected components\n* Label Propagation Algorithm (LPA)\n* PageRank\n* Shortest paths\n* Triangle count"
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "### Breadth-first search (BFS)\nSearch from \"Esther\" for users with age < 32.\n\n**Note:** It appears from the Python-based blog post and comments in the ScalaDocs, that the original form of the `GraphFrame#bfs` method took two arguments, one for the _from expression_ and one for the _to expression_."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val bfs = g.bfs\n .fromExpr($\"name\" === \"Esther\")\n .toExpr($\"age\" < 32).run()\ndisplay(bfs)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "The search may also be limited by edge filters and maximum path lengths."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val bfs2 = g.bfs\n .fromExpr($\"name\" === \"Esther\")\n .toExpr($\"age\" < 32)\n .edgeFilter($\"relationship\" !== \"friend\")\n .maxPathLength(3).run()\ndisplay(bfs2)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "### Connected components\nCompute the connected component membership of each vertex and return a graph with each vertex assigned a component ID. Contrast with _strongly connected components_ (next)."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val cc = g.connectedComponents.run()\ndisplay(cc)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "### Strongly connected components\nCompute the strongly connected component (SCC) of each vertex and return a graph with each vertex assigned to the SCC containing that vertex. Membership in a component requires that all vertices can reach all other vertices in the component. This isn't a requirement for _connected components_."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val scc = g.stronglyConnectedComponents.maxIter(10).run()\ndisplay(scc)\ndisplay(scc.select(\"id\", \"component\"))",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "### Label Propagation\nRun the static _Label Propagation Algorithm_ for detecting communities in networks.\n\nEach node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the mode community affiliation of incoming messages.\n\nLPA is a standard community detection algorithm for graphs. It is very inexpensive computationally, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes are identified into a single community)."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val lp = g.labelPropagation.maxIter(5).run()\ndisplay(lp)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "### PageRank\nIdentify important vertices in a graph based on connections."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : true
},
"cell_type" : "code",
"source" : "val pr = g.pageRank.resetProbability(0.15).tol(0.01).run()",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(pr.vertices)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(pr.edges)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "Limit iterations."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "g.pageRank.resetProbability(0.15).maxIter(10)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "Run PageRank personalized for vertex \"a\""
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val pra = g.pageRank.resetProbability(0.15).maxIter(10).sourceId(\"a\").run()",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(pra.vertices)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "display(pra.edges)",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "### Shortest paths\nComputes shortest paths to the given set of landmark vertices, where landmarks are specified by vertex ID."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val sp = g.shortestPaths.landmarks(Seq(\"a\", \"d\")).run()\ndisplay(sp)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "sp foreach println",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "### Triangle count\nComputes the number of triangles passing through each vertex."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val triangles = g.triangleCount.run()\ndisplay(triangles)",
"outputs" : [ ]
} ],
"nbformat" : 4
}