@ad1happy2go
Created May 23, 2023 16:09
# ================================= GLUE SCRIPT =================================
import sys
import os
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, expr, to_timestamp, monotonically_increasing_id, to_date, when
from awsglue.utils import getResolvedOptions
from pyspark.sql.types import *
from datetime import datetime, date
import boto3
from functools import reduce
from pyspark.sql import Row
import uuid
from faker import Faker
spark = (SparkSession.builder.getOrCreate())
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
# =================================INSERTING DATA =====================================
global faker
faker = Faker()
class DataGenerator(object):
    @staticmethod
    def get_data():
        # Generate 100 fake employee records; numeric fields are kept as strings
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
            ) for x in range(100)
        ]
data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)
# ============================== Settings =======================================
db_name = "hudidb"
table_name = "issue_7191"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://hudi-github-issues/output/issue_7191/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================
hudi_part_write_config = {
    'className': 'org.apache.hudi',
    'hoodie.schema.on.read.enable': 'true',
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.avro.schema.external.transformation': 'true',
    'hoodie.avro.schema.validate': 'true',
    'hoodie.datasource.write.schema.allow.auto.evolution.column.drop': 'true',
    'hoodie.datasource.write.partitionpath.field': PARTITION_FIELD,
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
}
spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
# ================================================================
# Adding NEW COLUMN
# ================================================================
class DataGenerator(object):
    @staticmethod
    def get_data():
        # Same generator as above, plus an extra date column to exercise schema evolution
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                faker.date().__str__()
            ) for x in range(100)
        ]
data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card",
"new_date_col"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.withColumn("state",expr("CAST(NULL AS STRING)")).write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)