Hudi record level meta field benchmark

Goal:

The idea is to analyze the storage cost of the meta fields that Hudi stores at the record level.

Setup:

Choose narrow to wide tables with a varying number of columns (10, 30, 100, 1000, etc.). Use auto-generated record keys (auto keygen) with the bulk_insert operation for Hudi. Generate vanilla Parquet data via Spark and a non-partitioned Hudi COW table for comparison. For Spark we reduce the output to a single partition so it is comparable to the non-partitioned Hudi table. We assume the input JSON data size is roughly the same for all three tables; here we take an input JSON file of roughly 350 MB.

Schema generation:

I used ChatGPT to generate a random JSON schema with the desired number of columns, using primitive data types and built-in formats. One such schema for a 10-column table is shown below; a sketch for generating wider schemas programmatically follows it. Built-in formats in JSON Schema allow for more realistic data, though the column names are admittedly boring.

/tmp/hudi-metafields-benchmark/ten-columns-schema.json

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "My Data Schema",
  "type": "object",
  "properties": {
    "column1": {
      "type": "integer"
    },
    "column2": {
      "type": "string"
    },
    "column3": {
      "type": "boolean"
    },
    "column4": {
      "type": "number"
    },
    "column5": {
      "type": "integer"
    },
    "column6": {
      "type": "string",
      "format": "date-time"
    },
    "column7": {
      "type": "string",
      "format": "email"
    },
    "column8": {
      "type": "string",
      "format": "uri"
    },
    "column9": {
      "type": "string",
      "maxLength": 50
    },
    "column10": {
      "type": "string",
      "pattern": "^[A-Za-z]+$"
    }
  },
  "required": [
    "column1",
    "column2",
    "column3",
    "column4",
    "column5",
    "column6",
    "column7",
    "column8",
    "column9",
    "column10"
  ]
}
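
The 30-, 100-, and 1000-column variants follow the same pattern. Rather than prompting ChatGPT for each width, a schema of any width can also be produced with a small script. The sketch below is a hypothetical helper (not part of the original setup) that cycles through the same primitive types and built-in formats used in the example above.

from jsf import JSF  # not needed here, but keeps the workflow in one place
import json

def make_schema(num_columns, out_path):
    """Write a draft-07 JSON Schema with num_columns primitive columns,
    mirroring the hand-written 10-column example above."""
    # Cycle through a few primitive types and built-in formats for variety.
    column_types = [
        {"type": "integer"},
        {"type": "string"},
        {"type": "boolean"},
        {"type": "number"},
        {"type": "string", "format": "date-time"},
        {"type": "string", "format": "email"},
        {"type": "string", "format": "uri"},
        {"type": "string", "maxLength": 50},
    ]
    properties = {
        "column{}".format(i + 1): column_types[i % len(column_types)]
        for i in range(num_columns)
    }
    schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "My Data Schema",
        "type": "object",
        "properties": properties,
        "required": list(properties.keys()),
    }
    with open(out_path, "w") as f:
        json.dump(schema, f, indent=2)

# Example: a 100-column schema for the wider table runs (hypothetical file name).
make_schema(100, "/tmp/hudi-metafields-benchmark/hundred-columns-schema.json")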

Sample Data Generation:

There are plenty of data generators available. I found JSON Schema Faker (the jsf Python package) simple to work with.

Install jsf using

pip install jsf

Generate sample data using the code snippet below.

from jsf import JSF
import json

file_prefix = 'ten-columns'  # Change this to match the schema file name everywhere.
faker = JSF.from_json('/tmp/hudi-metafields-benchmark/{}-schema.json'.format(file_prefix))

# Open in write mode so re-runs start from a clean file instead of appending.
f1 = open('/tmp/hudi-metafields-benchmark/{}-input-data.json'.format(file_prefix), "w")
f1.write("[")
# Write the first record without a leading comma.
f1.write(json.dumps(faker.generate()))
# Generate 1M more records based on the schema above (1,000,001 in total).
# You will want to change this count to produce a ~350 MB file.
for x in range(1000000):
  f1.write(",")
  f1.write(json.dumps(faker.generate()))

f1.write("]")
f1.close()
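
To confirm the generated file is close to the ~350 MB target before ingesting, a quick size check can look like this (a small sketch; it just reuses the same prefix as the generation snippet above):

import os

file_prefix = 'ten-columns'  # same prefix as in the generation snippet above
input_path = '/tmp/hudi-metafields-benchmark/{}-input-data.json'.format(file_prefix)
print('input size: {:.1f} MB'.format(os.path.getsize(input_path) / (1024 * 1024)))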

Ingesting as plain Parquet data using Spark

./spark-shell --driver-memory 5g


import org.apache.spark.sql.SparkSession

// spark-shell already provides `spark`; getOrCreate() simply returns the existing session.
val spark = SparkSession.builder.appName("Write JSON to Parquet").getOrCreate()
val file_prefix = "ten-columns"
val json_df = spark.read.json("/tmp/hudi-metafields-benchmark/%s-input-data.json".format(file_prefix))
json_df.count()
json_df.show()

// Repartition to 1 before writing so the output is a single file, comparable to the non-partitioned Hudi table.
json_df.repartition(1).write.parquet("/tmp/hudi-metafields-benchmark/spark-parquet/%s".format(file_prefix))

Ingesting as a non-partitioned Hudi COW table using bulk insert and auto-generated keys

NOTE: if you are testing the UPSERT operation, a pre-combine field is needed since the input data has no ts field. Choose a timestamp or numeric column for this (column6, a date-time string, is used below).

./spark-shell --driver-memory 5g \
--jars ~/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.14.0-SNAPSHOT.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
val file_prefix = "ten-columns"
val tableName = "hudi_%s".format(file_prefix)
val basePath = "file:///tmp/hudi-metafields-benchmark/hudi/%s".format(tableName)
val inputPath = "file:///tmp/hudi-metafields-benchmark/%s-input-data.json".format(file_prefix)
val json_df = spark.read.json(inputPath)
json_df.count()
json_df.show()
json_df.write.format("hudi").
  option(PRECOMBINE_FIELD_OPT_KEY, "column6").
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
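
To sanity-check that the five record-level meta fields were actually written, the Parquet data files under the Hudi base path can be inspected directly. The sketch below uses pyarrow (an assumption; any Parquet schema viewer works) against the basePath used above.

import glob
import pyarrow.parquet as pq

# Non-partitioned COW table: data files sit directly under the base path.
base_path = "/tmp/hudi-metafields-benchmark/hudi/hudi_ten-columns"

for path in sorted(glob.glob("{}/*.parquet".format(base_path))):
    schema = pq.read_schema(path)
    meta_cols = [name for name in schema.names if name.startswith("_hoodie_")]
    print(path.rsplit("/", 1)[-1], meta_cols)
# Expected meta fields: _hoodie_commit_time, _hoodie_commit_seqno,
# _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name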

Findings

Short summary of meta field storage size and compression ratio:

| | total fields: 10 (33% are meta fields), compressed size (incl. meta fields for Hudi) | total fields: 30 (16.7% are meta fields), compressed size (incl. meta fields for Hudi) | total fields: 100 (5% are meta fields), compressed size (incl. meta fields for Hudi) |
|---|---|---|---|
| vanilla Parquet (snappy) | 118.045 MB | 286.787 MB | 332.289 MB |
| Hudi COW bulk insert, default compression (GZIP) | 83.510 MB (avg 11x compression ratio for the record key meta field) | 216.141 MB (avg 11x) | 251.192 MB (avg 11x) |
| Hudi COW bulk insert, snappy | 127.781 MB (avg 5x compression ratio for the record key meta field; meta fields overhead: 9.094%) | 287.875 MB (avg 5x; overhead: 0.379%) | 332.34 MB (avg 5x; overhead: 0.0153%) |
| cloud storage cost of the overhead, assuming $0.023 per GB per month and a 100 TB table | 100 TB * 9.094% ≈ 9,312 GB; * $0.023 = $214.18/month | 100 TB * 0.379% ≈ 388 GB; * $0.023 = $8.92/month | 100 TB * 0.0153% ≈ 15.7 GB; * $0.023 = $0.36/month |
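
The per-field SZ numbers in the detailed tables below correspond to the column-chunk statistics stored in the Parquet footers (compressed and uncompressed size per row group). As a hedged sketch of one way to reproduce them, the pyarrow snippet below prints the same statistics for every data file of a table; base_path is an assumption and should point at the table you want to inspect (tools such as parquet-tools report equivalent SZ values).

import glob
import pyarrow.parquet as pq

# The five Hudi record-level meta fields measured in this benchmark.
META_FIELDS = {
    "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
    "_hoodie_partition_path", "_hoodie_file_name",
}

# Assumption: point this at the Hudi table (or the Spark Parquet output) to inspect.
base_path = "/tmp/hudi-metafields-benchmark/hudi/hudi_ten-columns"

for path in sorted(glob.glob("{}/*.parquet".format(base_path))):
    print(path.rsplit("/", 1)[-1])
    metadata = pq.ParquetFile(path).metadata
    for rg_idx in range(metadata.num_row_groups):
        row_group = metadata.row_group(rg_idx)
        print("ROW GROUP {} ({} records):".format(rg_idx + 1, row_group.num_rows))
        for col_idx in range(row_group.num_columns):
            col = row_group.column(col_idx)
            if col.path_in_schema in META_FIELDS:
                ratio = col.total_uncompressed_size / col.total_compressed_size
                # SZ:<compressed bytes>/<uncompressed bytes>/<compression ratio>
                print("{}: SZ:{}/{}/{:.2f}".format(
                    col.path_in_schema,
                    col.total_compressed_size,
                    col.total_uncompressed_size,
                    ratio))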

More details:

Each meta field line below reads Meta Field: SZ:compressed-size-in-bytes/uncompressed-size-in-bytes/compression-ratio. The plain Parquet table has no Hudi meta fields, so only the two Hudi variants are listed.

Number of columns: 10 (input JSON data size: 368 MB / 1000001 records)

| | Parquet | Hudi table (snappy compression set explicitly) | Hudi table (GZIP compression, default) |
|---|---|---|---|
| size uncompressed / compressed | 224.572 MB / 118.045 MB | 281.811 MB / 127.781 MB | 281.590 MB / 83.510 MB |
| row count | 1000001 | 1000001 | 1000001 |

Raw compression ratio for meta fields, Hudi (snappy):

ROW GROUP 1 (929561 records):
_hoodie_commit_time: SZ:1874/1778/0.95
_hoodie_commit_seqno: SZ:4663473/27777464/5.96
_hoodie_record_key: SZ:4663467/27777459/5.96
_hoodie_partition_path: SZ:1858/1762/0.95
_hoodie_file_name: SZ:2903/2756/0.95

ROW GROUP 2 (70440 records):
_hoodie_commit_time: SZ:197/187/0.95
_hoodie_commit_seqno: SZ:352761/2113350/5.99
_hoodie_record_key: SZ:352790/2113349/5.99
_hoodie_partition_path: SZ:181/171/0.94
_hoodie_file_name: SZ:329/314/0.95

Raw compression ratio for meta fields, Hudi (GZIP):

_hoodie_commit_time: SZ:2859/1921/0.67
_hoodie_commit_seqno: SZ:2600343/29890804/11.49
_hoodie_record_key: SZ:2599858/29890796/11.50
_hoodie_partition_path: SZ:2844/1904/0.67
_hoodie_file_name: SZ:4381/2972/0.68
Number of columns: 30 (input JSON data size: 350 MB / 100001 records)

| | Parquet | Hudi table (snappy compression set explicitly) | Hudi table (GZIP compression, default) |
|---|---|---|---|
| size uncompressed / compressed | 311.281 MB / 286.787 MB | 317.385 MB / 287.875 MB (per data file: 143.016 + 142.967 + 31.402 / 129.760 + 129.692 + 28.423) | 316.666 MB / 216.141 MB (per data file: 188.323 + 128.343 / 128.513 + 87.628) |

Raw compression ratio for meta fields, Hudi (snappy):

File 1, ROW GROUP 1 (39603 records):
_hoodie_commit_time: SZ:120/114/0.95
_hoodie_commit_seqno: SZ:198878/1227767/6.17
_hoodie_record_key: SZ:198775/1137451/5.72
_hoodie_partition_path: SZ:103/97/0.94
_hoodie_file_name: SZ:252/241/0.96

File 1, ROW GROUP 2 (5520 records):
_hoodie_commit_time: SZ:78/74/0.95
_hoodie_commit_seqno: SZ:27665/171155/6.19
_hoodie_record_key: SZ:27585/160115/5.80
_hoodie_partition_path: SZ:61/57/0.93
_hoodie_file_name: SZ:135/130/0.96

File 2, ROW GROUP 1 (39532 records):
_hoodie_commit_time: SZ:120/114/0.95
_hoodie_commit_seqno: SZ:198031/1225566/6.19
_hoodie_record_key: SZ:197191/1146501/5.81
_hoodie_partition_path: SZ:103/97/0.94
_hoodie_file_name: SZ:252/241/0.96

File 2, ROW GROUP 2 (5579 records):
_hoodie_commit_time: SZ:78/74/0.95
_hoodie_commit_seqno: SZ:27946/172984/6.19
_hoodie_record_key: SZ:27981/161826/5.78
_hoodie_partition_path: SZ:61/57/0.93
_hoodie_file_name: SZ:135/130/0.96

File 3, ROW GROUP 1 (39532 records):
_hoodie_commit_time: SZ:81/77/0.95
_hoodie_commit_seqno: SZ:49257/302814/6.15
_hoodie_record_key: SZ:49195/283281/5.76
_hoodie_partition_path: SZ:64/60/0.94
_hoodie_file_name: SZ:138/133/0.96

Raw compression ratio for meta fields, Hudi (GZIP):

File 1, ROW GROUP 1 (52995 records):
_hoodie_commit_time: SZ:224/151/0.67
_hoodie_commit_seqno: SZ:137724/1525860/11.08
_hoodie_record_key: SZ:137703/1525855/11.08
_hoodie_partition_path: SZ:207/134/0.65
_hoodie_file_name: SZ:377/278/0.74

File 1, ROW GROUP 2 (6529 records):
_hoodie_commit_time: SZ:112/74/0.66
_hoodie_commit_seqno: SZ:16934/189376/11.18
_hoodie_record_key: SZ:16936/189376/11.18
_hoodie_partition_path: SZ:95/57/0.60
_hoodie_file_name: SZ:157/130/0.83

File 2, ROW GROUP 1 (40477 records):
_hoodie_commit_time: SZ:222/148/0.67
_hoodie_commit_seqno: SZ:105401/1173942/11.14
_hoodie_record_key: SZ:105405/1173942/11.14
_hoodie_partition_path: SZ:205/131/0.64
_hoodie_file_name: SZ:323/241/0.75
Number of columns: 100 (input JSON data size: 398 MB / 30001 records)

| | Parquet | Hudi table (snappy compression set explicitly) | Hudi table (GZIP compression, default) |
|---|---|---|---|
| size uncompressed / compressed | 360.079 MB / 332.289 MB | 362.011 MB / 332.34 MB (per data file: 141.661 + 141.661 + 78.689 / 130.084 + 130.125 + 72.221) | 361.702 MB / 251.192 MB (per data file: 182.857 + 178.845 / 126.966 + 124.226) |

Raw compression ratio for meta fields, Hudi (snappy):

File 1, ROW GROUP 1 (9773 records):
_hoodie_commit_time: SZ:81/77/0.95
_hoodie_commit_seqno: SZ:49238/303000/6.15
_hoodie_record_key: SZ:49513/272571/5.51
_hoodie_partition_path: SZ:64/60/0.94
_hoodie_file_name: SZ:139/134/0.96

File 1, ROW GROUP 2 (2004 records):
_hoodie_commit_time: SZ:78/74/0.95
_hoodie_commit_seqno: SZ:9840/62159/6.32
_hoodie_record_key: SZ:10097/57924/5.74
_hoodie_partition_path: SZ:61/57/0.93
_hoodie_file_name: SZ:136/131/0.96

File 2, ROW GROUP 1 (9728 records):
_hoodie_commit_time: SZ:81/77/0.95
_hoodie_commit_seqno: SZ:48866/301605/6.17
_hoodie_record_key: SZ:48647/282149/5.80
_hoodie_partition_path: SZ:64/60/0.94
_hoodie_file_name: SZ:139/134/0.96

File 2, ROW GROUP 2 (1989 records):
_hoodie_commit_time: SZ:78/74/0.95
_hoodie_commit_seqno: SZ:10106/61694/6.10
_hoodie_record_key: SZ:10095/57716/5.72
_hoodie_partition_path: SZ:61/57/0.93
_hoodie_file_name: SZ:136/131/0.96

File 3, ROW GROUP 1 (6507 records):
_hoodie_commit_time: SZ:78/74/0.95
_hoodie_commit_seqno: SZ:32851/201752/6.14
_hoodie_record_key: SZ:32708/188738/5.77
_hoodie_partition_path: SZ:61/57/0.93
_hoodie_file_name: SZ:136/131/0.96

Raw compression ratio for meta fields, Hudi (GZIP):

File 1, ROW GROUP 1 (12083 records):
_hoodie_commit_time: SZ:114/76/0.67
_hoodie_commit_seqno: SZ:31336/339338/10.83
_hoodie_record_key: SZ:31336/339334/10.83
_hoodie_partition_path: SZ:97/59/0.61
_hoodie_file_name: SZ:159/132/0.83

File 1, ROW GROUP 2 (3111 records):
_hoodie_commit_time: SZ:112/74/0.66
_hoodie_commit_seqno: SZ:7839/90253/11.51
_hoodie_record_key: SZ:7839/90253/11.51
_hoodie_partition_path: SZ:95/57/0.60
_hoodie_file_name: SZ:157/130/0.83

File 2, ROW GROUP 1 (12019 records):
_hoodie_commit_time: SZ:115/77/0.67
_hoodie_commit_seqno: SZ:31032/348588/11.23
_hoodie_record_key: SZ:31033/348588/11.23
_hoodie_partition_path: SZ:98/60/0.61
_hoodie_file_name: SZ:159/133/0.84

File 2, ROW GROUP 2 (2788 records):
_hoodie_commit_time: SZ:112/74/0.66
_hoodie_commit_seqno: SZ:6984/80886/11.58
_hoodie_record_key: SZ:6983/80886/11.58
_hoodie_partition_path: SZ:95/57/0.60
_hoodie_file_name: SZ:156/130/0.83