@rafaelkyrdan · Created August 17, 2020 16:09
How to migrate DataSourceV2 to Spark 3.0.0
// 1.
// I can't find what to use instead of ReadSupport and DataSourceReader.
// If we now have to use Scan instead of ReadSupport, then what happened to the createReader method?
class DefaultSource extends ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new GeneratingReader()
}

class GeneratingReader() extends DataSourceReader {
  override def readSchema(): StructType = {...}

  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
    val partitions = new util.ArrayList[InputPartition[InternalRow]]()
    ...
    partitions.add(new GeneratingInputPartition(...))
    partitions
  }

  override def outputPartitioning(): Partitioning = {...}
}
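// For what it's worth, the Spark 3.0 entry point that replaces ReadSupport is TableProvider;
// createReader's job is split across getTable, newScanBuilder, and Scan.toBatch (see the answers below).
// A minimal sketch of the new entry-point interface:
import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DefaultSource extends TableProvider {
  // called first when the user does not supply a schema
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???

  // replaces createReader(); returns a Table, which in turn hands out ScanBuilders
  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = ???
}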
// 2.
// Here, instead of
import org.apache.spark.sql.sources.v2.reader.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsReportPartitioning}
// I should now use these:
import org.apache.spark.sql.connector.read.partitioning.{Distribution, Partitioning}
import org.apache.spark.sql.connector.read.{InputPartition, SupportsReportPartitioning}
// right?
// (the same GeneratingReader class as in 1. above)
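// Note: those are the right Spark 3.0 package locations, but in the new API
// SupportsReportPartitioning is a mixin on the Scan, so outputPartitioning() moves there.
// A rough sketch, using the new imports above:
class GeneratingInputScan(/* ... */) extends Scan with Batch with SupportsReportPartitioning {
  override def outputPartitioning(): Partitioning = ???
}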
// 3.
// I haven't found what to use instead of
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
// The interface of the same name in the new `connector.read` package has a totally different contract.
import org.apache.spark.sql.connector.read.InputPartition

class GeneratingInputPartition() extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] = new GeneratingInputPartitionReader(...)
}

class GeneratingInputPartitionReader() extends InputPartitionReader[InternalRow] {
  override def next(): Boolean = ...
  override def get(): InternalRow = ...
  override def close(): Unit = ...
}
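// For comparison, in Spark 3 InputPartition is just a serializable carrier of per-partition
// state; the reading logic moves to a PartitionReaderFactory (see the answers below).
// A minimal sketch, with a hypothetical index field:
import org.apache.spark.sql.connector.read.InputPartition

case class GeneratingInputPartition(index: Int) extends InputPartition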
// 4.
// And will the usage remain the same?
val df = sparkSession.read
  .format("sources.v2.generating")
  .option(OPT_PARTITIONS, numberOfRows)
  .option(OPT_DESCRIPTOR, descriptorJson)
  .option(OPT_SOURCE_TYPE, sourceConnection)
  .load()
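(As far as I can tell, yes: the DataFrameReader API is unchanged in Spark 3.0, and format() still resolves the given package name to its DefaultSource class, or to a DataSourceRegister short name, exactly as before.)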
@rafaelkyrdan (Author)

Implement the Table interface

import java.util.{Set => JSet}
import scala.collection.JavaConverters._
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class GeneratingInputBatchTable(
    private val partitionNum: Int,
    private val descriptorJson: String,
    private val sourceType: String
) extends Table
  with SupportsRead
  with SchemaSupport {

  override def name(): String = this.getClass.toString

  override def schema(): StructType = ??? // your implementation

  // advertise that this table only supports batch reads
  override def capabilities(): JSet[TableCapability] = Set(TableCapability.BATCH_READ).asJava

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new GeneratingInputScanBuilder(partitionNum, descriptorJson, sourceType)
}
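Declaring TableCapability.BATCH_READ is what lets Spark plan a plain batch read against this table; without it, spark.read.load() fails with an analysis error saying the table does not support batch scan.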

@rafaelkyrdan (Author)

Implement the ScanBuilder

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}

class GeneratingInputScanBuilder(
  .....
) extends ScanBuilder {
  override def build(): Scan = new GeneratingInputScan(....)
}
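A minimal filled-in sketch, assuming the builder simply forwards the three options the table received (the parameter names are taken from the table snippet above):

class GeneratingInputScanBuilder(
    partitionNum: Int,
    descriptorJson: String,
    sourceType: String
) extends ScanBuilder {
  override def build(): Scan = new GeneratingInputScan(partitionNum, descriptorJson, sourceType)
}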

@rafaelkyrdan (Author)

Implement the Scan

import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.types.StructType

class GeneratingInputScan(
  ...
) extends Scan
  with Batch
  with SchemaSupport {

  override def readSchema(): StructType = ???

  // a Scan that is also a Batch can simply return itself
  override def toBatch: Batch = this

  override def planInputPartitions(): Array[InputPartition] = ???

  override def createReaderFactory(): PartitionReaderFactory = new GeneratingInputPartitionReaderFactory()
}
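A possible planInputPartitions(), assuming a partitionNum constructor parameter and the hypothetical GeneratingInputPartition case class sketched in 3. above:

override def planInputPartitions(): Array[InputPartition] =
  Array.tabulate(partitionNum)(i => GeneratingInputPartition(i))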

@rafaelkyrdan (Author)

Implement the PartitionReaderFactory

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}

class GeneratingInputPartitionReaderFactory extends PartitionReaderFactory {
  // runs on the executors; the partition carries everything the reader needs
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new GeneratingInputPartitionReader(partition.asInstanceOf[YourInputPartition])
}
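Note that PartitionReaderFactory extends Serializable: the factory (and every InputPartition) is serialized and shipped to the executors, so keep their fields serializable.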

@rafaelkyrdan (Author)

and finally your reader

class GeneratingInputPartitionReader(inputPartition: YourInputPartition) extends PartitionReader[InternalRow] {

  // advance to the next row; return false once the partition is exhausted
  override def next(): Boolean = ???

  override def get(): InternalRow = ???

  // release any resources held by this reader
  override def close(): Unit = ???
}
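To tie it all together, the entry point that replaces the old ReadSupport-based DefaultSource would look roughly like this. The OPT_* keys are the option constants from the usage snippet above, and makeTable is a hypothetical helper:

import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DefaultSource extends TableProvider {

  // Spark calls this first when the user does not supply a schema
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    makeTable(options.asCaseSensitiveMap()).schema()

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table =
    makeTable(properties)

  private def makeTable(props: util.Map[String, String]): GeneratingInputBatchTable =
    new GeneratingInputBatchTable(
      props.get(OPT_PARTITIONS).toInt,
      props.get(OPT_DESCRIPTOR),
      props.get(OPT_SOURCE_TYPE))
}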

@jackzhangsir

nice~
