@feliperazeek · Created April 8, 2024 16:10
Iceberg Lower/Upper Bounds in Data Files (Parquet vs Avro)
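The same per-file statistics the test below asserts on can also be inspected ad hoc through Iceberg's "files" metadata table, which exposes file_path, record_count, lower_bounds and upper_bounds columns. A minimal sketch, assuming the `foo` Hadoop catalog configured in the test; the table name is illustrative, since the test generates a random `mt_<id>`:

// Sketch only: inspect per-file bounds via the Iceberg "files" metadata table.
// Catalog, namespace and table name are assumptions matching the test below.
spark.sql(
  """SELECT file_path, record_count, lower_bounds, upper_bounds
    |FROM foo.namespace.`mt_example`.files""".stripMargin)
  .show(truncate = false)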
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopFileIO
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.{DataFile, Snapshot, TableProperties, Table => IcebergTable}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.junit.JUnitRunner
import java.nio.file.{Files, Path}
import java.sql.Timestamp
import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.Random
// scalastyle:off
@RunWith(classOf[JUnitRunner])
class IcebergTableStatsTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
  private lazy val sparkDir: Path = Files.createTempDirectory(s"${this.getClass.getSimpleName}-${UUID.randomUUID}")

  val catalogName = "foo"
  val warehouseLocation = "/tmp/iceberg-test/warehouse"

  implicit lazy val sparkConf: SparkConf = new SparkConf()
    .setAppName(getClass.getSimpleName)
    .setMaster("local[*]")
    .set("spark.driver.bindAddress", "127.0.0.1")
    .set("spark.ui.enabled", "false")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog")
    .set(s"spark.sql.catalog.$catalogName.type", "hadoop")
    .set(s"spark.sql.catalog.$catalogName.warehouse", warehouseLocation)

  private implicit lazy val spark: SparkSession = SparkSession.builder()
    .config(sparkConf)
    .config("spark.sql.warehouse.dir", s"$sparkDir/spark-warehouse")
    .getOrCreate()
"Parquet Table" should "have lower/upper bounds set for its data files" in {
test("parquet")
}
"Avro Table" should "have lower/upper bounds set for its data files" in {
test("avro")
}
  private[this] def test(format: String): Unit = {
    import spark.implicits._
    implicit val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration

    val namespace = "namespace"
    val uniqueId = Integer.toHexString(new Random().nextInt())
    val name = s"mt_$uniqueId"
    val tableName = s"$catalogName.$namespace.`$name`"

    // Schema of the test data
    val schema: StructType = StructType(Array(
      StructField("Id", StringType, nullable = false),
      StructField("ReceivedAt", TimestampType, nullable = false)
    ))

    // Sample rows to write into the table
    val data = Seq(
      Row("1", new Timestamp(System.currentTimeMillis())),
      Row("2", new Timestamp(System.currentTimeMillis() + 1000)), // 1 second later
      Row("3", new Timestamp(System.currentTimeMillis() + 2000)) // 2 seconds later
    )

    // Create the namespace and the Iceberg table with the requested file format
    spark.sql(s"create database if not exists $namespace")
    spark.sql(s"""CREATE TABLE $tableName (
                 |  Id STRING,
                 |  ReceivedAt TIMESTAMP
                 |)
                 |USING iceberg
                 |PARTITIONED BY (ReceivedAt)
                 |OPTIONS (
                 |  "write.format.default" = "$format",
                 |  "write.delete.format.default" = "$format",
                 |  "${TableProperties.WRITE_DISTRIBUTION_MODE}" = "${TableProperties.WRITE_DISTRIBUTION_MODE_HASH}",
                 |  "${TableProperties.MERGE_DISTRIBUTION_MODE}" = "${TableProperties.WRITE_DISTRIBUTION_MODE_HASH}",
                 |  "${TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA}" = "true"
                 |)""".stripMargin)

    // Create a DataFrame with the specified structure
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data), // Convert the Seq of Rows to an RDD
      schema
    )

    // ReceivedAt is already a timestamp; to_timestamp is only a defensive cast
    val dfWithTimestamp: DataFrame = df
      .withColumn("ReceivedAt", to_timestamp($"ReceivedAt"))

    dfWithTimestamp.collect().foreach { row =>
      println("----- Row -----")
      println(s"Id: ${row.getAs("Id")}, Received At: ${row.getAs("ReceivedAt")}")
    }
    // Load the Iceberg table handle
    val icebergTable: IcebergTable = Spark3Util.loadIcebergTable(spark, tableName)

    // Insert the DataFrame into the table
    dfWithTimestamp.write
      .mode("append") // Use "overwrite" if you want to replace existing data
      .insertInto(tableName)

    // Refresh so currentSnapshot() reflects the snapshot produced by the write above
    icebergTable.refresh()

    // Read the data files added by the current snapshot
    val io: HadoopFileIO = new HadoopFileIO(hadoopConf)
    val snapshot: Snapshot = icebergTable.currentSnapshot()
    val files: List[DataFile] = snapshot.addedDataFiles(io).asScala.toList
    println(s"Files: ${files.mkString(", ")}")

    // Print the per-file record counts and column bounds
    files.foreach { file =>
      println(s"File Path: ${file.path()}")
      println(s"Record Count: ${file.recordCount()}")
      println(s"Lower Bounds: ${file.lowerBounds()}")
      println(s"Upper Bounds: ${file.upperBounds()}")
    }

    // Bounds may be null (not just empty) when no column metrics were collected
    val bounds: Boolean = files.forall { f =>
      Option(f.lowerBounds()).exists(!_.isEmpty) && Option(f.upperBounds()).exists(!_.isEmpty)
    }
    println(s"Bounds: $bounds")
    bounds shouldBe true

    val totalCount: Long = files.map(_.recordCount()).sum
    println(s"Total Count: $totalCount")
    totalCount shouldBe data.size
  }
  override protected def afterAll(): Unit = {
    spark.close()
    super.afterAll()
  }
}
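Whether column bounds get recorded at all is governed by Iceberg's metrics-mode table properties: write.metadata.metrics.default plus per-column write.metadata.metrics.column.<name>, with the standard modes none, counts, truncate(N) and full. A minimal sketch of setting them on the test table; the table name is illustrative, and whether the setting takes effect for a given data file format depends on that format's writer:

// Sketch only: tune Iceberg column-metrics collection via table properties.
// Table name is illustrative; 'full' keeps counts plus lower/upper bounds,
// 'counts' keeps value/null counts but drops the bounds for the named column.
spark.sql(
  """ALTER TABLE foo.namespace.`mt_example` SET TBLPROPERTIES (
    |  'write.metadata.metrics.default' = 'full',
    |  'write.metadata.metrics.column.Id' = 'counts'
    |)""".stripMargin)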