Simple Hudi slow example (gist by @bwu2, created February 14, 2020): a bulk
insert and a partial upsert of the same data complete quickly, but a second
upsert of the full 4,000,000-record dataset runs very slowly.

from pyspark.sql import SparkSession

HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
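
# The constants above are Hudi's datasource writer option keys. They can also
# be collected into a dict and passed in one call with .options(**...) instead
# of chained .option() calls (illustrative sketch only; `hudi_options` is not
# part of the original gist):
#
#   hudi_options = {
#       TABLE_NAME: "example_table",
#       RECORDKEY_FIELD_OPT_KEY: "id",
#       PRECOMBINE_FIELD_OPT_KEY: "id",
#       OPERATION_OPT_KEY: UPSERT_OPERATION_OPT_VAL,
#   }
#   df.write.format(HUDI_FORMAT).options(**hudi_options).mode("Append").save(path)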
config = {
    "table_name": "example_table",
    "target": "s3://example-bucket/example_table/",
    "primary_key": "id",
    "sort_key": "id",
}

# Spark properties used for this run (spark-defaults style):
#   spark.master              yarn
#   spark.submit.deployMode   client
#   spark.executor.memory     9486M
#   spark.driver.memory       2048M
#   spark.executor.cores      4
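# The same properties can be set when building the session instead of via
# spark-defaults (a sketch, not from the original gist; deploy mode is fixed
# at spark-submit time and is omitted):
#
#   spark = (SparkSession.builder
#            .master("yarn")
#            .config("spark.executor.memory", "9486M")
#            .config("spark.driver.memory", "2048M")
#            .config("spark.executor.cores", "4")
#            .getOrCreate())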

def get_json_data(start, count):
    # Generate `count` records, each with a single integer `id` field.
    data = [{"id": i} for i in range(start, start + count)]
    return data


def create_json_df(spark, data):
    # Distribute the dicts across 2 partitions and let the JSON reader infer
    # the schema.
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data, 2))
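
# Note: spark.read.json stringifies non-string RDD elements before parsing,
# which appears to work here because the JSON reader's allowSingleQuotes
# option defaults to true. A more direct equivalent (a sketch, not part of
# the original repro; `create_df` is a hypothetical alternative):
#
#   def create_df(spark, data):
#       # createDataFrame infers the schema from the dicts directly,
#       # skipping the JSON round-trip.
#       return spark.createDataFrame(data).repartition(2)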

def main(spark):
    df1 = create_json_df(spark, get_json_data(0, 4000000))
    print(f'{df1.count()} records in source 1')

    # Write 1: bulk insert of all 4M records -- runs quickly
    (df1.write.format(HUDI_FORMAT)
        .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
        .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
        .option(TABLE_NAME, config['table_name'])
        .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
        .option(BULK_INSERT_PARALLELISM, 1)
        .mode("Overwrite")
        .save(config['target']))
    print(f'{spark.read.format(HUDI_FORMAT).load(config["target"]+"/*").count()} records in Hudi table')

    # Write 2: upsert 3M of the 4M existing records -- also runs quickly
    (df1.limit(3000000).write.format(HUDI_FORMAT)
        .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
        .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
        .option(TABLE_NAME, config['table_name'])
        .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
        .option(UPSERT_PARALLELISM, 20)
        .mode("Append")
        .save(config['target']))
    print(f'{spark.read.format(HUDI_FORMAT).load(config["target"]+"/*").count()} records in Hudi table')

    # Write 3: upsert all 4M existing records again -- runs very slowly
    (df1.write.format(HUDI_FORMAT)
        .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
        .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
        .option(TABLE_NAME, config['table_name'])
        .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
        .option(UPSERT_PARALLELISM, 20)
        .mode("Append")
        .save(config['target']))
    print(f'{spark.read.format(HUDI_FORMAT).load(config["target"]+"/*").count()} records in Hudi table')
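

# The gist does not say why write 3 is so much slower than the similar
# write 2, so a per-write timing harness helps narrow it down (a sketch, not
# part of the original gist; `timed` is a hypothetical helper):
def timed(label, write_fn):
    import time
    t0 = time.time()
    write_fn()
    print(f'{label} took {time.time() - t0:.1f}s')

# Example usage (commented out) -- wrap any of the three write blocks above:
#   timed("full upsert", lambda: (df1.write.format(HUDI_FORMAT)
#                                     ...  # same options as write 3
#                                     .save(config['target'])))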

if __name__ == "__main__":
    # The original gist assumed an ambient `spark` session (e.g. a pyspark
    # shell or EMR notebook); build one explicitly so the script also runs
    # standalone. The app name is illustrative.
    spark = SparkSession.builder.appName("hudi-slow-example").getOrCreate()
    main(spark)