// Iceberg lower/upper bounds in data files (Parquet vs Avro)
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopFileIO
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.{DataFile, Snapshot, TableProperties, Table => IcebergTable}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.junit.JUnitRunner
import java.nio.file.{Files, Path}
import java.sql.Timestamp
import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.Random
// scalastyle:off
class IcebergTableStatsTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
/** Scratch directory, created on first access and named after this class plus a random UUID. */
private lazy val sparkDir: Path = {
  val prefix = s"${this.getClass.getSimpleName}-${UUID.randomUUID}"
  Files.createTempDirectory(prefix)
}

/** Name under which the Iceberg catalog is registered in the Spark configuration. */
val catalogName = "foo"

/** Warehouse root passed to the catalog configuration. */
val warehouseLocation = "/tmp/iceberg-test/warehouse"

/**
 * Spark configuration for the suite: UI disabled, Iceberg SQL extensions enabled, and a
 * Hadoop-type `SparkCatalog` registered as [[catalogName]] with its warehouse at
 * [[warehouseLocation]].
 */
implicit lazy val sparkConf: SparkConf = {
  val catalogKey = s"spark.sql.catalog.$catalogName"
  val settings = Seq(
    "spark.driver.bindAddress" -> "",
    "spark.ui.enabled" -> "false",
    "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    catalogKey -> "org.apache.iceberg.spark.SparkCatalog",
    s"$catalogKey.type" -> "hadoop",
    s"$catalogKey.warehouse" -> warehouseLocation
  )
  // Apply the entries in declaration order, one `set` call per entry.
  settings.foldLeft(new SparkConf()) { case (conf, (key, value)) => conf.set(key, value) }
}
private implicit lazy val spark: SparkSession = SparkSession.builder()
.config("spark.sql.warehouse.dir", s"$sparkDir/spark-warehouse")
"Parquet Table" should "have lower/upper bounds set for its data files" in {
"Avro Table" should "have lower/upper bounds set for its data files" in {
private[this] def test(format: String): Unit = {
import spark.implicits._
implicit val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration
val namespace = "namespace"
val uniqueId = Integer.toHexString((new Random()).nextInt())
val name = s"mt_$uniqueId"
val tableName = s"$catalogName.$namespace.`$name`"
// Schema for the sample rows: a string Id and a ReceivedAt timestamp, both non-nullable
val schema: StructType = StructType(Array(
StructField("Id", StringType, nullable = false),
StructField("ReceivedAt", TimestampType, nullable = false)
// Sample rows: three Ids whose ReceivedAt timestamps are roughly one second apart
val data = Seq(
Row("1", new Timestamp(System.currentTimeMillis())),
Row("2", new Timestamp(System.currentTimeMillis() + 1000)), // 1 second later
Row("3", new Timestamp(System.currentTimeMillis() + 2000)) // 2 seconds later
spark.sql(s"create database if not exists $namespace")
spark.sql(s"""CREATE TABLE $tableName (
| ReceivedAt TIMESTAMP
|USING iceberg
| "write.format.default" = "$format",
| "write.delete.format.default" = "$format",
| "${TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA}" = "true"
// Create a DataFrame with the specified structure
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data), // Convert the Seq of Rows to an RDD
// Re-apply to_timestamp to the 'ReceivedAt' column (declared as TimestampType in the schema above)
val dfWithTimestamp: DataFrame = df
.withColumn("ReceivedAt", to_timestamp($"ReceivedAt"))
dfWithTimestamp.collect().foreach { row =>
println("----- Row -----")
print(s"Id: ${row.getAs("Id")}, Received At: ${row.getAs("ReceivedAt")}")
// Load the Iceberg table
val icebergTable: IcebergTable = Spark3Util.loadIcebergTable(spark, tableName)
// Insert the DataFrame into the table
.mode("append") // Use "overwrite" if you want to replace existing data
// Collect the data files added by the table's current snapshot
val io: HadoopFileIO = new HadoopFileIO(hadoopConf)
val snapshot: Snapshot = icebergTable.currentSnapshot()
val files: List[DataFile] = snapshot.addedDataFiles(io).asScala.toList
println(s"Files: ${files.mkString(", ")}")
// Print the path, record count and lower/upper bounds of each data file
files.foreach { file =>
println(s"File Path: ${file.path()}")
println(s"Record Count: ${file.recordCount()}")
println(s"Lower Bounds: ${file.lowerBounds()}")
println(s"Upper Bounds: ${file.upperBounds()}")
val bounds: Boolean = files
.map { f => !f.upperBounds().isEmpty && !f.lowerBounds().isEmpty}
println(s"Bounds: $bounds")
bounds shouldBe true
val totalCount: Long =
println(s"Total Count: $totalCount")
totalCount shouldBe data.size
override protected def afterAll(): Unit = {
