Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Pitfalls in Using Spark Catalog API
package org.apache.spark.sql.catalog
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils => SparkCatalogUtils}
import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Utility for creating an external Hive metastore table with a user-specified schema path.
 * We need this class because in the Spark 2.x catalog API, when a user creates an external table
 * with a specified schema, the schema is not actually used and the table fails to be
 * recognized as a partitioned table. The related Spark code can be found at:
 * <br/>
 * <a href="https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala#L338">CatalogImpl.scala</a>
 * <br/>
 * <a href="https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L80">createDataSourceTables.scala</a>
 */
object CatalogUtils {

  /**
   * Session used by every `createTable` overload.
   *
   * Resolved on first access from the calling thread's active session, falling back to the
   * default session, and cached afterwards. Kept as a `lazy val` so that it remains a stable
   * identifier for callers.
   *
   * @throws NoSuchElementException if neither an active nor a default session exists
   */
  lazy val sparkSession: SparkSession =
    SparkSession.getActiveSession
      .orElse(SparkSession.getDefaultSession)
      .getOrElse(throw new NoSuchElementException(
        "No active or default SparkSession found; create one before using CatalogUtils."))

  /**
   * Creates a table at `path` whose schema is read from the parquet data at `schemaPath`.
   *
   * Delegates to the six-argument overload with the session's default source, no extra options
   * and `partitionColumnsInSchema = false`.
   *
   * @param tableName  table identifier, parsed by the session's SQL parser
   * @param path       location of the table data; null or empty means no explicit location
   * @param schemaPath parquet location to read the schema from; when null or empty an empty
   *                   schema is passed on, so the schema is taken from the resolved relation
   */
  def createTable(
      tableName: String,
      path: String,
      schemaPath: String): Unit = {
    val schema = Option(schemaPath)
      .filter(_.nonEmpty)
      .map(location => sparkSession.read.parquet(location).schema)
      .getOrElse(new StructType())
    createTable(
      tableName,
      source = null, // null/empty selects `spark.sql.sources.default`
      path = path,
      schema = schema,
      options = Map.empty[String, String],
      partitionColumnsInSchema = false)
  }

  /**
   * Creates a table at `path`, optionally using the merged parquet schema found under `path`.
   *
   * Delegates to the six-argument overload with the session's default source, no extra options
   * and `partitionColumnsInSchema = true`.
   *
   * @param tableName   table identifier, parsed by the session's SQL parser
   * @param path        location of the table data
   * @param mergeSchema when true the schema is read from `path` with the `mergeSchema` read
   *                    option enabled; when false an empty schema is passed on, so the schema is
   *                    taken from the resolved relation
   */
  def createTable(
      tableName: String,
      path: String,
      mergeSchema: Boolean = false): Unit = {
    val schema =
      if (mergeSchema) sparkSession.read.option("mergeSchema", "true").parquet(path).schema
      else new StructType()
    createTable(
      tableName,
      source = null, // null/empty selects `spark.sql.sources.default`
      path = path,
      schema = schema,
      options = Map.empty[String, String],
      partitionColumnsInSchema = true)
  }

  /**
   * Creates a catalog table, keeping a user-specified schema while still registering the
   * partition columns discovered by resolving the relation.
   *
   * @param tableName                table identifier, parsed by the session's SQL parser
   * @param source                   data source provider; null or empty falls back to the
   *                                 `spark.sql.sources.default` session conf
   * @param path                     table location, added to `options` under the `path` key
   *                                 when it is neither null nor empty
   * @param schema                   user-specified schema; null or empty means the schema of
   *                                 the resolved relation is used
   * @param options                  data source options
   * @param partitionColumnsInSchema when true a non-empty `schema` is stored unchanged; when
   *                                 false the inferred partition columns are appended to it
   * @throws TableExistException       if the table already exists in the session catalog
   * @throws ConflictedSchemaException if `partitionColumnsInSchema` is false and `schema`
   *                                   contains a column named like an inferred partition column
   */
  def createTable(
      tableName: String,
      source: String,
      path: String,
      schema: StructType,
      options: Map[String, String],
      partitionColumnsInSchema: Boolean
  ): Unit = {
    val sessionState = sparkSession.sessionState

    val provider = Option(source)
      .filter(_.nonEmpty)
      .getOrElse(sparkSession.conf.get("spark.sql.sources.default"))
    val optionsWithPath = Option(path)
      .filter(_.nonEmpty)
      .fold(options)(location => options + ("path" -> location))
    // A null schema is treated like an empty one, mirroring the null tolerance of the
    // string parameters.
    val userSchema = Option(schema).getOrElse(new StructType())

    val storage = DataSource.buildStorageFormatFromOptions(optionsWithPath)
    // A table with an explicit location is external, otherwise it is managed.
    val tableType =
      if (storage.locationUri.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
    val table = CatalogTable(
      identifier = sessionState.sqlParser.parseTableIdentifier(tableName),
      tableType = tableType,
      storage = storage,
      schema = userSchema,
      provider = Some(provider))

    if (sessionState.catalog.tableExists(table.identifier)) {
      throw new TableExistException(s"Table ${table.identifier.unquotedString} already exists.")
    }

    // Create the relation to validate the arguments before writing the metadata to the
    // metastore, and to infer the table schema and partitions when no schema was specified.
    val pathOption =
      table.storage.locationUri.map(uri => "path" -> SparkCatalogUtils.URIToString(uri))
    // Fill in some default table options from the session conf.
    val currentDatabase =
      table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableWithDefaultOptions = table.copy(
      identifier = table.identifier.copy(database = Some(currentDatabase)),
      tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions)
    val relation: BaseRelation =
      DataSource(
        sparkSession = sparkSession,
        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
        partitionColumns = table.partitionColumnNames,
        className = provider,
        bucketSpec = table.bucketSpec,
        options = table.storage.properties ++ pathOption,
        // As discussed in SPARK-19583, we don't check whether the location exists.
        catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)

    // The table above is built without partition columns, so they always come from the relation.
    assert(table.partitionColumnNames.isEmpty)
    val partitionColumns: Array[StructField] = relation match {
      case fsRelation: HadoopFsRelation => fsRelation.partitionSchema.fields
      case _ => Array.empty
    }
    val partitionColumnNames = partitionColumns.map(_.name)

    val resolvedSchema: StructType =
      if (table.schema.isEmpty) {
        relation.schema
      } else if (partitionColumnsInSchema) {
        // NOTE(review): the schema is stored unchanged here; presumably it must already list
        // the inferred partition columns for the catalog to accept it -- TODO confirm.
        table.schema
      } else {
        val clashing = table.schema.fieldNames.intersect(partitionColumnNames)
        if (clashing.nonEmpty) {
          val message = "Partition column names: " +
            s"[${clashing.mkString(",")}] cannot exist in user specified schema.\n" +
            s" Inferred partition columns: [${partitionColumnNames.mkString(",")}].\n" +
            s" User specified schema:\n${table.schema.treeString}"
          throw new ConflictedSchemaException(message)
        }
        StructType(table.schema.fields ++ partitionColumns)
      }

    val newTable = table.copy(
      schema = resolvedSchema,
      partitionColumnNames = partitionColumnNames,
      // If metastore partition management for file source tables is enabled, we start off with
      // partition provider hive, but no partitions in the metastore. The user has to call
      // `msck repair table` to populate the table partitions.
      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
        sessionState.conf.manageFilesourcePartitions)
    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.