Skip to content

Instantly share code, notes, and snippets.

View garystafford's full-sized avatar
💭
Happily Coding!

Gary A. Stafford garystafford

💭
Happily Coding!
View GitHub Profile
-- Modify the buyer, quantity sold, price paid, and commission of sale 200.
-- Every assignment in the SET list must be comma-separated.
UPDATE ecomm.sale s
SET s.buyerid = 11694,
    s.qtysold = 3,
    s.pricepaid = 600.00,
    s.commission = 90.00
WHERE s.salesid = 200;
# Submit the HoodieDeltaStreamer job for the table named in TARGET_TABLE.
# TARGET_TABLE is used both as the Spark application name and as the base
# name of the table-specific properties file in the current directory.
# NOTE(review): the argument list continues after the --props line; the
# trailing line continuation is kept as-is.
DATA_LAKE_BUCKET="<your_data_lake_s3_bucket>"
TARGET_TABLE="tickit.ecomm.sale"
spark-submit \
--name "${TARGET_TABLE}" \
--jars /usr/lib/spark/jars/spark-avro.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle.jar` \
--props "file://${PWD}/${TARGET_TABLE}.properties" \
# Table-specific DeltaStreamer settings for tickit.ecomm.sale.
# Additional settings are pulled in from base.properties via the include below.
include=base.properties
hoodie.datasource.hive_sync.partition_fields=__table
hoodie.datasource.hive_sync.table=sale
hoodie.datasource.write.partitionpath.field=__table
hoodie.datasource.write.recordkey.field=salesid
# Replace <your_registry_host> and <port>; the URL must carry exactly one scheme prefix.
hoodie.deltastreamer.schemaprovider.registry.url=http://<your_registry_host>:<port>/apis/ccompat/v6/subjects/tickit.ecomm.sale-value/versions/latest
hoodie.deltastreamer.source.dfs.root=s3://<your_data_lake_s3_bucket>/cdc_hudi_data_lake/bronze/tickit.ecomm.sale/
# Shuffle parallelism for each write operation type (upsert, insert, delete, bulk insert).
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
# Hive sync settings: sync mode, JDBC usage, date-partitioning assumption,
# target database, and partition extractor class.
# NOTE(review): presumably these are the shared defaults included by the
# table-specific properties files; confirm against the file name.
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.database=tickit_cdc_hudi
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
{
"_hoodie_commit_time": "20230226135257126",
"_hoodie_commit_seqno": "20230226135257126_0_69229",
"_hoodie_record_key": "1",
"_hoodie_partition_path": "__table=sale",
"_hoodie_file_name": "773bbbb7-b6ec-4ade-9f08-942726584e42-0_0-35-50_20230226135257126.parquet",
"salesid": 1,
"listid": 1,
"sellerid": 36861,
"buyerid": 21191,
{
"before" : null,
"after" : {
"full.ecomm.sale.Value" : {
"salesid" : 1,
"listid" : 1,
"sellerid" : 36861,
"buyerid" : 21191,
"eventid" : 7872,
"dateid" : 1875,
{
"salesid" : 1,
"listid" : 1,
"sellerid" : 36861,
"buyerid" : 21191,
"eventid" : 7872,
"dateid" : 1875,
"qtysold" : 4,
"pricepaid" : 728.0,
"commission" : 109.2,
{
"name": "sink_connector_kafka_s3_avro_tickit",
"config": {
"behavior.on.null.values": "ignore",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"flush.size": 10000,
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.find-latest": "true",
{
"name": "source_connector_mysql_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "${secretManager:demo/mysql:host}",
"database.include.list": "ecomm",
"database.password": "${secretManager:demo/mysql:password}",
"database.port": 3306,
"database.server.id": 184054,
"database.user": "${secretManager:demo/mysql:username}",
{
"name": "source_connector_mssql_kafka_avro_tickit",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.encrypt": "false",
"database.hostname": "${secretManager:demo/mssql:host}",
"database.names": "tickit",
"database.password": "${secretManager:demo/mssql:password}",
"database.port": 1433,
"database.user": "${secretManager:demo/mssql:username}",