Skip to content

Instantly share code, notes, and snippets.

@squito
Last active March 15, 2019 06:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save squito/788abf5f55a96deeea65 to your computer and use it in GitHub Desktop.
Save squito/788abf5f55a96deeea65 to your computer and use it in GitHub Desktop.
get paths of a spark-sql query
import java.lang.reflect.Method
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation}
import org.apache.spark.sql.DataFrame
/** Returns the input paths backing a relation.
  *
  * Only `HadoopFsRelation` is handled; its `paths` are exposed as an iterator.
  *
  * @param relation the relation to inspect
  * @return an iterator over the relation's paths
  * @throws IllegalArgumentException for any other kind of `BaseRelation`
  */
def getPaths(relation: BaseRelation): Iterator[String] = relation match {
  case fsRelation: HadoopFsRelation => fsRelation.paths.iterator
  // Keep the typed pattern so the set of matched values is the same as before.
  case other: BaseRelation =>
    throw new IllegalArgumentException(s"wasn't expecting relation ${other}")
}
/** Collects the input paths of every `LogicalRelation` found in a logical plan.
  *
  * The plan tree is walked eagerly, so an unsupported relation fails at call
  * time rather than when the returned iterator is consumed. Nodes without
  * children that are not a `LogicalRelation` contribute no paths.
  *
  * @param plan the logical plan to walk
  * @return an iterator over all paths found under `plan` (possibly empty)
  */
def getPaths(plan: LogicalPlan): Iterator[String] = {
  // LogicalRelation will be public in the next release, but for now, need to use a reflection hack
  // https://issues.apache.org/jira/browse/SPARK-7275
  // Compare by class name instead of Class.forName: it avoids a class lookup per
  // node and does not throw ClassNotFoundException if the class is not loadable.
  val logicalRelationClassName = "org.apache.spark.sql.execution.datasources.LogicalRelation"
  plan match {
    case lr if lr.getClass.getName == logicalRelationClassName =>
      getPaths(get("relation", lr).asInstanceOf[BaseRelation])
    case _ =>
      // flatMap instead of map + reduce: reduce throws on a node with no children.
      plan.children.flatMap(child => getPaths(child)).iterator
  }
}
/** Collects the input paths of a DataFrame by walking its analyzed logical plan.
  *
  * @param df the DataFrame whose query is inspected
  * @return an iterator over the paths found in the plan
  */
def getPaths(df: DataFrame): Iterator[String] = {
  val analyzedPlan = df.queryExecution.analyzed
  getPaths(analyzedPlan)
}
// some reflection helpers. Overkill for this example, but I use them in general so putting them here
// note that you won't need reflection at all after SPARK-7275
/** Lists the names of the methods declared directly on the runtime class of `obj`, sorted.
  *
  * Overloaded methods appear once per overload.
  *
  * @param obj any object
  * @return the sorted declared method names
  */
def methods(obj: Any): Seq[String] = {
  val declaredNames = for (m <- obj.getClass.getDeclaredMethods) yield m.getName
  declaredNames.sorted
}
/** Finds a method declared directly on the runtime class of `obj` and makes it accessible.
  *
  * If several declared methods share the name, the first one returned by
  * `getDeclaredMethods` is used.
  *
  * @param name the method name to look up
  * @param obj  the object whose class is searched
  * @return the matching method, with `setAccessible(true)` applied
  * @throws NoSuchElementException if the class declares no method with that name
  */
def findMethod(name: String, obj: Any): Method = {
  val clz = obj.getClass
  val method = clz.getDeclaredMethods.find(_.getName == name).getOrElse {
    // Same exception type as the bare Option.get used to raise, but with a useful message.
    throw new NoSuchElementException(s"no declared method named '$name' on ${clz.getName}")
  }
  method.setAccessible(true)
  method
}
/** Reads a member of `obj` by name using reflection.
  *
  * A field declared on the runtime class of `obj` is preferred: it is made
  * accessible and its value returned. If no such field exists, the declared
  * method of that name is looked up with `findMethod` and invoked with no arguments.
  *
  * @param name the field or method name
  * @param obj  the object to read from
  * @return the field value, or the result of invoking the method
  */
def get(name: String, obj: Any): Any = {
  val declaredField = obj.getClass.getDeclaredFields.find(field => field.getName == name)
  declaredField
    .map { field =>
      field.setAccessible(true)
      field.get(obj)
    }
    .getOrElse {
      // No field of that name: fall back to a declared method.
      findMethod(name, obj).invoke(obj)
    }
}
// end reflection helpers
// Example usage, shown as a REPL transcript: build a DataFrame from a SQL query,
// then count the paths that getPaths finds for it.
// NOTE(review): `sqlContext` is not defined in this snippet; presumably it is the
// one provided by the Spark shell -- confirm.
val df = sqlContext.sql("select count(1) from catalog_sales")
scala> getPaths(df).length
res4: Int = 1836
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment