IcebergRead from executor
import org.apache.iceberg.TableProperties
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil.convert
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.col
import org.scalatest.BeforeAndAfterAll
import org.scalatest.DoNotDiscover
import org.scalatest.FunSuite
import java.sql.Timestamp.valueOf
import java.time.LocalDateTime
import scala.collection.JavaConverters._
class IcebergMergeOnRead extends FunSuite with BeforeAndAfterAll {
// Filesystem root under which this suite keeps both the Iceberg catalog and the plain data files.
private val testPath = "/tmp/iceberg_cdc_test"
// Namespace-qualified table name; the "hdl" namespace is created in beforeAll.
private val icebergTable = "hdl.enrichment_table"
// Directory handed to Spark as the warehouse of the Hadoop-type Iceberg catalog.
private val icebergCatalogPath = s"$testPath/iceberg_catalog"
// Location passed to `.save(dataPath)` for the dataset that is to be enriched.
private val dataPath = s"$testPath/datalake"
// Single timestamp stamped (via java.sql.Timestamp.valueOf) on every generated test row.
// NOTE(review): the initializer expression of this val is not present in this excerpt.
private val timestamp: LocalDateTime =
// Spark session whose `spark_catalog` is an Iceberg SparkSessionCatalog of type "hadoop",
// with its warehouse at icebergCatalogPath and dynamic partition-overwrite mode enabled.
// NOTE(review): the builder chain shown here has no terminating call (e.g. getOrCreate) —
// that line appears to be missing from this excerpt.
// NOTE(review): the "spark.sql.extensions" value ends with a trailing space inside the string
// literal; confirm whether that is intentional.
val sparkSession: SparkSession = SparkSession.builder()
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions ")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", icebergCatalogPath)
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
// Session implicits; the tests below rely on them for `.toDF(...)` and the `$"column"` syntax.
import sparkSession.implicits._
// ScalaTest BeforeAndAfterAll hook: makes sure the `hdl` namespace exists and issues a
// DROP TABLE IF EXISTS for the test table so the suite starts from a clean state.
// NOTE(review): the closing brace of this method is not present in this excerpt.
override def beforeAll(): Unit = {
sparkSession.sql(s"CREATE NAMESPACE IF NOT EXISTS hdl")
// NOTE: the original author reports that this DROP does not take effect.
sparkSession.sql(s"DROP TABLE IF EXISTS $icebergTable") // DROP do not work
// Prepares the two datasets used by this suite: the Iceberg enrichment table and the
// dataset that is later enriched from it.
// NOTE(review): several lines of this test are missing from this excerpt (the end of the
// CREATE TABLE statement, the receivers of the two `toDF` chains and the write calls), so
// only the visible parts are described here.
test("Create iceberg table and parquet to merge") {
// Rows for the Iceberg table: ids 10..29 and 40..89, i.e. the id sequence has gaps.
val icebergEnricherData = ((10 until 30) ++ (40 until 90)).map(id => (id, s"dataIceberg$id", valueOf(timestamp)))
// Rows to be enriched: ids 0..99, all carrying the constant payload "data".
val dataToEnrich = (0 until 100).map(id => (id, "data", valueOf(timestamp)))
// The Iceberg table is partitioned by truncate(10, id) and days(icebergTs).
sparkSession.sql(s"""CREATE TABLE $icebergTable (
| id bigint,
| icebergData string,
| icebergTs timestamp)
|USING iceberg
|PARTITIONED BY (truncate(10,id), days(icebergTs))
.toDF("id", "icebergData", "icebergTs")
.sortWithinPartitions($"id", $"icebergTs")
.toDF("id", "data", "ts")
.repartition(4, col("id")) // This is the problem
.save(dataPath)"iceberg").load(icebergTable).show(numRows = 100)
// Enriches the input rows with columns read directly from the Iceberg table inside
// mapPartitions, i.e. using the Iceberg table API rather than a Spark join.
// NOTE(review): many lines of this test are missing from this excerpt (how inputDF is loaded,
// how each partition is sliced into ranges, how the scan is built and how records are
// converted), so the comments below cover only what is visible.
test("Read iceberg table from executor") {
// Load the table through HadoopTables by filesystem path: the '.' in the qualified table
// name is turned into '/' underneath the catalog directory.
val tables = new HadoopTables(sparkSession.sessionState.newHadoopConf())
val table = tables.load(s"$icebergCatalogPath/${icebergTable.replace('.', '/')}")
// Columns copied from the Iceberg table onto each input row.
val enrichmentColumns = List("icebergData")
val enrichmentColumnSchema = table.schema().select(enrichmentColumns: _*)
val inputDF = sparkSession
// Output encoder: the input schema with the converted enrichment columns appended.
val rowEncoder = RowEncoder(convert(enrichmentColumnSchema).fields.foldLeft(inputDF.schema)((s, f) => s.add(f)))
val enricherData = inputDF
.sort("id") // main issue to avoid this sort and still do not have too many ranges
.mapPartitions { it: Iterator[Row] =>
// hash right inner join
.flatMap { range =>
// Key function: the row's "id" column widened to Long.
val rowToLong:[Row, Long] =
(r: Row) => r.getAs[Integer]("id").toLong
// Index the rows of this range by id.
// NOTE(review): `Maps` is not among the imports visible in this excerpt (presumably
// Guava's Maps.uniqueIndex) — confirm.
val hashedRows = Maps.uniqueIndex(range.asJava, rowToLong)
val endBorder = hashedRows.keySet().asScala.max
val startBorder = hashedRows.keySet().asScala.min
// NOTE(review): endBorder is the largest id in the range, yet the filter below uses
// lessThan, which would exclude the row with that id from the scan — confirm whether
// lessThanOrEqual was intended.
val scanTasks =
.select(enrichmentColumns: _*)
Expressions.greaterThanOrEqual("id", startBorder),
Expressions.lessThan("id", endBorder)
// The scan is closed in the finally clause regardless of how the mapping ends.
try scanTasks
.map { record =>
// Look up the input row whose id matches the scanned record.
val inputRow = hashedRows.get(record.getField("id"))
Row.fromSeq(inputRow.toSeq ++ // TODO type converter
} finally scanTasks.close()
}(rowEncoder) = 100, truncate = false)
implicit class SliceBySubsequence[T, K](it: Iterator[T]) extends Serializable {
/**
* @param keyExtractor – The function used to extract the key from an iterator entry.
* @return - An iterator of subsequences (each itself an Iterator) whose entries share the same key.
* Note (reuse): After calling this method, one should discard the iterator it was called on,
* and use only the iterator that was returned. Using the old iterator is undefined, subject to change,
* and may result in changes to the new iterator as well.
*/
def sliceBy(keyExtractor: T => K): Iterator[Iterator[T]] = new AbstractIterator[Iterator[T]] {
// Splits the wrapped iterator `it` into runs of adjacent entries whose extracted keys are equal;
// each call to next() yields one such run.
// NOTE(review): AbstractIterator is not among the imports visible in this excerpt — confirm.
// Buffered so the head entry can be inspected (headOption) without consuming it.
private var bufferedIt = it.buffered
def hasNext: Boolean = bufferedIt.hasNext
def next(): Iterator[T] =
bufferedIt.headOption match {
case Some(hd) =>
// Take entries while their key equals the key of the current head; `rest` is what follows.
val (subsequence, rest) = bufferedIt.span(r => keyExtractor(r) == keyExtractor(hd))
// Continue from the remainder on the next call.
bufferedIt = rest.buffered
// NOTE(review): the expression yielding `subsequence` and the body of the None case are not
// present in this excerpt.
case None =>
/**
* @param extractSequentialId – The function used to extract the sequential id from an iterator entry.
* @return - An iterator of subsequences (each itself an Iterator), each covering one run of consecutive ids (a range).
* Note (reuse): After calling this method, one should discard the iterator it was called on,
* and use only the iterator that was returned. Using the old iterator is undefined, subject to change,
* and may result in changes to the new iterator as well.
*/
def sliceConsecutive(extractSequentialId: T => Long): Iterator[Iterator[T]] = new AbstractIterator[Iterator[T]] {
// Splits the wrapped iterator `it` into runs in which each entry's id is exactly the previous
// entry's id + 1; each call to next() yields one such run.
// NOTE(review): AbstractIterator is not among the imports visible in this excerpt — confirm.
// Remainder of the input that has not been handed out yet.
var iteratorToSlice: Iterator[T] = it
override def hasNext: Boolean = iteratorToSlice.hasNext
override def next(): Iterator[T] = {
// Id expected from the next entry of the current run. It starts as None, in which case the
// getOrElse below compares the entry's id with itself, so the first entry always matches.
var nextIdInSequence: Option[Long] = None
val (consecutiveSubSequence, rest) = iteratorToSlice.span { row =>
val isSequential = extractSequentialId(row) == nextIdInSequence.getOrElse(extractSequentialId(row))
// On a match, advance the expected id by one; otherwise reset the expectation.
if (isSequential) nextIdInSequence = Some(extractSequentialId(row) + 1)
else nextIdInSequence = None
// NOTE(review): the predicate's result expression and the closing of the span block are not
// present in this excerpt.
iteratorToSlice = rest
// NOTE(review): the returned value and the closing braces are not present in this excerpt.
