Iceberg read from executor
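// Demonstrates enriching a Delta dataset with columns from an Apache Iceberg table by reading
// Iceberg directly from Spark executors: each task slices its (sorted) input into consecutive id
// ranges and runs one filtered IcebergGenerics scan per range, instead of a regular Spark join.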
import com.google.common.collect.Maps
import org.apache.iceberg.TableProperties
import org.apache.iceberg.data.IcebergGenerics
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil.convert
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.col
import org.scalatest.BeforeAndAfterAll
import org.scalatest.DoNotDiscover
import org.scalatest.FunSuite
import java.sql.Timestamp.valueOf
import java.time.LocalDateTime
import scala.collection.AbstractIterator
import scala.collection.JavaConverters._
import scala.reflect.io.Directory
import scala.reflect.io.Path
@DoNotDiscover
class IcebergMergeOnRead extends FunSuite with BeforeAndAfterAll {

  private val testPath = "/tmp/iceberg_cdc_test"
  private val icebergTable = "hdl.enrichment_table"
  private val icebergCatalogPath = s"$testPath/iceberg_catalog"
  private val dataPath = s"$testPath/datalake"
  private val timestamp: LocalDateTime = LocalDateTime.now().minusDays(1)

  val sparkSession: SparkSession = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", icebergCatalogPath)
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .master("local")
    .getOrCreate()

  import sparkSession.implicits._
  import IteratorSlicing._

  override def beforeAll(): Unit = {
    sparkSession.sql("CREATE NAMESPACE IF NOT EXISTS hdl")
    sparkSession.sql(s"DROP TABLE IF EXISTS $icebergTable") // DROP TABLE alone does not remove the Hadoop-catalog files
    Directory(Path(testPath)).deleteRecursively()
  }
test("Create iceberg table and parquet to merge") {
val icebergEnricherData = ((10 until 30) ++ (40 until 90)).map(id => (id, s"dataIceberg$id", valueOf(timestamp)))
val dataToEnrich = (0 until 100).map(id => (id, "data", valueOf(timestamp)))
sparkSession.sql(s"""CREATE TABLE $icebergTable (
| id bigint,
| icebergData string,
| icebergTs timestamp)
|USING iceberg
|PARTITIONED BY (truncate(10,id), days(icebergTs))
|TBLPROPERTIES (
|'${TableProperties.FORMAT_VERSION}'='2',
|'${TableProperties.OBJECT_STORE_ENABLED}'=true,
|'${TableProperties.OBJECT_STORE_PATH}'='$testPath/iceberg_data/'
|)""".stripMargin)
icebergEnricherData
.toDF("id", "icebergData", "icebergTs")
.sortWithinPartitions($"id", $"icebergTs")
.writeTo(icebergTable)
.append()
dataToEnrich
.toDF("id", "data", "ts")
.repartition(4, col("id")) // This is the problem
.write
.format("delta")
.mode(SaveMode.Overwrite)
.save(dataPath)
sparkSession.read.format("iceberg").load(icebergTable).show(numRows = 100)
}
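  // Note: the delta table holds ids 0..99 while the Iceberg table holds ids 10..29 and 40..89,
  // so the executor-side enrichment in the next test should emit exactly the 70 overlapping ids.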
test("Read iceberg table from executor") {
val tables = new HadoopTables(sparkSession.sessionState.newHadoopConf())
val table = tables.load(s"$icebergCatalogPath/${icebergTable.replace('.', '/')}")
val enrichmentColumns = List("icebergData")
val enrichmentColumnSchema = table.schema().select(enrichmentColumns: _*)
val inputDF = sparkSession
.read
.format("delta")
.load(dataPath)
val rowEncoder = RowEncoder(convert(enrichmentColumnSchema).fields.foldLeft(inputDF.schema)((s, f) => s.add(f)))
val enricherData = inputDF
.sort("id") // main issue to avoid this sort and still do not have too many ranges
.coalesce(4)
.mapPartitions { it: Iterator[Row] =>
// hash right inner join
it
.sliceConsecutive(_.getAs[Integer]("id").toLong)
.flatMap { range =>
val rowToLong: com.google.common.base.Function[Row, Long] =
(r: Row) => r.getAs[Integer]("id").toLong
val hashedRows = Maps.uniqueIndex(range.asJava, rowToLong)
val endBorder = hashedRows.keySet().asScala.max
val startBorder = hashedRows.keySet().asScala.min
val scanTasks = IcebergGenerics.read(table)
.select(enrichmentColumns: _*)
.where(Expressions.and(
Expressions.greaterThanOrEqual("id", startBorder),
Expressions.lessThan("id", endBorder)
)).build()
try scanTasks
.asScala
.map { record =>
val inputRow = hashedRows.get(record.getField("id"))
Row.fromSeq(inputRow.toSeq ++ enrichmentColumns.map(record.getField)) // TODO type converter
} finally scanTasks.close()
}
}(rowEncoder)
enricherData.show(numRows = 100, truncate = false)
}
}
// Scala 2 does not allow top-level implicit classes, so the slicing helper is wrapped in an object.
object IteratorSlicing {

  implicit class SliceBySubsequence[T, K](it: Iterator[T]) extends Serializable {

    /**
     * @param keyExtractor The function used to extract a key from an iterator entry.
     * @return An iterator of subsequences (iterators) that share the same key.
     *
     * Note: Reuse: after calling this method, one should discard the iterator it was called on
     * and use only the iterator that was returned. Using the old iterator is undefined, subject to change,
     * and may result in changes to the new iterator as well.
     */
    def sliceBy(keyExtractor: T => K): Iterator[Iterator[T]] = new AbstractIterator[Iterator[T]] {
      private var bufferedIt = it.buffered
      def hasNext: Boolean = bufferedIt.hasNext
      def next(): Iterator[T] =
        if (bufferedIt.hasNext) {
          val hd = bufferedIt.head
          val (subsequence, rest) = bufferedIt.span(r => keyExtractor(r) == keyExtractor(hd))
          bufferedIt = rest.buffered
          subsequence
        } else {
          Iterator.empty
        }
    }

    /**
     * @param extractSequentialId The function used to extract a sequential id from an iterator entry.
     * @return An iterator of subsequences (iterators), each covering one consecutive id range.
     *
     * Note: Reuse: after calling this method, one should discard the iterator it was called on
     * and use only the iterator that was returned. Using the old iterator is undefined, subject to change,
     * and may result in changes to the new iterator as well.
     */
    def sliceConsecutive(extractSequentialId: T => Long): Iterator[Iterator[T]] = new AbstractIterator[Iterator[T]] {
      var iteratorToSlice: Iterator[T] = it
      override def hasNext: Boolean = iteratorToSlice.hasNext
      override def next(): Iterator[T] = {
        var nextIdInSequence: Option[Long] = None
        val (consecutiveSubSequence, rest) = iteratorToSlice.span { row =>
          val isSequential = extractSequentialId(row) == nextIdInSequence.getOrElse(extractSequentialId(row))
          if (isSequential) nextIdInSequence = Some(extractSequentialId(row) + 1)
          else nextIdInSequence = None
          isSequential
        }
        iteratorToSlice = rest
        consecutiveSubSequence
      }
    }
  }
}
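// Below is a minimal, hypothetical usage sketch (not part of the original gist) showing what
// sliceConsecutive produces: a sorted iterator of ids 1,2,3,7,8,10 is sliced into the consecutive
// ranges [1,2,3], [7,8] and [10]. Each inner iterator must be fully consumed before the outer
// iterator is advanced, which mkString does here.
object SliceConsecutiveDemo {
  import IteratorSlicing._

  def main(args: Array[String]): Unit = {
    Iterator(1L, 2L, 3L, 7L, 8L, 10L)
      .sliceConsecutive(identity)
      .map(_.mkString("[", ",", "]")) // consumes each range before the outer iterator advances
      .foreach(println) // prints [1,2,3] then [7,8] then [10]
  }
}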