Created
August 17, 2020 16:09
-
-
Save rafaelkyrdan/2bea8385aadd71be5bf67cddeec59581 to your computer and use it in GitHub Desktop.
How to migrate DataSourceV2 into Spark 3.0.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 1. | |
// I cannot find what I should use instead of ReadSupport and DataSourceReader. | |
// If we now have to use Scan instead of ReadSupport, what happened to the createReader method? | |
// Data source entry point written against ReadSupport: createReader does not use
// the supplied options and always hands back a new GeneratingReader.
class DefaultSource extends ReadSupport { | |
override def createReader(options: DataSourceOptions): DataSourceReader = new GeneratingReader() | |
} | |
// DataSourceReader implementation that exposes a schema, a list of input partitions
// and an output partitioning. Method bodies are elided ({...} / ...) in this snippet.
class GeneratingReader() extends DataSourceReader { | |
// Returns a StructType; the body is elided in this snippet.
override def readSchema(): StructType = {...} | |
// Collects GeneratingInputPartition instances into a java.util.ArrayList.
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = { | |
val partitions = new util.ArrayList[InputPartition[InternalRow]]() | |
... | |
partitions.add(new GeneratingInputPartition(...)) | |
// NOTE(review): the last visible expression is partitions.add(...), not `partitions`;
// the declared return type is util.List, so presumably the list is returned in the
// real code - confirm.
} | |
// Returns a Partitioning; the body is elided in this snippet.
override def outputPartitioning(): Partitioning = {...} | |
} | |
// 2. | |
// Here instead of | |
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning} | |
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsReportPartitioning} | |
// I should use these: | |
import org.apache.spark.sql.connector.read.partitioning.{Distribution, Partitioning} | |
import org.apache.spark.sql.connector.read.{InputPartition, SupportsReportPartitioning} | |
// right? | |
// Same reader shape as in the question: schema, planned partitions and output
// partitioning, with the bodies elided ({...} / ...).
class GeneratingReader() extends DataSourceReader { | |
override def readSchema(): StructType = {...} | |
// Builds the partition list in a java.util.ArrayList of InputPartition[InternalRow].
override def planInputPartitions(): util.List[InputPartition[InternalRow]] = { | |
val partitions = new util.ArrayList[InputPartition[InternalRow]]() | |
... | |
partitions.add(new GeneratingInputPartition(...)) | |
// NOTE(review): no trailing `partitions` expression is visible here even though the
// method is declared to return util.List - presumably elided; confirm.
} | |
override def outputPartitioning(): Partitioning = {...} | |
} | |
// 3. | |
// I haven't found what I should use instead of | |
import org.apache.spark.sql.sources.v2.reader.InputPartition | |
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader | |
// If the new interface comes from the `connector.read` package, then it has a totally different contract. | |
import org.apache.spark.sql.connector.read.InputPartition | |
// Input partition that creates its own reader: createPartitionReader returns a new
// GeneratingInputPartitionReader (constructor arguments elided in this snippet).
class GeneratingInputPartition() extends InputPartition[InternalRow] { | |
override def createPartitionReader(): InputPartitionReader[InternalRow] = new GeneratingInputPartitionReader(...) | |
} | |
// Reader for a single partition, typed to InternalRow. All three method bodies are
// elided (...) in this snippet.
class GeneratingInputPartitionReader() extends InputPartitionReader[InternalRow] { | |
// Presumably reports whether another row is available - confirm against the interface contract.
override def next(): Boolean = ... | |
// Returns an InternalRow; presumably the current one - confirm.
override def get(): InternalRow = ... | |
override def close(): Unit = ... | |
} | |
// 4. | |
// And will the usage remain the same? | |
// Builds `df` through sparkSession.read using the "sources.v2.generating" format,
// passing three options (partitions, descriptor, source type) before calling load().
val df = sparkSession.read | |
.format("sources.v2.generating") | |
.option(OPT_PARTITIONS, numberOfRows) | |
.option(OPT_DESCRIPTOR, descriptorJson) | |
.option(OPT_SOURCE_TYPE, sourceConnection) | |
.load() |
Implement the Table interface:
/**
 * Table that advertises batch reads only and hands its three constructor
 * values on to a [[GeneratingInputScanBuilder]].
 *
 * @param partitionNum   forwarded unchanged to the scan builder
 * @param descriptorJson forwarded unchanged to the scan builder
 * @param sourceType     forwarded unchanged to the scan builder
 */
class GeneratingInputBatchTable(
    private val partitionNum: Int,
    private val descriptorJson: String,
    private val sourceType: String
) extends Table
    with SupportsRead
    with SchemaSupport {

  /** Table name: the string form of the runtime class. */
  override def name(): String = getClass.toString

  /** Schema of the table; left for the implementer to fill in. */
  override def schema(): StructType = ??? // your implementation

  /** The only declared capability is BATCH_READ. */
  override def capabilities(): JSet[TableCapability] = {
    val supported: Set[TableCapability] = Set(TableCapability.BATCH_READ)
    supported.asJava
  }

  /** Creates a scan builder; the `options` argument is not consulted. */
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    new GeneratingInputScanBuilder(partitionNum, descriptorJson, sourceType)
  }
}
Implement Scan builder
// Scan builder whose build() returns a new GeneratingInputScan. Constructor
// parameters and the arguments passed to the scan are elided (.....) in this snippet.
class GeneratingInputScanBuilder(
.....
) extends ScanBuilder {
override def build(): Scan = new GeneratingInputScan(....)
}
Implement Scan
// Single class that acts as both the Scan and its Batch. Constructor parameters
// are elided (...) in this snippet.
class GeneratingInputScan(
...
) extends Scan
with Batch
with SchemaSupport {
// Left unimplemented (???) for the reader of this snippet to fill in.
override def readSchema(): StructType = ???
// The scan is its own batch, so no separate Batch object is created.
override def toBatch: Batch = this
// Left unimplemented (???); returns an Array of InputPartition.
override def planInputPartitions(): Array[InputPartition] = ???
// Every call creates a fresh GeneratingInputPartitionReaderFactory.
override def createReaderFactory(): PartitionReaderFactory = new GeneratingInputPartitionReaderFactory()
}
Implement Reader factory
/**
 * Reader factory that wraps each incoming partition in a
 * [[GeneratingInputPartitionReader]].
 */
class GeneratingInputPartitionReaderFactory extends PartitionReaderFactory {

  /**
   * Creates the reader for one partition.
   *
   * @param partition the partition to read; it is cast to `YourInputPartition`
   * @return a new reader over that partition
   */
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    // Unchecked downcast: a partition of any other type fails here with ClassCastException.
    val typedPartition = partition.asInstanceOf[YourInputPartition]
    new GeneratingInputPartitionReader(typedPartition)
  }
}
And implement your reader:
/**
 * Partition reader skeleton producing `InternalRow` values for one
 * `YourInputPartition`.
 *
 * `next()` and `get()` are left unimplemented (`???`) for the implementer.
 *
 * @param inputPartition the partition this reader was created for
 */
class GeneratingInputPartitionReader(inputPartition: YourInputPartition) extends PartitionReader[InternalRow] {

  /** Left for the implementer; returns a Boolean. */
  override def next(): Boolean = ???

  /** Left for the implementer; returns an `InternalRow`. */
  override def get(): InternalRow = ???

  /**
   * No-op close.
   *
   * The result type is declared explicitly and the body is the unit value `()`:
   * writing `= Unit` would return the `Unit` companion object and infer the
   * result type `Unit.type` instead of `Unit`.
   */
  override def close(): Unit = ()
}
nice~
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The main class in the package