Skip to content

Instantly share code, notes, and snippets.

@rnbtechnology
rnbtechnology / sample_delete_record.json
Created February 18, 2021 00:54
GoldenGate Sample Records
{
"table": "GG.TCUSTORD",
"op_type": "D",
"op_ts": "2013-06-02 22:14:41.000000",
"current_ts": "2015-09-18T10:17:49.899000",
"pos": "00000000000000004338",
"primary_keys": [
"CUST_CODE",
"ORDER_DATE",
"PRODUCT_CODE",
<include>org.apache.httpcomponents:httpclient</include>
<relocation>
<pattern>org.eclipse.jetty.</pattern>
<shadedPattern>org.apache.hudi.org.eclipse.jetty.</shadedPattern>
</relocation>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
import copy
# write to a path using the Hudi format
def hudi_write(df, schema, table, path, mode, hudi_options):
hudi_options = {
"hoodie.datasource.write.recordkey.field": "recordkey",
"hoodie.datasource.write.precombine.field": "precombine_field",
"hoodie.datasource.write.partitionpath.field": "partitionpath_field",
"hoodie.datasource.write.operation": "write_operaion",
"hoodie.datasource.write.table.type": "table_type",
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col
# Instantiates a Scala Map containing the configuration for communicating with the Schema Registry APIs
def get_schema_registry_conf_map(spark, schema_registry_url, topic_name):
sc = spark.SparkContext
jvm_gateway = sc._gateway.jvm
schema_registry_config_dict = {
# Kafka connection settings. The literal values look like placeholders
# (TODO confirm the real topic name and broker list before running).
TOPIC_NAME = "topic_name"
KAFKA_BOOTSTRAP_SERVERS = "host1:port1,host2:port2"

# Build a Structured Streaming source that subscribes to TOPIC_NAME on the
# brokers listed above; `spark` is assumed to be an existing SparkSession.
df = spark.readStream.format("kafka").option(
    "kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS
).option(
    "subscribe", TOPIC_NAME
).load()
{
"table": "GG.TCUSTORD",
"op_type": "D",
"op_ts": "2013-06-02 22:14:41.000000",
"current_ts": "2015-09-18T10:17:49.899000",
"pos": "00000000000000004338",
"primary_keys": [
"CUST_CODE",
"ORDER_DATE",
"PRODUCT_CODE",
{
"table": "GG.TCUSTORD",
"op_type": "U",
"op_ts": "2013-06-02 22:14:41.000000",
"current_ts": "2015-09-18T10:17:49.880000",
"pos": "00000000000000002891",
"primary_keys": [
"CUST_CODE",
"ORDER_DATE",
"PRODUCT_CODE",
{
"table": "GG.TCUSTORD",
"op_type": "I",
"op_ts": "2013-06-02 22:14:36.000000",
"current_ts": "2015-09-18T10:17:49.570000",
"pos": "00000000000000001444",
"primary_keys": [
"CUST_CODE",
"ORDER_DATE",
"PRODUCT_CODE",