@rdblue · Last active September 19, 2018 22:23
DataSourceV2 SQL Analysis Rules
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.v2
import ...
/**
 * Rules to convert from plans produced by the SQL parser to v2 logical plans.
 */
class DataSourceV2Analysis(spark: SparkSession) extends Rule[LogicalPlan] {
  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
  import DataSourceV2Analysis._

  // TODO: after #21978, load the correct catalog for each ident
  private val catalog = spark.catalog(None).asTableCatalog
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case AlterTableAddColumnsCommand(ident, columns) =>
      val identifier = fixIdent(ident)

      // load the table to create a relation so that the alter table command can be validated
      val table = catalog.loadTable(identifier)
      val relation = DataSourceV2Relation.create(
        catalog.name, identifier, table,
        Map("database" -> identifier.database.get, "table" -> identifier.table))

      val changes = columns.map { field =>
        val (parent, name) = field.name match {
          case NestedFieldName(path, fieldName) =>
            (path, fieldName)
          case fieldName =>
            (null, fieldName)
        }

        TableChange.addColumn(parent, name, field.dataType)
      }

      AlterTable(catalog, relation, changes)
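
      // Example (sketch): `ALTER TABLE db.t ADD COLUMNS (point.z double)` arrives here as an
      // AlterTableAddColumnsCommand; NestedFieldName splits "point.z" into parent "point" and
      // name "z", which becomes TableChange.addColumn("point", "z", DoubleType).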
    case datasources.CreateTable(catalogTable, mode, None) if isV2CatalogTable(catalogTable) =>
      val options = catalogTable.storage.properties + ("provider" -> catalogTable.provider.get)
      val partitioning = PartitionUtil.convertToTransforms(
        catalogTable.partitionColumnNames, catalogTable.bucketSpec)

      mode match {
        case SaveMode.ErrorIfExists =>
          CreateTable(catalog, identifier(catalogTable), catalogTable.schema, partitioning,
            options, ignoreIfExists = false)

        case SaveMode.Ignore =>
          CreateTable(catalog, identifier(catalogTable), catalogTable.schema, partitioning,
            options, ignoreIfExists = true)

        case _ =>
          throw new AnalysisException(
            s"$mode cannot be used for table creation in v2 data sources")
      }
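
      // Example (sketch): a plain `CREATE TABLE t (id bigint) USING source` parses with
      // SaveMode.ErrorIfExists, and `CREATE TABLE IF NOT EXISTS ...` with SaveMode.Ignore;
      // both converge on the v2 CreateTable plan, differing only in ignoreIfExists.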
    case datasources.CreateTable(catalogTable, mode, Some(query))
        if isV2CatalogTable(catalogTable) =>
      val options = catalogTable.storage.properties + ("provider" -> catalogTable.provider.get)

      if (catalogTable.partitionColumnNames.nonEmpty) {
        throw new AnalysisException("CTAS with partitions is not yet supported.")
      }

      mode match {
        case SaveMode.Append =>
          throw new AnalysisException("Append mode cannot be used with CTAS for v2 data sources")

        case SaveMode.ErrorIfExists =>
          CreateTableAsSelect(catalog, identifier(catalogTable), Seq.empty, query, options,
            ignoreIfExists = false)

        case SaveMode.Ignore =>
          CreateTableAsSelect(catalog, identifier(catalogTable), Seq.empty, query, options,
            ignoreIfExists = true)

        case SaveMode.Overwrite =>
          ReplaceTableAsSelect(catalog, identifier(catalogTable), Seq.empty, query, options)
      }
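
      // Example (sketch): `CREATE TABLE t USING source AS SELECT ...` parses with
      // SaveMode.ErrorIfExists and maps to CreateTableAsSelect, while SaveMode.Overwrite
      // (e.g. DataFrameWriter.saveAsTable in overwrite mode) maps to ReplaceTableAsSelect.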
    case insert @ InsertIntoTable(LogicalRelation(UnresolvedCatalogRelation(catalogTable), _, _),
        _, _, _, _, _, _) if isV2CatalogTable(catalogTable) =>
      val v2 = DataSourceV2Relation.create(catalogTable)

      if (insert.overwrite.enabled) {
        // TODO: Support overwrite
        throw new AnalysisException(s"Cannot overwrite with table: $v2")
      }

      // TODO: Handle partition values
      assert(insert.partition.isEmpty, s"Cannot write static partitions into table: $v2")

      // the DataFrame API doesn't create INSERT INTO plans for v2 tables, so this must be
      // SQL and should match columns by position, not by name.
      AppendData.byPosition(v2, insert.child)
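
      // Example (sketch): for `INSERT INTO t SELECT ...`, AppendData.byPosition matches the
      // query's output columns to the table schema by ordinal; AppendData.byName is the
      // by-name variant the DataFrame API would use, which does not apply here.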
    case LogicalRelation(UnresolvedCatalogRelation(catalogTable), _, _)
        if isV2CatalogTable(catalogTable) =>
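      // Example (sketch): a bare `SELECT * FROM t` over a v2 table reaches the analyzer as an
      // UnresolvedCatalogRelation; substituting a DataSourceV2Relation here routes the scan
      // through the v2 read path.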
      // unwrap v2 relation
      DataSourceV2Relation.create(catalogTable)
  }
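
  // Example (sketch): fixIdent(TableIdentifier("t", None)) returns
  // TableIdentifier("t", Some(spark.catalog.currentDatabase)), so identifiers passed to the
  // catalog are always database-qualified.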
  private def fixIdent(identifier: TableIdentifier): TableIdentifier = {
    identifier.database match {
      case Some(_) =>
        identifier
      case _ =>
        // TODO: this should use the catalog's current database
        TableIdentifier(identifier.table, Some(spark.catalog.currentDatabase))
    }
  }
  private def identifier(catalogTable: CatalogTable): TableIdentifier = {
    fixIdent(catalogTable.identifier)
  }
}
object DataSourceV2Analysis {
  def apply(spark: SparkSession): DataSourceV2Analysis = new DataSourceV2Analysis(spark)

  val NestedFieldName = """([\w\.]+)\.(\w+)""".r
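
  // Example (sketch): "point.z" matches as NestedFieldName("point", "z"), and "a.b.c" as
  // NestedFieldName("a.b", "c") because the first group is greedy; a top-level name like
  // "id" has no dot, does not match, and keeps parent = null in the AlterTable case above.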
  private def isV2CatalogTable(catalogTable: CatalogTable): Boolean = {
    catalogTable.provider.exists(provider =>
      !"hive".equalsIgnoreCase(provider) &&
          classOf[DataSourceV2].isAssignableFrom(DataSource.lookupDataSource(provider)))
  }
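
  // Example (sketch): "hive" is always treated as v1 here, and any other provider is v2 only
  // when the class returned by DataSource.lookupDataSource implements DataSourceV2 (built-in
  // file sources like "parquet" resolve to v1 FileFormat classes and are left alone).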
}
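
// A minimal wiring sketch (an assumption, not part of the original gist): a rule like this
// would be registered with the analyzer through the session extensions API, for example:
//
//   val spark = SparkSession.builder()
//     .withExtensions(extensions =>
//       extensions.injectResolutionRule(session => DataSourceV2Analysis(session)))
//     .getOrCreate()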