from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from datetime import datetime, date
from pyspark.sql import SparkSession, Row

# Repro script: PartialUpdateAvroPayload upserts on a COPY_ON_WRITE table
# partitioned with the TimestampBasedKeyGenerator (Hudi 0.13.0, Spark 3.2 bundle).
spark = SparkSession \
    .builder \
    .master("local[1]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()
# Common Hudi write options: records keyed on profile_id, partitioned by
# timestamp__date_ via the TimestampBasedKeyGenerator, and merged with
# PartialUpdateAvroPayload ordered on the timestamp field.
common_config = {
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "profile_id",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.datasource.write.partitionpath.field": "timestamp__date_",
    "hoodie.table.name": "issue_8625",
    "hoodie.index.type": "GLOBAL_BLOOM",
    "hoodie.bloom.index.update.partition.path": "false",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
    "hoodie.deltastreamer.keygen.timebased.timestamp.type": "SCALAR",
    "hoodie.deltastreamer.keygen.timebased.output.dateformat": "yyyy/MM/dd",
    "hoodie.deltastreamer.keygen.timebased.timezone": "GMT",
    "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit": "DAYS",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.PartialUpdateAvroPayload",
    "hoodie.compaction.payload.class": "org.apache.hudi.common.model.PartialUpdateAvroPayload",
    "hoodie.datasource.compaction.async.enable": "false",
    "hoodie.payload.ordering.field": "timestamp",
}
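
# Optional sanity check (illustrative, not part of the original gist): with the
# SCALAR/DAYS keygen settings above and a yyyy/MM/dd output format, the DateType
# partition column is expected to map to a day-based partition path. Previewing
# the path expected for the first record's timestamp (1683010485669 ms), assuming GMT:
from datetime import timedelta
expected_partition = (date(1970, 1, 1) + timedelta(days=1683010485669 // 86400000)).strftime("%Y/%m/%d")
print(expected_partition)  # expected: 2023/05/02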
# Incoming record schema.
json_schema = StructType([
    StructField("profile_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("id", StringType(), True),
    StructField("Enjoy", BooleanType(), True),
    StructField("DOB", ArrayType(StringType(), False), True),
    StructField("zip", StringType(), True),
    StructField("country", StringType(), True),
    StructField("email_vendor", StringType(), True),
    StructField("city", StringType(), True),
    StructField("active_audience", DoubleType(), True),
    StructField("last_name", StringType(), True),
    StructField("migrated_from", StringType(), True),
    StructField("product_range", ArrayType(StringType(), False), True),
    StructField("email_sub", BooleanType(), True),
    StructField("sms_vendor", StringType(), True),
    StructField("audience_count", DoubleType(), True),
    StructField("whatsapp_vendor", StringType(), True),
    StructField("first_name", StringType(), True),
])
### First, we try the pyspark.sql.Row API:
# Initial record: all fields populated.
df = spark.createDataFrame([Row(
    profile_id="172597",
    timestamp=datetime.fromtimestamp(1683010485669 / 1000),
    id="4c33de14-986c-444d-8b94-e2c8905c83ca",
    Enjoy=None,
    DOB=None,
    zip="560037",
    country="India",
    email_vendor="Mailchimp",
    city="Bangalore",
    active_audience=57890.0,
    last_name="Guy",
    migrated_from="Moengage",
    product_range=["Laptop", "Mobile", "Headphones"],
    email_sub=True,
    sms_vendor="Serfo",
    audience_count=80000.0,
    whatsapp_vendor="Gupshup",
    first_name="Some",
)], json_schema)
df = df.withColumn("timestamp__date_", to_date(df["timestamp"]))

table_name = "row_api_table2"
path = "/tmp/" + table_name
path2 = "/tmp/" + table_name + "_2"

# First write: insert the fully populated record.
df.write.format("org.apache.hudi") \
    .options(**common_config) \
    .option("hoodie.table.name", table_name) \
    .mode("append") \
    .save(path)
spark.read.format("hudi").load(path).show()
# Second record for the same profile_id, with several fields set to None; with
# PartialUpdateAvroPayload these nulls should be filled from the earlier record.
newDf = spark.createDataFrame([Row(
    profile_id="172597",
    timestamp=datetime.fromtimestamp(1683016101429 / 1000),
    id="39cff44b-22c7-41d1-bc66-7d03ff38e4b9",
    Enjoy=None,
    DOB=None,
    zip="560037",
    country="India",
    email_vendor=None,
    city="Bangalore",
    active_audience=75000.0,
    last_name="Guy",
    migrated_from=None,
    product_range=None,
    email_sub=True,
    sms_vendor=None,
    audience_count=95000.0,
    whatsapp_vendor=None,
    first_name="Some",
)], json_schema)
newDf = newDf.withColumn("timestamp__date_", to_date(newDf["timestamp"]))

# Second write: upsert the partial record into the same table.
newDf.write.format("org.apache.hudi") \
    .options(**common_config) \
    .option("hoodie.table.name", table_name) \
    .mode("append") \
    .save(path)
spark.read.format("hudi").load(path).show()
### Next, the same flow using JSON strings:
json_string1 = """
{
"zip": "560037",
"country": "India",
"email_vendor": "Mailchimp",
"city": "Bangalore",
"active_audience": 57890,
"last_name": "Guy",
"migrated_from": "Moengage",
"product_range": [
"Laptop",
"Mobile",
"Headphones"
],
"email_sub": true,
"profile_id": "172597",
"audience_count": 80000,
"sms_vendor": "Serfo",
"whatsapp_vendor": "Gupshup",
"id": "4c33de14-986c-444d-8b94-e2c8905c83ca",
"first_name": "Some",
"timestamp": 1683010485669
}
"""
json_df = spark.read.json(spark.sparkContext.parallelize([json_string1]), json_schema)
json_df = json_df.withColumn("timestamp__date_", to_date(json_df["timestamp"]))
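
# Note (added for illustration): the JSON "timestamp" field is an epoch value in
# milliseconds, but the supplied schema declares TimestampType, so Spark's JSON
# parser may interpret it differently from the Row-API example (for instance as
# seconds since the epoch). Checking what was actually parsed before writing:
json_df.select("profile_id", "timestamp", "timestamp__date_").show(truncate=False)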
# Write the JSON-sourced record to a second table path.
json_df.write.format("org.apache.hudi") \
    .options(**common_config) \
    .option("hoodie.table.name", table_name) \
    .mode("append") \
    .save(path2)
spark.read.format("hudi").load(path2).show()
json_string2 = """
{
"zip": "560037",
"country": "India",
"email_sub": true,
"city": "Bangalore",
"profile_id": "172597",
"audience_count": 95000,
"active_audience": 77800,
"last_name": "Guy",
"id": "39cff44b-22c7-41d1-bc66-7d03ff38e4b9",
"first_name": "Some",
"timestamp": 1683016101429,
"email_vendor": null,
"migrated_from": null,
"product_range": null,
"sms_vendor": null,
"whatsapp_vendor": null
}
"""
json_df1 = spark.read.json(spark.sparkContext.parallelize([json_string2]), json_schema)
json_df1 = json_df1.withColumn("timestamp__date_", to_date(json_df1["timestamp"]))
# Upsert the partial JSON record into the same path; nulls should be merged by
# PartialUpdateAvroPayload rather than overwriting the earlier values.
json_df1.write.format("org.apache.hudi") \
    .options(**common_config) \
    .option("hoodie.table.name", table_name) \
    .mode("append") \
    .save(path2)
spark.read.format("hudi").load(path2).show()