@rnbtechnology
Last active November 8, 2021 08:39
import copy
# write to a path using the Hudi format
def hudi_write(df, schema, table, path, mode, hudi_options):
    # base Hudi write configuration; options passed in by the caller override these defaults
    base_options = {
        # placeholder field names: replace with the source table's key, precombine, and partition columns
        "hoodie.datasource.write.recordkey.field": "recordkey",
        "hoodie.datasource.write.precombine.field": "precombine_field",
        "hoodie.datasource.write.partitionpath.field": "partitionpath_field",
        "hoodie.datasource.write.operation": "upsert",  # overridden to "delete" for delete records
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",  # or "MERGE_ON_READ"
        "hoodie.table.name": table,
        "hoodie.datasource.write.table.name": table,
        "hoodie.bloom.index.update.partition.path": True,
        "hoodie.index.type": "GLOBAL_BLOOM",
        "hoodie.consistency.check.enabled": True,
        # Set Glue Data Catalog (Hive sync) related Hudi configs
        "hoodie.datasource.hive_sync.enable": True,
        "hoodie.datasource.hive_sync.use_jdbc": False,
        "hoodie.datasource.hive_sync.database": schema,
        "hoodie.datasource.hive_sync.table": table,
    }
    # caller-supplied options take precedence over the defaults
    hudi_options = {**base_options, **(hudi_options or {})}
    # partitioned table: use the complex key generator and multi-part partition extractor
    if (
        hudi_options.get("hoodie.datasource.write.partitionpath.field")
        and hudi_options.get("hoodie.datasource.write.partitionpath.field") != ""
    ):
        hudi_options.setdefault(
            "hoodie.datasource.write.keygenerator.class",
            "org.apache.hudi.keygen.ComplexKeyGenerator",
        )
        hudi_options.setdefault(
            "hoodie.datasource.hive_sync.partition_extractor_class",
            "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        )
        hudi_options.setdefault(
            "hoodie.datasource.hive_sync.partition_fields",
            hudi_options.get("hoodie.datasource.write.partitionpath.field"),
        )
        hudi_options.setdefault("hoodie.datasource.write.hive_style_partitioning", True)
    # non-partitioned table: use the non-partitioned key generator and extractor
    else:
        hudi_options[
            "hoodie.datasource.write.keygenerator.class"
        ] = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
        hudi_options.setdefault(
            "hoodie.datasource.hive_sync.partition_extractor_class",
            "org.apache.hudi.hive.NonPartitionedExtractor",
        )
    df.write.format("hudi").options(**hudi_options).mode(mode).save(path)
# parse the OGG records and write upserts/deletes to S3 by calling the hudi_write function
def write_to_s3(df, path):
    # select the pertinent fields from the df
    flattened_df = df.select(
        "value.*", "key", "partition", "offset", "timestamp", "timestampType"
    )
    # filter for only the inserts and updates, taking the after-image of each row
    df_w_upserts = flattened_df.filter('op_type in ("I", "U")').select(
        "after.*",
        "key",
        "partition",
        "offset",
        "timestamp",
        "timestampType",
        "op_type",
        "op_ts",
        "current_ts",
        "pos",
    )
    # filter for only the deletes, taking the before-image of each row
    df_w_deletes = flattened_df.filter('op_type in ("D")').select(
        "before.*",
        "key",
        "partition",
        "offset",
        "timestamp",
        "timestampType",
        "op_type",
        "op_ts",
        "current_ts",
        "pos",
    )
    # invoke hudi_write function for upserts
    if df_w_upserts and df_w_upserts.count() > 0:
        hudi_write(
            df=df_w_upserts,
            schema=SCHEMA,
            table=TABLE,
            path=path,
            mode="append",
            hudi_options=hudi_options,
        )
    # invoke hudi_write function for deletes, switching the write operation to "delete"
    if df_w_deletes and df_w_deletes.count() > 0:
        hudi_options_copy = copy.deepcopy(hudi_options)
        hudi_options_copy["hoodie.datasource.write.operation"] = "delete"
        hudi_options_copy["hoodie.bloom.index.update.partition.path"] = False
        hudi_write(
            df=df_w_deletes,
            schema=SCHEMA,
            table=TABLE,
            path=path,
            mode="append",
            hudi_options=hudi_options_copy,
        )
TABLE = "table_name"
SCHEMA = "schema_name"
CHECKPOINT_LOCATION = "s3://bucket/checkpoint_path/"
TARGET_PATH = "s3://bucket/target_path/"
# per-call Hudi option overrides used by write_to_s3; the write defaults live inside hudi_write
hudi_options = {}
STREAMING = True
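# deserialized_df is referenced below but is not defined in the original gist.
# A minimal sketch of how it might be produced is shown here, assuming the OGG
# change records arrive as JSON on a Kafka topic; the broker address, topic name,
# and row columns are illustrative assumptions, not part of the original pipeline.
# (Running this also assumes the spark-sql-kafka package and the Hudi Spark bundle
# are on the classpath.)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = (
    SparkSession.builder.appName("ogg-kafka-to-hudi")
    # Hudi writes expect spark.serializer to be KryoSerializer
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
# hypothetical source-table columns; they match the placeholder field names in hudi_write
row_schema = StructType(
    [
        StructField("recordkey", StringType()),
        StructField("precombine_field", StringType()),
        StructField("partitionpath_field", StringType()),
    ]
)
# shape of an OGG JSON record: operation metadata plus before/after images of the row
ogg_schema = StructType(
    [
        StructField("op_type", StringType()),
        StructField("op_ts", StringType()),
        StructField("current_ts", StringType()),
        StructField("pos", StringType()),
        StructField("before", row_schema),
        StructField("after", row_schema),
    ]
)
# read the raw OGG records from Kafka as a streaming DataFrame
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  # assumed broker address
    .option("subscribe", "ogg_topic")  # assumed topic name
    .option("startingOffsets", "earliest")
    .load()
)
# parse the OGG JSON payload into a struct column named "value", keeping the Kafka
# metadata columns that write_to_s3 selects alongside it
deserialized_df = kafka_df.select(
    from_json(col("value").cast("string"), ogg_schema).alias("value"),
    col("key").cast("string").alias("key"),
    "partition",
    "offset",
    "timestamp",
    "timestampType",
)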
# instantiate writeStream object
query = deserialized_df.writeStream
# add attribute to writeStream object for batch writes
if not STREAMING:
    query = query.trigger(once=True)
# write to a path using the Hudi format
write_to_s3_hudi = query.foreachBatch(
    lambda batch_df, batch_id: write_to_s3(df=batch_df, path=TARGET_PATH)
).start(checkpointLocation=CHECKPOINT_LOCATION)
# await termination of the write operation
write_to_s3_hudi.awaitTermination()
@absognety

Thanks for sharing this; it helps a lot!
