The goal is to analyze the storage cost of the meta fields that Hudi stores at the record level.
Choose narrow-to-wide tables with a varying number of columns: 10, 30, 100, 1000, etc. Use auto record key generation with the bulk_insert operation for Hudi. For comparison, generate vanilla Parquet data via Spark alongside a non-partitioned Hudi COW table. On the Spark side, repartition to 1 so the output is comparable to the non-partitioned Hudi table. We assume the input JSON data size is roughly the same for all tables; here we take a ~350 MB input JSON file.
I used ChatGPT to generate random JSON schemas with the desired number of columns, using primitive data types and built-in formats. One such schema for a 10-column table is shown below. Built-in formats in JSON Schema allow for more realistic data, though the column names are admittedly boring.
/tmp/hudi-metafields-benchmark/ten-columns-schema.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "My Data Schema",
"type": "object",
"properties": {
"column1": {
"type": "integer"
},
"column2": {
"type": "string"
},
"column3": {
"type": "boolean"
},
"column4": {
"type": "number"
},
"column5": {
"type": "integer"
},
"column6": {
"type": "string",
"format": "date-time"
},
"column7": {
"type": "string",
"format": "email"
},
"column8": {
"type": "string",
"format": "uri"
},
"column9": {
"type": "string",
"maxLength": 50
},
"column10": {
"type": "string",
"pattern": "^[A-Za-z]+$"
}
},
"required": [
"column1",
"column2",
"column3",
"column4",
"column5",
"column6",
"column7",
"column8",
"column9",
"column10"
]
}
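For the wider tables, rather than prompting for each schema separately, the same shape of schema can be emitted for any column count with a small script. Below is a sketch; the particular cycle of types and formats is my own choice for illustration, not necessarily the exact mix used in the benchmark schemas.

```python
import json
import os

def make_schema(n_columns):
    """Build an N-column draft-07 schema shaped like the 10-column one above,
    cycling through a few primitive types and built-in string formats."""
    type_cycle = [
        {"type": "integer"},
        {"type": "string"},
        {"type": "boolean"},
        {"type": "number"},
        {"type": "string", "format": "date-time"},
        {"type": "string", "format": "email"},
    ]
    props = {
        "column{}".format(i + 1): dict(type_cycle[i % len(type_cycle)])
        for i in range(n_columns)
    }
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "My Data Schema",
        "type": "object",
        "properties": props,
        # Keep column1..columnN in numeric order, as in the schema above.
        "required": sorted(props, key=lambda c: int(c[6:])),
    }

# e.g. write the 30-column variant next to the 10-column one
os.makedirs('/tmp/hudi-metafields-benchmark', exist_ok=True)
with open('/tmp/hudi-metafields-benchmark/thirty-columns-schema.json', 'w') as f:
    json.dump(make_schema(30), f, indent=2)
```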
There are plenty of data generators available. I found JSON Schema Faker (jsf) simple to work with in Python.
Install jsf using
pip install jsf
Generate sample data using the code snippet below.
from jsf import JSF
import json

file_prefix = 'ten-columns'  # Change this to match the schema/output file names everywhere.
faker = JSF.from_json('/tmp/hudi-metafields-benchmark/{}-schema.json'.format(file_prefix))

# Write all records as a single JSON array.
f1 = open('/tmp/hudi-metafields-benchmark/{}-input-data.json'.format(file_prefix), "w")
f1.write("[")
fake_json = faker.generate()
f1.write(json.dumps(fake_json))
# Generate 1M more records based on the schema above (1,000,001 in total).
# You will want to change this count to produce a ~350 MB file.
for x in range(1000000):
    fake_json = faker.generate()
    f1.write(",")
    f1.write(json.dumps(fake_json))
f1.write("]")
f1.close()
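Before loading the file into Spark, it is worth a quick sanity check on record count and size. A small sketch; the path is the one written by the script above.

```python
import json
import os

def describe_json_array(path):
    """Return (record count, file size in MB) for a file containing one JSON array."""
    size_mb = os.path.getsize(path) / (1024 * 1024)
    with open(path) as f:
        records = json.load(f)
    return len(records), size_mb

# e.g.:
# count, mb = describe_json_array('/tmp/hudi-metafields-benchmark/ten-columns-input-data.json')
# count should be 1000001 and mb should be roughly 350
```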
Write the vanilla Parquet table with plain Spark:
./spark-shell --driver-memory 5g
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Write JSON to Parquet").getOrCreate()
val file_prefix = "ten-columns"
val json_df = spark.read.json("/tmp/hudi-metafields-benchmark/%s-input-data.json".format(file_prefix))
json_df.count()
json_df.show()
// repartition to 1 before writing
json_df.repartition(1).write.parquet("/tmp/hudi-metafields-benchmark/spark-parquet/%s".format(file_prefix))
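To compare the outputs on disk, a helper that totals every file under an output directory is handy (`du -sh` works just as well). A sketch, using the paths from the commands above:

```python
import os

def dir_size_mb(path):
    """Total size in MB of all files under path (e.g. a Spark/Hudi output directory)."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total / (1024 * 1024)

# e.g.:
# dir_size_mb('/tmp/hudi-metafields-benchmark/spark-parquet/ten-columns')
```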
Next, write the Hudi table. NOTE: if you are testing the UPSERT operation, a pre-combine field is required. Since the input data has no ts field, choose a column with a timestamp or numeric value for it (column6 below).
./spark-shell --driver-memory 5g \
--jars ~/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.14.0-SNAPSHOT.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
val file_prefix = "ten-columns"
val tableName = "hudi_%s".format(file_prefix)
val basePath = "file:///tmp/hudi-metafields-benchmark/hudi/%s".format(tableName)
val inputPath = "file:///tmp/hudi-metafields-benchmark/%s-input-data.json".format(file_prefix)
val json_df = spark.read.json("/tmp/hudi-metafields-benchmark/%s-input-data.json".format(file_prefix))
json_df.count()
json_df.show()
json_df.write.format("hudi").
  option(PRECOMBINE_FIELD_OPT_KEY, "column6").
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
Short summary of meta field storage size and compression ratio. Compressed sizes include the meta fields in the Hudi cases.

 | total fields: 10 (33% are meta fields) | total fields: 30 (16.7% are meta fields) | total fields: 100 (5% are meta fields) |
---|---|---|---|
vanilla parquet (snappy) | 118.045 MB | 286.787 MB | 332.289 MB |
Hudi COW bulk insert, default compression (GZIP) | 83.510 MB (avg 11x compression ratio for the record key meta field) | 216.141 MB (avg 11x compression ratio for the record key meta field) | 251.192 MB (avg 11x compression ratio for the record key meta field) |
Hudi COW bulk insert, snappy | 127.781 MB (avg 5x compression ratio for the record key meta field; meta fields overhead: 9.094%) | 287.875 MB (avg 5x compression ratio for the record key meta field; meta fields overhead: 0.379%) | 332.34 MB (avg 5x compression ratio for the record key meta field; meta fields overhead: 0.0153%) |
Monthly cloud storage cost of the overhead, assuming $0.023 per GB-month and a 100 TB table | (100 TB * 9.094%) * $0.023 per GB = $214.18 | (100 TB * 0.379%) * $0.023 per GB = $8.92 | (100 TB * 0.0153%) * $0.023 per GB = $0.36 |
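The cost column is straightforward arithmetic; a sketch that reproduces it (using 1 TB = 1024 GB, which matches the dollar figures above to within a cent of rounding):

```python
def monthly_meta_cost_usd(table_tb, overhead_pct, usd_per_gb_month=0.023):
    """Monthly storage cost of the meta field overhead alone."""
    overhead_gb = table_tb * 1024 * (overhead_pct / 100.0)
    return overhead_gb * usd_per_gb_month

# Overheads measured above for the 10-, 30-, and 100-column snappy tables.
for pct in (9.094, 0.379, 0.0153):
    print("overhead {}% -> ${:.2f}/month".format(pct, monthly_meta_cost_usd(100, pct)))
```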
Detailed results follow. Raw meta field stats are listed per row group in the format `SZ:compressed-size-in-bytes/uncompressed-size-in-bytes/compression-ratio`; row group numbering restarts at each output file, and per-file size breakdowns are shown in parentheses where the write produced multiple files.

Number of columns: 10 (input JSON data size: 368 M / 1000001 records)

 | Parquet | Hudi table (snappy compression set explicitly) | Hudi table (GZIP compressed, default) |
---|---|---|---|
size uncompressed / compressed | 224.572 MB / 118.045 MB | 281.811 MB / 127.781 MB | 281.590 MB / 83.510 MB |
row count | 1000001 | 1000001 | 1000001 |

Raw compression stats for meta fields, Hudi (snappy):

- ROW GROUP 1 (929561 records): _hoodie_commit_time SZ:1874/1778/0.95, _hoodie_commit_seqno SZ:4663473/27777464/5.96, _hoodie_record_key SZ:4663467/27777459/5.96, _hoodie_partition_path SZ:1858/1762/0.95, _hoodie_file_name SZ:2903/2756/0.95
- ROW GROUP 2 (70440 records): _hoodie_commit_time SZ:197/187/0.95, _hoodie_commit_seqno SZ:352761/2113350/5.99, _hoodie_record_key SZ:352790/2113349/5.99, _hoodie_partition_path SZ:181/171/0.94, _hoodie_file_name SZ:329/314/0.95

Raw compression stats for meta fields, Hudi (GZIP):

- _hoodie_commit_time SZ:2859/1921/0.67, _hoodie_commit_seqno SZ:2600343/29890804/11.49, _hoodie_record_key SZ:2599858/29890796/11.50, _hoodie_partition_path SZ:2844/1904/0.67, _hoodie_file_name SZ:4381/2972/0.68

Number of columns: 30 (input JSON data size: 350 M / 100001 records)

 | Parquet | Hudi table (snappy compression set explicitly) | Hudi table (GZIP compressed, default) |
---|---|---|---|
size uncompressed / compressed | 311.281 MB / 286.787 MB | 317.385 MB / 287.875 MB (per file: 143.016 + 142.967 + 31.402 / 129.760 + 129.692 + 28.423) | 316.666 MB / 216.141 MB (per file: 188.323 + 128.343 / 128.513 + 87.628) |

Raw compression stats for meta fields, Hudi (snappy):

- ROW GROUP 1 (39603 records): _hoodie_commit_time SZ:120/114/0.95, _hoodie_commit_seqno SZ:198878/1227767/6.17, _hoodie_record_key SZ:198775/1137451/5.72, _hoodie_partition_path SZ:103/97/0.94, _hoodie_file_name SZ:252/241/0.96
- ROW GROUP 2 (5520 records): _hoodie_commit_time SZ:78/74/0.95, _hoodie_commit_seqno SZ:27665/171155/6.19, _hoodie_record_key SZ:27585/160115/5.80, _hoodie_partition_path SZ:61/57/0.93, _hoodie_file_name SZ:135/130/0.96
- ROW GROUP 1 (39532 records): _hoodie_commit_time SZ:120/114/0.95, _hoodie_commit_seqno SZ:198031/1225566/6.19, _hoodie_record_key SZ:197191/1146501/5.81, _hoodie_partition_path SZ:103/97/0.94, _hoodie_file_name SZ:252/241/0.96
- ROW GROUP 2 (5579 records): _hoodie_commit_time SZ:78/74/0.95, _hoodie_commit_seqno SZ:27946/172984/6.19, _hoodie_record_key SZ:27981/161826/5.78, _hoodie_partition_path SZ:61/57/0.93, _hoodie_file_name SZ:135/130/0.96
- ROW GROUP 1 (39532 records): _hoodie_commit_time SZ:81/77/0.95, _hoodie_commit_seqno SZ:49257/302814/6.15, _hoodie_record_key SZ:49195/283281/5.76, _hoodie_partition_path SZ:64/60/0.94, _hoodie_file_name SZ:138/133/0.96

Raw compression stats for meta fields, Hudi (GZIP):

- ROW GROUP 1 (52995 records): _hoodie_commit_time SZ:224/151/0.67, _hoodie_commit_seqno SZ:137724/1525860/11.08, _hoodie_record_key SZ:137703/1525855/11.08, _hoodie_partition_path SZ:207/134/0.65, _hoodie_file_name SZ:377/278/0.74
- ROW GROUP 2 (6529 records): _hoodie_commit_time SZ:112/74/0.66, _hoodie_commit_seqno SZ:16934/189376/11.18, _hoodie_record_key SZ:16936/189376/11.18, _hoodie_partition_path SZ:95/57/0.60, _hoodie_file_name SZ:157/130/0.83
- ROW GROUP 1 (40477 records): _hoodie_commit_time SZ:222/148/0.67, _hoodie_commit_seqno SZ:105401/1173942/11.14, _hoodie_record_key SZ:105405/1173942/11.14, _hoodie_partition_path SZ:205/131/0.64, _hoodie_file_name SZ:323/241/0.75

Number of columns: 100 (input JSON data size: 398 M / 30001 records)

 | Parquet | Hudi table (snappy compression set explicitly) | Hudi table (GZIP compressed, default) |
---|---|---|---|
size uncompressed / compressed | 360.079 MB / 332.289 MB | 362.011 MB / 332.34 MB (per file: 141.661 + 141.661 + 78.689 / 130.084 + 130.125 + 72.221) | 361.702 MB / 251.192 MB (per file: 182.857 + 178.845 / 126.966 + 124.226) |

Raw compression stats for meta fields, Hudi (snappy):

- ROW GROUP 1 (9773 records): _hoodie_commit_time SZ:81/77/0.95, _hoodie_commit_seqno SZ:49238/303000/6.15, _hoodie_record_key SZ:49513/272571/5.51, _hoodie_partition_path SZ:64/60/0.94, _hoodie_file_name SZ:139/134/0.96
- ROW GROUP 2 (2004 records): _hoodie_commit_time SZ:78/74/0.95, _hoodie_commit_seqno SZ:9840/62159/6.32, _hoodie_record_key SZ:10097/57924/5.74, _hoodie_partition_path SZ:61/57/0.93, _hoodie_file_name SZ:136/131/0.96
- ROW GROUP 1 (9728 records): _hoodie_commit_time SZ:81/77/0.95, _hoodie_commit_seqno SZ:48866/301605/6.17, _hoodie_record_key SZ:48647/282149/5.80, _hoodie_partition_path SZ:64/60/0.94, _hoodie_file_name SZ:139/134/0.96
- ROW GROUP 2 (1989 records): _hoodie_commit_time SZ:78/74/0.95, _hoodie_commit_seqno SZ:10106/61694/6.10, _hoodie_record_key SZ:10095/57716/5.72, _hoodie_partition_path SZ:61/57/0.93, _hoodie_file_name SZ:136/131/0.96
- ROW GROUP 1 (6507 records): _hoodie_commit_time SZ:78/74/0.95, _hoodie_commit_seqno SZ:32851/201752/6.14, _hoodie_record_key SZ:32708/188738/5.77, _hoodie_partition_path SZ:61/57/0.93, _hoodie_file_name SZ:136/131/0.96

Raw compression stats for meta fields, Hudi (GZIP):

- ROW GROUP 1 (12083 records): _hoodie_commit_time SZ:114/76/0.67, _hoodie_commit_seqno SZ:31336/339338/10.83, _hoodie_record_key SZ:31336/339334/10.83, _hoodie_partition_path SZ:97/59/0.61, _hoodie_file_name SZ:159/132/0.83
- ROW GROUP 2 (3111 records): _hoodie_commit_time SZ:112/74/0.66, _hoodie_commit_seqno SZ:7839/90253/11.51, _hoodie_record_key SZ:7839/90253/11.51, _hoodie_partition_path SZ:95/57/0.60, _hoodie_file_name SZ:157/130/0.83
- ROW GROUP 1 (12019 records): _hoodie_commit_time SZ:115/77/0.67, _hoodie_commit_seqno SZ:31032/348588/11.23, _hoodie_record_key SZ:31033/348588/11.23, _hoodie_partition_path SZ:98/60/0.61, _hoodie_file_name SZ:159/133/0.84
- ROW GROUP 2 (2788 records): _hoodie_commit_time SZ:112/74/0.66, _hoodie_commit_seqno SZ:6984/80886/11.58, _hoodie_record_key SZ:6983/80886/11.58, _hoodie_partition_path SZ:95/57/0.60, _hoodie_file_name SZ:156/130/0.83
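The `SZ:` triples above can be parsed mechanically, e.g. to verify a reported ratio or to aggregate meta field overhead across row groups. A sketch, assuming the format is compressed/uncompressed/ratio as labeled in the stats:

```python
def parse_sz(sz):
    """Parse a 'SZ:<compressed>/<uncompressed>/<ratio>' triple from the stats above."""
    compressed, uncompressed, ratio = sz.split(":", 1)[1].split("/")
    return int(compressed), int(uncompressed), float(ratio)

def compression_ratio(compressed, uncompressed):
    return uncompressed / compressed

# e.g. the _hoodie_record_key chunk from the 10-column snappy table
c, u, r = parse_sz("SZ:4663473/27777459/5.96")
print(round(compression_ratio(c, u), 2))  # matches the reported 5.96
```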