Skip to content

Instantly share code, notes, and snippets.

@ankurdave
Created September 5, 2015 21:16
Show Gist options
  • Save ankurdave/2f06008d9f9526c207eb to your computer and use it in GitHub Desktop.
Save ankurdave/2f06008d9f9526c207eb to your computer and use it in GitHub Desktop.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
// Spark entry points for this script: master "local", application name "test".
// `conf` is handed to the SparkContext constructor so that any settings placed on it
// take effect; previously it was created but never used.
val conf = new SparkConf()
val sc = new SparkContext("local", "test", conf)
val sqlContext = new SQLContext(sc)
// Vertex rows; the tuple fields become the columns (id, attr, number).
val vertexRows = Seq(
  (0L, "a", 1),
  (1L, "b", 2),
  (2L, "c", 3),
  (3L, "d", 4))
val v = sqlContext.createDataFrame(vertexRows).toDF("id", "attr", "number")

// Edge rows; the tuple fields become the columns (src_id, dst_id).
val edgeRows = Seq(
  (0L, 1L),
  (1L, 2L),
  (2L, 3L),
  (2L, 0L))
val e = sqlContext.createDataFrame(edgeRows).toDF("src_id", "dst_id")

// Graph built from the vertex and edge DataFrames above.
val g = GraphFrame(v, e)
// Single-vertex pattern.
val vertices = g.find("(a)")
vertices.show()

// Single-edge pattern; also print the query plans for it.
val triplets = g.find("(u)-[]->(v)")
triplets.show()
triplets.explain(true)

// Three-edge cycle pattern a -> b -> c -> a, keeping only the three id columns.
val trianglePattern = "(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)"
val triangles = g.find(trianglePattern).select("a_id", "b_id", "c_id")
triangles.show()
triangles.explain(true)
/**
 * Prints the number of `Join` nodes found in the analyzed logical plan of `df`
 * next to the number found in its optimized logical plan.
 *
 * @param df the DataFrame whose query plans are inspected
 */
def countJoins(df: DataFrame): Unit = {
  import org.apache.spark.sql.catalyst.plans.logical.Join
  val execution = df.queryExecution
  // Analyzed plan first, optimized plan second; each is reduced to its Join count.
  val Seq(unopt, opt) = Seq(execution.analyzed, execution.optimizedPlan).map { plan =>
    plan.collect { case join: Join => join }.size
  }
  println(s"Optimization reduced number of joins from $unopt to $opt")
}
countJoins(triangles)

// Two-edge chain u -> v -> w without any negated terms. It gets its own name because
// defining `fof` twice is a duplicate-definition error when the script is compiled as
// one unit (only the REPL lets the second definition shadow the first).
val twoHop = g.find("(u)-[]->(v); (v)-[]->(w)").select("u_id", "v_id", "w_id")

// Same chain with the negated terms !(u)-[]->(w) and !(w)-[]->(u) added; this is the
// query that is shown, explained and join-counted below.
val fof = g.find("(u)-[]->(v); (v)-[]->(w); !(u)-[]->(w); !(w)-[]->(u)").select("u_id", "v_id", "w_id")
fof.show()
fof.explain(true)
countJoins(fof)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment