@EronWright
Created July 2, 2015 14:54
Demonstration of a limitation on column pruning for transformed DataFrames
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.sources.{PrunedScan, BaseRelation}
import scala.language.existentials
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// A relation with two string columns that supports column pruning (PrunedScan).
// Producing "features" throws, so any scan that fails to prune it is easy to spot.
case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation
  with PrunedScan {

  override def schema: StructType =
    StructType(
      StructField("label", StringType, nullable = false) ::
      StructField("features", StringType, nullable = false) :: Nil)

  // Spark passes in requiredColumns only the columns it needs from this relation.
  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
    val rowBuilders = requiredColumns.map {
      case "label" => (i: Int) => Seq(s"label_$i")
      case "features" => (i: Int) => throw new RuntimeException("unexpected: scan on features column")
    }
    sqlContext.sparkContext.parallelize(from to to).map(i =>
      Row.fromSeq(rowBuilders.map(_(i)).reduceOption(_ ++ _).getOrElse(Seq.empty)))
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    // SparkContext refuses to start without an application name.
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SimplePrunedScan")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    import sqlContext.implicits._

    val relation = new SimplePrunedScan(1, 10)(sqlContext)
    val dataset = sqlContext.baseRelationToDataFrame(relation)
    val indexer = new StringIndexer()
      .setInputCol("label").setOutputCol("labelIndex")

    // Expected to succeed: only "label" should be requested from the relation.
    println("dataset (raw)")
    indexer.fit(dataset)

    // Fails: after sample, the scan also requests "features" and the relation throws.
    println("dataset (sampled)")
    val dataset2 = dataset.sample(false, 0.6, 11L)
    indexer.fit(dataset2)
  }
}
@EronWright (author)

This gist explores a limitation of Spark SQL. A relation supporting PrunedScan is defined with two columns, label and features, and a StringIndexer is configured to operate only on the label column. Ideally, the features column is never scanned; to make any such scan visible, the relation throws a RuntimeException whenever features is requested. The gist demonstrates that features is nevertheless scanned in certain cases, for example when the DataFrame is first transformed with sample.

The program fails at the second fit call, indexer.fit(dataset2), which runs on the sampled DataFrame.
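The failure can be illustrated with a toy model in plain Scala, with no Spark dependency. It is not Spark's optimizer or planner; it simply assumes, consistent with the behavior the gist observes, that a projection's columns reach the scan only when the projection sits directly on top of it. The names ColumnPruningModel, Plan, Scan, Project, Sample and scannedColumns are hypothetical, StringIndexer.fit is modeled roughly as a projection of the label column, and the select-before-sample plan is a possible workaround that has not been verified against Spark.

```scala
// Toy model only: NOT Spark's optimizer or planner. It assumes, as the gist's
// behavior suggests, that a projection's columns reach the scan only when the
// projection sits directly on top of the scan.
object ColumnPruningModel {
  // A tiny logical plan with a leaf scan and the two operators the gist uses.
  sealed trait Plan
  final case class Scan(schema: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  final case class Sample(fraction: Double, child: Plan) extends Plan

  // Columns the scan is asked to produce (cf. requiredColumns in buildScan).
  def scannedColumns(plan: Plan): Set[String] = plan match {
    // Projection directly over the scan: only the projected columns are read.
    case Project(cols, Scan(schema)) => schema.filter(c => cols.contains(c)).toSet
    // Projection over anything else, or a sample: look through it, push nothing down.
    case Project(_, child) => scannedColumns(child)
    case Sample(_, child)  => scannedColumns(child)
    // Bare scan: every column is read.
    case Scan(schema) => schema.toSet
  }

  val relation: Plan = Scan(Seq("label", "features"))

  // Roughly what indexer.fit(dataset) asks for: the label column of the relation.
  val rawFit: Plan = Project(Seq("label"), relation)

  // indexer.fit(dataset2): the projection lands above the Sample operator.
  val sampledFit: Plan = Project(Seq("label"), Sample(0.6, relation))

  // Hypothetical workaround: dataset.select("label").sample(false, 0.6, 11L).
  val selectThenSample: Plan =
    Project(Seq("label"), Sample(0.6, Project(Seq("label"), relation)))

  def main(args: Array[String]): Unit = {
    println(s"raw:                ${scannedColumns(rawFit)}")
    println(s"sampled:            ${scannedColumns(sampledFit)}")
    println(s"select then sample: ${scannedColumns(selectThenSample)}")
  }
}
```

In this model only the sampled plan asks the scan for features, which is where the gist's relation throws. Moving the projection below sample keeps features out of the scan here; whether a particular Spark version plans the real query the same way is best checked with DataFrame.explain(true).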
