@alexeykudinkin
Created June 9, 2022 20:36
Read Hudi's Column Stats Index
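import org.apache.hudi.metadata.{HoodieMetadataPayload, MetadataPartitionType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

////////////////////////////////////////////////////////////////
// Load Metadata Table (for validation)
////////////////////////////////////////////////////////////////
// Assumes a spark-shell session (`spark` in scope) and `outputPath` pointing at the
// base path of the Hudi table whose Column Stats Index should be read
val metadataTablePath = s"$outputPath/.hoodie/metadata"

// Column Stats Index fields to project
val targetColStatsIndexColumns = Seq(
  HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
  HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
  HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
  HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)

// The stats live in a nested record, so each field is qualified with that record's name
val requiredMetadataIndexColumns =
  (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
    s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")

// Read the Metadata Table's Column Stats Index partition into Spark's [[DataFrame]]
val metadataTableDF: DataFrame =
  spark.read.format("hudi")
    .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")

// Keep only records carrying a Column Stats payload and flatten the required fields
val colStatsDF: DataFrame =
  metadataTableDF
    .where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
    .select(requiredMetadataIndexColumns.map(col): _*)

colStatsDF.createOrReplaceTempView("col_stats")

// Querying Column Stats Index: rows whose [minValue, maxValue] range for `customer_id`
// could contain '10000004'
spark.sql(
  """SELECT * FROM col_stats
    |WHERE columnName = 'customer_id'
    |  AND minValue <= '10000004' AND '10000004' <= maxValue""".stripMargin).show()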
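After the `select`, Spark names each projected nested field by its leaf name (for example, `ColumnStatsMetadata.minValue` becomes a column named `minValue`), which is why the SQL query can refer to `columnName`, `minValue` and `maxValue` directly. The sketch below reproduces how `requiredMetadataIndexColumns` is built, with the `HoodieMetadataPayload` constants replaced by hard-coded strings so it runs without Hudi. Those string values (notably `ColumnStatsMetadata`, `fileName` and `nullCount`) are assumptions about what the constants hold and should be checked against the Hudi version in use; `ColStatsColumns` is a hypothetical name.

```scala
object ColStatsColumns {
  // ASSUMED values of HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS and the
  // COLUMN_STATS_FIELD_* constants; hard-coded only so this sketch is self-contained
  val schemaFieldIdColumnStats: String = "ColumnStatsMetadata"
  val targetFields: Seq[String] = Seq("fileName", "minValue", "maxValue", "nullCount")

  // Same construction as requiredMetadataIndexColumns: append the column-name field,
  // then qualify every field with the name of the nested column stats record
  val requiredPaths: Seq[String] =
    (targetFields :+ "columnName").map(f => s"$schemaFieldIdColumnStats.$f")

  def main(args: Array[String]): Unit =
    requiredPaths.foreach(println)
}
```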
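The query keeps the index rows for `customer_id` whose min/max range could contain `'10000004'`; a file that is not returned cannot hold that value, so a reader can skip it. The same range predicate can be sketched in plain Scala without Spark or Hudi. `ColStatsRow`, `ColStatsPruning` and `candidateFiles` are hypothetical names introduced for illustration, and the rows are made-up sample data rather than output from a real table.

```scala
// Hypothetical in-memory stand-in for one row of the col_stats view:
// the file, the column it describes, and that column's min/max/null count in the file
final case class ColStatsRow(
    fileName: String,
    columnName: String,
    minValue: String,
    maxValue: String,
    nullCount: Long)

object ColStatsPruning {
  // Files whose [minValue, maxValue] range for `column` could contain `value`.
  // Values are compared as Strings, mirroring the string literals in the SQL query.
  def candidateFiles(rows: Seq[ColStatsRow], column: String, value: String): Seq[String] =
    rows
      .filter(r =>
        r.columnName == column &&
          r.minValue.compareTo(value) <= 0 &&
          value.compareTo(r.maxValue) <= 0)
      .map(_.fileName)
      .distinct

  def main(args: Array[String]): Unit = {
    // Made-up sample rows for illustration only
    val rows = Seq(
      ColStatsRow("file-1.parquet", "customer_id", "10000000", "10000003", 0L),
      ColStatsRow("file-2.parquet", "customer_id", "10000004", "10000009", 0L),
      ColStatsRow("file-3.parquet", "customer_id", "10000010", "10000019", 0L),
      ColStatsRow("file-1.parquet", "order_id", "1", "9", 0L))
    println(candidateFiles(rows, "customer_id", "10000004"))
  }
}
```

Because the values are compared as strings, the range check is lexicographic: `"9"` sorts after `"10000004"`, so it only agrees with numeric ordering when all compared values have the same number of digits.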