@manuzhang
Created June 24, 2020 07:10
// Imports assume Spark 3.0.x, where the coalesced AQE reader is CustomShuffleReaderExec.
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, CustomShuffleReaderExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class AdaptiveQueryExecSuite
  extends QueryTest
  with SharedSparkSession
  with AdaptiveSparkPlanHelper {

  // Symbol-to-Column conversions ('id) come from the shared test implicits.
  import testImplicits._

  test("Empty stage") {
    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") {
      withTempView("v1", "v2") {
        spark.range(10).withColumn("a", 'id).createTempView("v1")
        spark.range(10).withColumn("b", 'id).createTempView("v2")
        withTable("t1", "t2") {
          sql("CREATE TABLE t1 USING PARQUET SELECT * FROM v1")
          sql("CREATE TABLE t2 USING PARQUET SELECT * FROM v2")
          // The WHERE clause filters out every row, so all shuffle stages are empty.
          val testDf = sql(
            """
              | SELECT b, COUNT(t1.a) as cnt
              | FROM t1
              | INNER JOIN t2
              | ON t1.id = t2.id
              | WHERE t1.id > 10
              | GROUP BY b
              |""".stripMargin)
          checkAnswer(testDf, Seq())
          // With AQE and partition coalescing enabled, the empty shuffle output
          // should be read through a single coalesced reader with one partition.
          val plan = testDf.queryExecution.executedPlan
          val coalescedReaders = collect(plan) {
            case r: CustomShuffleReaderExec => r
          }
          assert(coalescedReaders.length == 1, s"$plan")
          assert(coalescedReaders.head.outputPartitioning.numPartitions == 1)
        }
      }
    }
  }
}
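
For anyone who wants to poke at the same behaviour outside the test harness, here is a rough spark-shell sketch of mine (not part of the gist) that assumes Spark 3.0.x. It uses temp views instead of the Parquet tables above, so the exact number of readers may differ from the test's assertion; the app name and view names are illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, CustomShuffleReaderExec}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("aqe-empty-stage-demo")  // illustrative name
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .getOrCreate()
import spark.implicits._

spark.range(10).withColumn("a", 'id).createOrReplaceTempView("v1")
spark.range(10).withColumn("b", 'id).createOrReplaceTempView("v2")

// Same always-empty aggregation-over-join shape as the test query.
val df = spark.sql(
  """SELECT b, COUNT(v1.a) AS cnt
    |FROM v1 INNER JOIN v2 ON v1.id = v2.id
    |WHERE v1.id > 10
    |GROUP BY b""".stripMargin)
df.collect()  // run the query so AQE can finalize the physical plan

// With AQE on, the executed plan is rooted at an AdaptiveSparkPlanExec (a leaf
// node from the outer tree's point of view), so descend into its executedPlan
// to reach the coalesced shuffle readers and print their partition counts.
val readers = df.queryExecution.executedPlan.collect {
  case a: AdaptiveSparkPlanExec =>
    a.executedPlan.collect { case r: CustomShuffleReaderExec => r }
}.flatten
readers.foreach(r => println(r.outputPartitioning.numPartitions))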