Skip to content

Instantly share code, notes, and snippets.

@liancheng
Last active August 29, 2015 14:14
Show Gist options
  • Save liancheng/0b0478b8803e940401f7 to your computer and use it in GitHub Desktop.
Save liancheng/0b0478b8803e940401f7 to your computer and use it in GitHub Desktop.
Data source API draft
/**
 * :: DeveloperApi ::
 * Abstract base for table scan operators.
 *
 * Implementations provide the [[SQLContext]] they belong to and implement
 * [[execute]]. [[sizeInBytes]] has a configuration-driven default and may be
 * overridden when a better estimate is available.
 */
@DeveloperApi
abstract class Scan {

  /** The [[SQLContext]] this scan operator is associated with. */
  def sqlContext: SQLContext

  /**
   * Estimated size, in bytes, of the input read by this scan operator.
   *
   * Unless overridden, this is `defaultSizeInBytes` taken from the
   * configuration of [[sqlContext]].
   */
  def sizeInBytes: Long = {
    val conf = sqlContext.conf
    conf.defaultSizeInBytes
  }

  /** Executes this scan, producing its output as an `RDD[Row]`. */
  def execute(): RDD[Row]
}
/**
 * :: DeveloperApi ::
 * Mix-in that every readable data source relation must extend.
 *
 * Reading is done through the [[Scan]] operator returned by [[buildScan]].
 */
@DeveloperApi
trait ScannableRelation {

  /**
   * Builds the operator used to scan this relation.
   *
   * @return a [[Scan]] operator for this relation
   */
  def buildScan(): Scan
}
/**
 * :: DeveloperApi ::
 * Mix-in for data source relations that support the column pruning
 * optimization.
 */
@DeveloperApi
trait ColumnPruningRelation {

  /**
   * Records which columns a subsequent scan of this relation needs.
   *
   * Any column whose name does not appear in `columns` should be skipped
   * while scanning, so that unneeded data is not read.
   *
   * @param columns names of the columns that are requested
   */
  def setRequestedColumns(columns: Array[String]): Unit
}
/**
 * :: DeveloperApi ::
 * Mix-in for data source relations that support the filter push-down
 * optimization.
 */
@DeveloperApi
trait FilterPushdownRelation {

  /**
   * Offers filters collected from the query plan to this data source.
   *
   * The incoming filters are in conjunctive normal form (CNF). An
   * implementation should accept only the filters it can recognize and hand
   * every remaining one back to the caller.
   *
   * @param filters candidate filters that may be pushed down
   * @return the filters this data source did not accept
   */
  def pushFilters(filters: Array[Filter]): Array[Filter]
}
/**
 * :: DeveloperApi ::
 * A single partition: a directory associated with one or more partition
 * column values. The schema of those partition columns is described by a
 * corresponding [[PartitionSpec]].
 *
 * @param values a row holding the values of all partition columns of this
 *               partition
 * @param path   file path of this partition
 */
@DeveloperApi
case class Partition(
    values: Row,
    path: String)
/**
 * :: DeveloperApi ::
 * Describes how a relation is partitioned.
 *
 * @param partitionColumns        schema of all partition columns
 * @param dynamicPartitionColumns schema of all dynamic partition columns
 * @param partitions              every partition contained in this spec
 */
@DeveloperApi
case class PartitionSpec(
    partitionColumns: StructType,
    dynamicPartitionColumns: StructType,
    partitions: Seq[Partition])
/**
 * :: DeveloperApi ::
 * Data source relations which support partition discovery and the partition
 * pruning optimization should extend this trait.
 *
 * The following data source options control partitioning behavior and must be
 * respected by every data source that extends this trait:
 *
 *  - [[PartitionedRelation.PARTITION_DISCOVERY_ENABLED]]
 *  - [[PartitionedRelation.PARTITION_DEFAULT_NAME_EMPTY]]
 *  - [[PartitionedRelation.PARTITION_DEFAULT_NAME_NULL]]
 *  - [[PartitionedRelation.PARTITION_DEFAULT_NAME_CORRUPTED]]
 *
 * The semantics of the last three options are similar to those of the
 * `hive.exec.default.partition.name` configuration in Hive.
 */
