Pitfalls in Using Spark Catalog API
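
For context, here is a minimal sketch of the pitfall itself, using the stock Spark 2.x catalog API (the table name and paths are hypothetical). As the utility below notes, the schema passed to spark.catalog.createTable is not actually used for file-source tables, and the table ends up not being recognized as partitioned:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Hypothetical layout: /data/events/dt=2019-04-27/part-*.parquet
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("dt", StringType))) // intended partition column

    // Pitfall: in Spark 2.x this registers the table without running partition
    // inference, so `dt` is not recorded as a partition column in the metastore.
    spark.catalog.createTable("events", "parquet", schema, Map("path" -> "/data/events"))

The utility below works around this by resolving the relation itself, inferring the partition columns from the directory layout, and writing a complete CatalogTable to the metastore: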
package org.apache.spark.sql.catalog

import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils => SparkCatalogUtils}
import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.SparkSession

// Custom exceptions thrown below; defined here so the file compiles on its own.
class TableExistException(message: String) extends Exception(message)
class ConflictedSchemaException(message: String) extends Exception(message)
/**
 * Utility object for creating an external Hive metastore table with a user-specified schema
 * path. We need this because in the Spark 2.x catalog API, when a user creates an external
 * table with a specified schema, the schema is not actually used and the table fails to be
 * recognized as a partitioned table. Related Spark code can be found at:
 * <br/>
 * <a href="https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala#L338">CatalogImpl.scala</a>
 * <br/>
 * <a href="https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L80">createDataSourceTables.scala</a>
 */
object CatalogUtils {

  // Requires an active SparkSession.
  lazy val sparkSession = SparkSession.getActiveSession.get

  /**
   * Creates the table, reading the column schema from the Parquet data at `schemaPath`
   * (for example, a single partition directory). Partition columns are inferred from the
   * directory layout under `path` and appended to the schema.
   */
  def createTable(
      tableName: String,
      path: String,
      schemaPath: String): Unit = {
    val schema = if (schemaPath == null || schemaPath.isEmpty) {
      new StructType()
    } else {
      sparkSession.read.parquet(schemaPath).schema
    }
    createTable(tableName, null, path, schema, Map.empty, partitionColumnsInSchema = false)
  }

  /**
   * Creates the table, inferring the schema from the Parquet data at `path`. When
   * `mergeSchema` is set, a merged schema (which already includes the partition columns
   * discovered under `path`) is read up front.
   */
  def createTable(
      tableName: String,
      path: String,
      mergeSchema: Boolean = false): Unit = {
    val schema = if (!mergeSchema) {
      new StructType()
    } else {
      sparkSession.read.option("mergeSchema", "true").parquet(path).schema
    }
    createTable(tableName, null, path, schema, Map.empty, partitionColumnsInSchema = true)
  }
  def createTable(
      tableName: String,
      source: String,
      path: String,
      schema: StructType,
      options: Map[String, String],
      partitionColumnsInSchema: Boolean): Unit = {
    val newSource = if (source == null || source.isEmpty) {
      sparkSession.conf.get("spark.sql.sources.default")
    } else {
      source
    }
    val newOptions: Map[String, String] = if (path != null && !path.isEmpty) {
      options + ("path" -> path)
    } else {
      options
    }
    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
    val storage = DataSource.buildStorageFormatFromOptions(newOptions)
    val tableType = if (storage.locationUri.isDefined) {
      CatalogTableType.EXTERNAL
    } else {
      CatalogTableType.MANAGED
    }
    val table = CatalogTable(
      identifier = tableIdent,
      tableType = tableType,
      storage = storage,
      schema = schema,
      provider = Some(newSource))
    val sessionState = sparkSession.sessionState
    if (sessionState.catalog.tableExists(table.identifier)) {
      throw new TableExistException(s"Table ${table.identifier.unquotedString} already exists.")
    }
    // Create the relation to validate the arguments before writing the metadata to the
    // metastore, and infer the table schema and partitions if the user didn't specify a schema.
    val pathOption = table.storage.locationUri.map("path" -> SparkCatalogUtils.URIToString(_))
    // Fill in some default table options from the session conf.
    val tableWithDefaultOptions = table.copy(
      identifier = table.identifier.copy(
        database = Some(
          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
      tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions)
    val dataSource: BaseRelation =
      DataSource(
        sparkSession = sparkSession,
        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
        partitionColumns = table.partitionColumnNames,
        className = table.provider.get,
        bucketSpec = table.bucketSpec,
        options = table.storage.properties ++ pathOption,
        // As discussed in SPARK-19583, we don't check whether the location exists.
        catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
    // Partition columns inferred from the directory layout by the resolved relation.
    val partitionColumns: Array[StructField] = {
      assert(table.partitionColumnNames.isEmpty)
      dataSource match {
        case r: HadoopFsRelation => r.partitionSchema.fields
        case _ => Array.empty
      }
    }
    val partitionColumnNames = partitionColumns.map(_.name)
    val newSchema: StructType = if (table.schema.nonEmpty) {
      if (partitionColumnsInSchema) {
        table.schema
      } else {
        // The user-specified schema must not already contain the inferred partition columns;
        // append them so the final schema covers both data and partition columns.
        table.schema.fields.map(_.name).intersect(partitionColumnNames) match {
          case Array() => StructType(table.schema.fields ++ partitionColumns)
          case arr =>
            val message = "Partition column names: " +
              s"[${arr.mkString(",")}] cannot exist in the user-specified schema.\n" +
              s" Inferred partition columns: [${partitionColumnNames.mkString(",")}].\n" +
              s" User-specified schema:\n${table.schema.treeString}"
            throw new ConflictedSchemaException(message)
        }
      }
    } else {
      dataSource.schema
    }
    val newTable = table.copy(
      schema = newSchema,
      partitionColumnNames = partitionColumnNames,
      // If metastore partition management for file source tables is enabled, we start off
      // with partition provider hive, but no partitions in the metastore. The user has to
      // call `msck repair table` to populate the table partitions.
      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
        sessionState.conf.manageFilesourcePartitions)
    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
  }
}
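
A usage sketch (the table name and paths are hypothetical, and an active SparkSession is assumed): create the table from a schema file, then populate the partitions, since the table starts off tracked in the metastore with no partitions registered:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalog.CatalogUtils

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Read the column schema from one partition directory; the partition column
    // (here `dt`) is inferred from the layout under `path` and appended.
    CatalogUtils.createTable(
      tableName = "events",
      path = "/data/events",
      schemaPath = "/data/events/dt=2019-04-27")

    // The table is created with partition provider Hive but no partitions recorded,
    // so populate them explicitly (see the tracksPartitionsInCatalog comment above).
    spark.sql("MSCK REPAIR TABLE events")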