Skip to content

Instantly share code, notes, and snippets.

@bwu2
Last active February 12, 2020 20:45
Show Gist options
  • Save bwu2/89f98e0926374f71c80e4b2fa5089f18 to your computer and use it in GitHub Desktop.
Hudi upsert hangs
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
config = {
"table_name": "example_table",
"target": "s3://example_bucket/example_table/",
"primary_key": "id",
"sort_key": "sk",
}
def get_json_data(start, count):
data = [{"id": i, "sk": i, "txt": chr(65 + (i % 26))} for i in range(start, start + count)]
return data
def create_json_df(spark, data):
sc = spark.sparkContext
return spark.read.json(sc.parallelize(data, 2))
def main(spark):
df1 = create_json_df(spark, get_json_data(0, 4000000))
print(f'{df1.count()} records in source 1')
(df1.write.format(HUDI_FORMAT)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
.option(BULK_INSERT_PARALLELISM, 1)
.mode("Overwrite")
.save(config['target']))
print(f'{spark.read.format(HUDI_FORMAT).load(config["target"]+"/*").count()} records in Hudi table')
df2 = create_json_df(spark, get_json_data(500000, 4000000))
print(f'{df2.count()} records in source 2')
(df2.write.format(HUDI_FORMAT)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(TABLE_NAME, config['table_name'])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 20)
.mode("Append")
.save(config['target']))
print(f'{spark.read.format(HUDI_FORMAT).load(config["target"]+"/*").count()} records in Hudi table')
if __name__ == "__main__":
main(spark)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment