Created
August 17, 2020 16:09
-
-
Save rafaelkyrdan/2bea8385aadd71be5bf67cddeec59581 to your computer and use it in GitHub Desktop.
How to migrate a DataSourceV2 implementation to Spark 3.0.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 1.
// I cannot find what to use instead of ReadSupport, DataSourceOptions, and DataSourceReader.
// If ReadSupport is now replaced by Scan, what happened to the createReader method?
// Spark 3.0 migration: ReadSupport, DataSourceOptions and createReader were removed.
// The entry point is now org.apache.spark.sql.connector.catalog.TableProvider:
// the provider hands Spark a Table, and that Table's newScanBuilder() is what
// replaces createReader. Options arrive as a CaseInsensitiveStringMap instead
// of DataSourceOptions.
class DefaultSource extends TableProvider {
  // Schema discovery moved out of the reader and into the provider.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {...}
  // Replaces createReader: return a Table (implementing SupportsRead) whose
  // newScanBuilder produces the Scan that plans the read.
  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table = {...}
}
// Spark 3.0 migration: DataSourceReader is split into Scan (logical plan side,
// owns readSchema) and Batch (physical side, owns partition planning). A single
// class may implement both and return itself from toBatch. planInputPartitions
// now returns Array[InputPartition] — InputPartition is just a serializable
// description of a split; row production moved to the PartitionReaderFactory.
class GeneratingReader() extends Scan with Batch {
  // Unchanged contract: the schema of the rows this source produces.
  override def readSchema(): StructType = {...}
  // This Scan is its own Batch implementation.
  override def toBatch: Batch = this
  // Replaces the old util.List[InputPartition[InternalRow]] signature.
  override def planInputPartitions(): Array[InputPartition] = {...}
  // New in 3.0: the factory that turns each InputPartition into a reader
  // on the executors (this is where createPartitionReader logic went).
  override def createReaderFactory(): PartitionReaderFactory = {...}
}
// 2.
// Here, instead of
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning} | |
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsReportPartitioning} | |
// I should use these:
import org.apache.spark.sql.connector.read.partitioning.{Distribution, Partitioning} | |
import org.apache.spark.sql.connector.read.{InputPartition, SupportsReportPartitioning} | |
// right?
// Yes — the partitioning interfaces moved from sources.v2.reader.partitioning
// to connector.read.partitioning. In Spark 3.0, SupportsReportPartitioning is
// a mix-in on Scan (not on the removed DataSourceReader), so the migrated
// reader implements Scan with Batch and adds the mix-in for outputPartitioning.
class GeneratingReader() extends Scan with Batch with SupportsReportPartitioning {
  // Schema of the produced rows (contract unchanged from 2.x).
  override def readSchema(): StructType = {...}
  // This Scan serves as its own Batch.
  override def toBatch: Batch = this
  // 3.0 signature: an Array of serializable split descriptions.
  override def planInputPartitions(): Array[InputPartition] = {...}
  // Reader creation now lives in the factory, not in the partitions.
  override def createReaderFactory(): PartitionReaderFactory = {...}
  // connector.read.partitioning.Partitioning — same role as in 2.x.
  override def outputPartitioning(): Partitioning = {...}
}
// 3.
// I haven't found what to use instead of
import org.apache.spark.sql.sources.v2.reader.InputPartition | |
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader | |
// The new interface in the `connector.read` package has a completely different contract.
import org.apache.spark.sql.connector.read.InputPartition | |
// Spark 3.0 migration: connector.read.InputPartition is no longer generic and
// no longer creates its own reader. It is a plain Serializable description of
// one split (carry whatever state the split needs as fields). The old
// createPartitionReader responsibility moved to a PartitionReaderFactory,
// which Spark obtains from Batch.createReaderFactory and ships to executors.
class GeneratingInputPartition() extends InputPartition

// Replaces InputPartition.createPartitionReader: builds the per-partition
// reader on the executor from the serialized partition description.
class GeneratingPartitionReaderFactory() extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new GeneratingInputPartitionReader(...)
}
// Spark 3.0 migration: InputPartitionReader[InternalRow] is renamed to
// org.apache.spark.sql.connector.read.PartitionReader[InternalRow].
// The iterator contract (next / get / close) is unchanged, so only the
// extended type needs to change here.
class GeneratingInputPartitionReader() extends PartitionReader[InternalRow] {
  // Advance to the next row; false when the partition is exhausted.
  override def next(): Boolean = ...
  // Return the current row (valid only after next() returned true).
  override def get(): InternalRow = ...
  // Release any resources held by this partition's reader.
  override def close(): Unit = ...
}
// 4.
// And will the usage remain the same?
// Yes — the DataFrameReader side of the API is unchanged in Spark 3.0;
// only the source implementation behind format(...) had to be migrated.
val reader = sparkSession.read.format("sources.v2.generating")
val df = reader
  .option(OPT_PARTITIONS, numberOfRows)
  .option(OPT_DESCRIPTOR, descriptorJson)
  .option(OPT_SOURCE_TYPE, sourceConnection)
  .load()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
nice~