@feliperazeek
Created March 14, 2024 19:26
Iceberg Residual Test
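A small ScalaTest spec that appends a few rows to an Iceberg table backed by a Hadoop catalog and then checks that the data files added by the resulting snapshot carry lower/upper column bounds and the expected record count.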
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.hadoop.HadoopFileIO
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.{DataFile, PartitionSpec, Schema, Snapshot, TableMetadata, TableMetadataParser, Table => IcebergTable}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.junit.JUnitRunner
import java.nio.file.{Files, Path}
import java.sql.Timestamp
import java.util.UUID
import scala.collection.JavaConverters._
// scalastyle:off
@RunWith(classOf[JUnitRunner])
class IcebergResidual2Test extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
  private lazy val sparkDir: Path = Files.createTempDirectory(s"${this.getClass.getSimpleName}-${UUID.randomUUID}")

  val catalogName = "foo"
  val warehouseLocation = "/tmp/iceberg-test/warehouse"
  println(s"Warehouse Location: $warehouseLocation")

  implicit lazy val sparkConf: SparkConf = new SparkConf()
    .setAppName(getClass.getSimpleName)
    .setMaster("local[*]")
    .set("spark.driver.bindAddress", "127.0.0.1")
    .set("spark.ui.enabled", "false")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog")
    .set(s"spark.sql.catalog.$catalogName.type", "hadoop")
    .set(s"spark.sql.catalog.$catalogName.warehouse", warehouseLocation)

  private implicit lazy val spark: SparkSession = SparkSession.builder()
    .config(sparkConf)
    .config("spark.sql.warehouse.dir", s"$sparkDir/spark-warehouse")
    .getOrCreate()
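
  // The configuration above registers "foo" as a Hadoop-type Iceberg catalog rooted at the
  // warehouse path, so tables in this test are addressable as foo.<namespace>.<table>.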
"Table" should "have lower/upper bounds set for its files" in {
    import spark.implicits._
    implicit val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration

    val namespace = "namespace"
    val uniqueId = "foo" // Integer.toHexString((new Random()).nextInt())
    val name = s"bar_$uniqueId"
    val tableName = s"$catalogName.$namespace.`$name`"

    // Schema of the test table
    val schema: StructType = StructType(Array(
      StructField("Id", StringType, nullable = false),
      StructField("ReceivedAt", TimestampType, nullable = false)
    ))

    // Rows to insert
    val data = Seq(
      Row("1", new Timestamp(System.currentTimeMillis())),
      Row("2", new Timestamp(System.currentTimeMillis() + 1000)), // 1 second later
      Row("3", new Timestamp(System.currentTimeMillis() + 2000)) // 2 seconds later
    )

    // Create the namespace and the Iceberg table up front (idempotent),
    // since the test loads and appends to the table below
    spark.sql(s"create database if not exists $namespace")
    spark.sql(s"create table if not exists $tableName (Id string, ReceivedAt timestamp) using iceberg")

    // Create a DataFrame with the specified structure
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data), // Convert the Seq of Rows to an RDD
      schema
    )
    // Ensure ReceivedAt is typed as a timestamp (it is already TimestampType, so this is effectively a no-op)
    val dfWithTimestamp: DataFrame = df
      .withColumn("ReceivedAt", to_timestamp($"ReceivedAt"))

    dfWithTimestamp.collect().foreach { row =>
      println("----- Row -----")
      println(s"Id: ${row.getAs("Id")}, Received At: ${row.getAs("ReceivedAt")}")
    }
    // Load the Iceberg table through the Spark catalog
    val icebergTable: IcebergTable = Spark3Util.loadIcebergTable(spark, tableName)

    // Insert the DataFrame into the table
    dfWithTimestamp.write
      .mode("append") // Use "overwrite" to replace existing data instead
      .insertInto(tableName)

    // Refresh so the table handle sees the snapshot produced by the append above
    icebergTable.refresh()

    // Access the data files added by the current snapshot
    val io: HadoopFileIO = new HadoopFileIO(hadoopConf)
    val snapshot: Snapshot = icebergTable.currentSnapshot()
    val files: List[DataFile] = snapshot.addedDataFiles(io).asScala.toList
    println(s"Files: ${files.mkString(", ")}")
    // Iterate through the added data files and print their column bounds
    files.foreach { file =>
      println(s"File Path: ${file.path()}")
      println(s"Record Count: ${file.recordCount()}")
      println(s"Lower Bounds: ${file.lowerBounds()}")
      println(s"Upper Bounds: ${file.upperBounds()}")
    }

    // Every data file should carry both lower and upper bounds
    val bounds: Boolean = files
      .map { f => !f.upperBounds().isEmpty && !f.lowerBounds().isEmpty }
      .forall(identity)
    println(s"Bounds: $bounds")
    bounds shouldBe true

    // The added files should account for exactly the rows written above
    val totalCount: Long = files.map(_.recordCount()).sum
    println(s"Total Count: $totalCount")
    totalCount shouldBe data.size
  }
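
  // The helpers below are not exercised by the test above; they write a v1 Iceberg
  // table-metadata file for a given schema/spec directly through HadoopFileIO.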
  def writeMetadata(
      schema: Schema,
      spec: PartitionSpec,
      properties: Map[String, String],
      location: String)(implicit conf: Configuration): String = {
    val metadata: TableMetadata = TableMetadata.newTableMetadata(schema, spec, location, properties.asJava)
    val path: String = String.format("%s/metadata/v1.metadata.json", metadata.location)
    writeMetadata(path, metadata)
    // val content = "1"
    // val versionPath = Paths.get(s"$location/")
    // Files.write(versionPath, content.getBytes(StandardCharsets.UTF_8))
    path
  }

  private def writeMetadata(path: String, metadata: TableMetadata)(implicit conf: Configuration): Unit = {
    val io = new HadoopFileIO(conf)
    val outputFile = io.newOutputFile(path)
    TableMetadataParser.write(metadata, outputFile)
  }
  override protected def afterAll(): Unit = {
    spark.close()
    super.afterAll()
  }
}
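
For completeness, a minimal sketch of the sbt dependencies this spec relies on (Spark SQL, the Iceberg Spark runtime, ScalaTest and its JUnit runner). The artifact versions below are assumptions, not taken from the gist, and should be aligned with the Spark/Iceberg versions actually in use:

// build.sbt (sketch) -- versions are assumptions, not taken from the gist
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                 % "3.5.0",
  "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % "1.5.0",
  "org.scalatest"      %% "scalatest"                 % "3.2.18"   % Test,
  "org.scalatestplus"  %% "junit-4-13"                % "3.2.18.0" % Test,
  "junit"              %  "junit"                     % "4.13.2"   % Test
)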