@DeveloperApi
trait PartitionedRelation {

  /**
   * Given the paths of the partition directories of a partitioned table,
   * extracts the partitioning information of this relation from them.
   *
   * @param paths paths of all partition directories
   * @return a [[PartitionSpec]] describing the partitioning information of
   *         this relation
   */
  def parsePartitions(paths: Seq[String]): PartitionSpec

  /**
   * Sets the partition specification of this relation.
   *
   * @param spec the partition specification to use
   */
  def setPartitionSpec(spec: PartitionSpec): Unit

  /**
   * Sets the partition columns explicitly, so that partition column inference
   * is avoided.
   *
   * @param columns schema of the partition columns
   */
  def setPartitionColumns(columns: StructType): Unit

  /**
   * Sets the requested partitions. When scanning this relation, partitions
   * not listed in `partitions` should be skipped for better performance.
   *
   * @param partitions the requested partitions
   */
  def setRequestedPartitions(partitions: Seq[Partition]): Unit
}
/** Data source option keys that control partitioning behavior. */
object PartitionedRelation {

  /**
   * When set to `false`, partition discovery is not performed. This option
   * can be useful when, for example, the user only wants to load data stored
   * in a single partition directory without introducing the partition columns
   * encoded in the directory path.
   */
  val PARTITION_DISCOVERY_ENABLED = "partition.discovery.enabled"

  /**
   * Specifies the default partition name used when a partition column value
   * is an empty string.
   */
  val PARTITION_DEFAULT_NAME_EMPTY = "partition.defaultName.empty"

  /**
   * Specifies the default partition name used when a partition column value
   * is null.
   */
  val PARTITION_DEFAULT_NAME_NULL = "partition.defaultName.null"

  /**
   * Specifies the default partition name used when a partition column value
   * cannot be successfully escaped and encoded into the partition directory
   * path.
   */
  val PARTITION_DEFAULT_NAME_CORRUPTED = "partition.defaultName.corrupted"

  /**
   * Legacy single option specifying the default partition name used when a
   * partition column value is an empty string, null, or cannot be
   * successfully escaped and encoded into the partition directory path.
   *
   * Its key must differ from [[PARTITION_DEFAULT_NAME_EMPTY]]. The original
   * draft reused `"partition.defaultName.empty"`, which made this combined
   * option indistinguishable from the empty-string-only option.
   */
  // The message is built only from string literals so that it is a
  // compile-time constant and reads as one sentence. The previous
  // `.stripMargin.mkString(" ")` treated the String as a sequence of Chars
  // and inserted a space between every character. If the keys above change,
  // update this message too.
  @deprecated(
    "Use partition.defaultName.empty, partition.defaultName.null, and " +
      "partition.defaultName.corrupted instead",
    "1.4.0")
  val PARTITION_DEFAULT_NAME = "partition.defaultName"
}
/**
 * Data source relations which support inserting into a partitioned table
 * should extend this trait.
 */
// NOTE(review): every sibling relation trait in this API carries
// `@DeveloperApi` and the `:: DeveloperApi ::` doc marker; this one probably
// should as well — confirm intent.
trait PartitionedInsertableRelation {

  /**
   * A function type used to build a partition path from a given row and the
   * schema of all partition columns.
   */
  type PartitionPathBuilder = (Row, StructType) => String

  /** Returns the partition path builder of this relation. */
  def partitionPathBuilder: PartitionPathBuilder

  /**
   * Inserts `data` into one or more partitions.
   *
   * The length of `partitionColumns` should be no less than the length of
   * `partitionValues`. When it is larger, the extra trailing columns are
   * treated as dynamic partition columns, whose values are not given in
   * `partitionValues`.
   *
   * @param data             a `DataFrame` containing the data to insert
   * @param partitionColumns schema of the partition columns
   * @param partitionValues  values of the static partition columns
   * @param mode             save mode
   */
  def insert(
      data: DataFrame,
      partitionColumns: StructType,
      partitionValues: Row,
      mode: SaveMode): Unit
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment